mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
add support for determining the format of a sandbox report
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import binascii
|
||||
import contextlib
|
||||
@@ -18,6 +19,7 @@ from capa.features.common import (
|
||||
FORMAT_PE,
|
||||
FORMAT_ELF,
|
||||
OS_WINDOWS,
|
||||
FORMAT_CAPE,
|
||||
FORMAT_FREEZE,
|
||||
FORMAT_RESULT,
|
||||
Arch,
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import os
|
||||
import json
|
||||
import inspect
|
||||
import logging
|
||||
import contextlib
|
||||
@@ -18,7 +19,7 @@ from capa.features.common import FORMAT_PE, FORMAT_CAPE, FORMAT_SC32, FORMAT_SC6
|
||||
|
||||
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
|
||||
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
|
||||
EXTENSIONS_CAPE = ("json", "json_")
|
||||
EXTENSIONS_DYNAMIC = ("json", "json_")
|
||||
EXTENSIONS_ELF = "elf_"
|
||||
|
||||
logger = logging.getLogger("capa")
|
||||
@@ -53,16 +54,25 @@ def assert_never(value) -> NoReturn:
|
||||
assert False, f"Unhandled value: {value} ({type(value).__name__})"
|
||||
|
||||
|
||||
def get_format_from_extension(sample: str) -> str:
|
||||
if sample.endswith(EXTENSIONS_SHELLCODE_32):
|
||||
return FORMAT_SC32
|
||||
elif sample.endswith(EXTENSIONS_SHELLCODE_64):
|
||||
return FORMAT_SC64
|
||||
elif sample.endswith(EXTENSIONS_CAPE):
|
||||
# once we have support for more sandboxes that use json-formatted reports,
|
||||
# we update this logic to ask the user to explicity specify the format
|
||||
def get_format_from_report(sample: str) -> str:
|
||||
with open(sample, "rb") as f:
|
||||
report = json.load(f)
|
||||
if FORMAT_CAPE.upper() in report.keys():
|
||||
return FORMAT_CAPE
|
||||
return FORMAT_UNKNOWN
|
||||
else:
|
||||
# unknown report format
|
||||
return FORMAT_UNKNOWN
|
||||
|
||||
|
||||
def get_format_from_extension(sample: str) -> str:
|
||||
format_ = FORMAT_UNKNOWN
|
||||
if sample.endswith(EXTENSIONS_SHELLCODE_32):
|
||||
format_ = FORMAT_SC32
|
||||
elif sample.endswith(EXTENSIONS_SHELLCODE_64):
|
||||
format_ = FORMAT_SC64
|
||||
elif sample.endswith(EXTENSIONS_DYNAMIC):
|
||||
format_ = get_format_from_report(sample)
|
||||
return format_
|
||||
|
||||
|
||||
def get_auto_format(path: str) -> str:
|
||||
|
||||
@@ -78,10 +78,10 @@ import capa.helpers
|
||||
import capa.features
|
||||
import capa.exceptions
|
||||
import capa.render.verbose as v
|
||||
import capa.features.common
|
||||
import capa.features.freeze
|
||||
import capa.features.address
|
||||
from capa.helpers import log_unsupported_runtime_error
|
||||
from capa.helpers import get_auto_format, log_unsupported_runtime_error
|
||||
from capa.features.common import FORMAT_AUTO, FORMAT_CAPE, FORMAT_FREEZE, is_global_feature
|
||||
from capa.features.extractors.base_extractor import DynamicExtractor, FeatureExtractor
|
||||
|
||||
logger = logging.getLogger("capa.show-features")
|
||||
@@ -115,12 +115,8 @@ def main(argv=None):
|
||||
logger.error("%s", str(e))
|
||||
return -1
|
||||
|
||||
is_dynamic = (
|
||||
(args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in capa.helpers.EXTENSIONS_CAPE)
|
||||
)
|
||||
if (args.format == "freeze") or (
|
||||
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
|
||||
):
|
||||
format_ = args.format if args.format != FORMAT_AUTO else get_auto_format(args.sample)
|
||||
if format_ == FORMAT_FREEZE:
|
||||
# this should be moved above the previous if clause after implementing
|
||||
# feature freeze for the dynamic analysis flavor
|
||||
with open(args.sample, "rb") as f:
|
||||
@@ -129,7 +125,7 @@ def main(argv=None):
|
||||
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
|
||||
try:
|
||||
extractor = capa.main.get_extractor(
|
||||
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
|
||||
args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace
|
||||
)
|
||||
except capa.exceptions.UnsupportedFormatError:
|
||||
capa.helpers.log_unsupported_format_error()
|
||||
@@ -138,7 +134,7 @@ def main(argv=None):
|
||||
log_unsupported_runtime_error()
|
||||
return -1
|
||||
|
||||
if is_dynamic:
|
||||
if format_ in (FORMAT_CAPE):
|
||||
print_dynamic_analysis(cast(DynamicExtractor, extractor), args)
|
||||
else:
|
||||
print_static_analysis(extractor, args)
|
||||
@@ -203,7 +199,7 @@ def print_function_features(functions, extractor: FeatureExtractor):
|
||||
print(f"func: {format_address(f.address)}")
|
||||
|
||||
for feature, addr in extractor.extract_function_features(f):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
if f.address != addr:
|
||||
@@ -213,7 +209,7 @@ def print_function_features(functions, extractor: FeatureExtractor):
|
||||
|
||||
for bb in extractor.get_basic_blocks(f):
|
||||
for feature, addr in extractor.extract_basic_block_features(f, bb):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
if bb.address != addr:
|
||||
@@ -223,7 +219,7 @@ def print_function_features(functions, extractor: FeatureExtractor):
|
||||
|
||||
for insn in extractor.get_instructions(f, bb):
|
||||
for feature, addr in extractor.extract_insn_features(f, bb, insn):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
try:
|
||||
@@ -244,14 +240,14 @@ def print_process_features(processes, extractor: DynamicExtractor):
|
||||
print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})")
|
||||
|
||||
for feature, addr in extractor.extract_process_features(p):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
print(f" proc: {p.inner['name']}: {feature}")
|
||||
|
||||
for t in extractor.get_threads(p):
|
||||
for feature, addr in extractor.extract_thread_features(p, t):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
print(f" thread: {t.tid}: {feature}")
|
||||
|
||||
Reference in New Issue
Block a user