improve and fix various dynamic parts (#1809)

* improve and fix various dynamic parts
This commit is contained in:
Moritz
2023-10-18 10:59:41 +02:00
committed by GitHub
parent 21f9e0736d
commit 2cfd45022a
8 changed files with 99 additions and 23 deletions

View File

@@ -19,3 +19,7 @@ class UnsupportedArchError(ValueError):
class UnsupportedOSError(ValueError):
    """
    ValueError subclass that adds no behavior of its own.

    NOTE(review): presumably raised when the input targets an operating
    system that is not supported; the raise sites are outside this view.
    """
class EmptyReportError(ValueError):
    """
    Raised when a sandbox report holds no behavior to analyze.

    The CAPE extractor raises it when the report lists zero processes.
    It subclasses ValueError and adds no behavior of its own.
    """

View File

@@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
"""
this method extrcts the given call's features (such as API name and arguments),
this method extracts the given call's features (such as API name and arguments),
and returns them as API, Number, and String features.
args:

View File

@@ -14,10 +14,10 @@ import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.exceptions import UnsupportedFormatError
from capa.exceptions import EmptyReportError, UnsupportedFormatError
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.cape.models import CapeReport
from capa.features.extractors.cape.models import Static, CapeReport
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
@@ -85,10 +85,18 @@ class CapeExtractor(DynamicFeatureExtractor):
if cr.info.version not in TESTED_VERSIONS:
logger.warning("CAPE version '%s' not tested/supported yet", cr.info.version)
# observed in 2.4-CAPE reports from capesandbox.com
if cr.static is None and cr.target.file.pe is not None:
cr.static = Static()
cr.static.pe = cr.target.file.pe
if cr.static is None:
raise UnsupportedFormatError("CAPE report missing static analysis")
if cr.static.pe is None:
raise UnsupportedFormatError("CAPE report missing PE analysis")
if len(cr.behavior.processes) == 0:
raise EmptyReportError("CAPE did not capture any processes")
return cls(cr)

View File

@@ -132,13 +132,21 @@ class DigitalSigner(FlexibleModel):
extensions_subjectKeyIdentifier: Optional[str] = None
class AuxSigner(ExactModel):
    # element type of `Signer.aux_signers`
    name: str
    # the remaining keys contain spaces (and capitals) in the report, so they
    # cannot be Python identifiers; each is mapped onto an attribute by alias.
    issued_to: str = Field(alias="Issued to")
    issued_by: str = Field(alias="Issued by")
    expires: str = Field(alias="Expires")
    sha1_hash: str = Field(alias="SHA1 hash")
class Signer(ExactModel):
aux_sha1: Optional[TODO] = None
aux_timestamp: Optional[None] = None
aux_sha1: Optional[str] = None
aux_timestamp: Optional[str] = None
aux_valid: Optional[bool] = None
aux_error: Optional[bool] = None
aux_error_desc: Optional[str] = None
aux_signers: Optional[ListTODO] = None
aux_signers: Optional[List[AuxSigner]] = None
class Overlay(ExactModel):
@@ -197,7 +205,10 @@ class PE(ExactModel):
guest_signers: Signer
class File(ExactModel):
# TODO(mr-tz): target.file.dotnet, target.file.extracted_files, target.file.extracted_files_tool,
# target.file.extracted_files_time
# https://github.com/mandiant/capa/issues/1814
class File(FlexibleModel):
type: str
cape_type_code: Optional[int] = None
cape_type: Optional[str] = None
@@ -350,6 +361,7 @@ class Behavior(ExactModel):
class Target(ExactModel):
    category: str
    file: File
    # optional: stays None when the report carries no `pe` entry here
    pe: Optional[PE] = None
class Static(ExactModel):
@@ -385,7 +397,7 @@ class CapeReport(FlexibleModel):
# post-processed results: payloads and extracted configs
CAPE: Optional[Cape] = None
dropped: Optional[List[File]] = None
procdump: List[ProcessFile]
procdump: Optional[List[ProcessFile]] = None
procmemory: ListTODO
# =========================================================================

View File

@@ -10,6 +10,7 @@ import logging
from typing import Iterator
from capa.features.address import DynamicCallAddress
from capa.features.extractors.helpers import is_aw_function
from capa.features.extractors.cape.models import Process
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
@@ -24,5 +25,22 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
if call.thread_id != tid:
continue
addr = DynamicCallAddress(thread=th.address, id=call_index)
yield CallHandle(address=addr, inner=call)
for symbol in generate_symbols(call.api):
call.api = symbol
addr = DynamicCallAddress(thread=th.address, id=call_index)
yield CallHandle(address=addr, inner=call)
def generate_symbols(symbol: str) -> Iterator[str]:
    """
    for a given symbol name, generate variants.
    we over-generate features to make matching easier.

    the symbol itself is always yielded first, unchanged.
    when `is_aw_function` accepts the symbol, a second variant follows:
    the same name with its final character removed.

    args:
      symbol: the API name to expand, e.g. "CreateFileA".

    yields:
      str: the original name, then optionally the name without its last character.
    """
    # CreateFileA
    yield symbol

    if is_aw_function(symbol):
        # CreateFile
        yield symbol[:-1]

View File

@@ -156,9 +156,9 @@ def log_unsupported_format_error():
logger.error("-" * 80)
def log_unsupported_cape_report_error():
def log_unsupported_cape_report_error(error: str):
logger.error("-" * 80)
logger.error(" Input file is not a valid CAPE report.")
logger.error("Input file is not a valid CAPE report: %s", error)
logger.error(" ")
logger.error(" capa currently only supports analyzing standard CAPE json reports.")
logger.error(
@@ -167,6 +167,14 @@ def log_unsupported_cape_report_error():
logger.error("-" * 80)
def log_empty_cape_report_error(error: str):
    """
    log an error banner explaining that the CAPE report has little or no useful data.

    args:
      error: detail text interpolated into the first message line.
    """
    # the same 80-dash rule opens and closes the banner
    separator = "-" * 80

    logger.error(separator)
    # pass `error` as a logging argument so formatting stays lazy
    logger.error(" CAPE report is empty or only contains little useful data: %s", error)
    logger.error(" ")
    logger.error(" Please make sure the sandbox run captures useful behaviour of your sample.")
    logger.error(separator)
def log_unsupported_os_error():
logger.error("-" * 80)
logger.error(" Input file does not appear to target a supported OS.")

View File

@@ -62,10 +62,17 @@ from capa.helpers import (
log_unsupported_os_error,
redirecting_print_to_tqdm,
log_unsupported_arch_error,
log_empty_cape_report_error,
log_unsupported_format_error,
log_unsupported_cape_report_error,
)
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError, UnsupportedRuntimeError
from capa.exceptions import (
EmptyReportError,
UnsupportedOSError,
UnsupportedArchError,
UnsupportedFormatError,
UnsupportedRuntimeError,
)
from capa.features.common import (
OS_AUTO,
OS_LINUX,
@@ -1501,12 +1508,17 @@ def main(argv: Optional[List[str]] = None):
except (ELFError, OverflowError) as e:
logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
return E_CORRUPT_FILE
except UnsupportedFormatError:
except UnsupportedFormatError as e:
if format_ == FORMAT_CAPE:
log_unsupported_cape_report_error()
log_unsupported_cape_report_error(str(e))
else:
log_unsupported_format_error()
return E_INVALID_FILE_TYPE
except EmptyReportError as e:
if format_ == FORMAT_CAPE:
log_empty_cape_report_error(str(e))
else:
log_unsupported_format_error()
for file_extractor in file_extractors:
if isinstance(file_extractor, DynamicFeatureExtractor):
@@ -1564,6 +1576,9 @@ def main(argv: Optional[List[str]] = None):
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
# TODO(mr-tz): this should be wrapped and refactored as it's tedious to update everywhere
# see same code and show-features above examples
# https://github.com/mandiant/capa/issues/1813
try:
extractor = get_extractor(
args.sample,
@@ -1574,9 +1589,9 @@ def main(argv: Optional[List[str]] = None):
should_save_workspace,
disable_progress=args.quiet or args.debug,
)
except UnsupportedFormatError:
except UnsupportedFormatError as e:
if format_ == FORMAT_CAPE:
log_unsupported_cape_report_error()
log_unsupported_cape_report_error(str(e))
else:
log_unsupported_format_error()
return E_INVALID_FILE_TYPE

View File

@@ -83,7 +83,15 @@ import capa.features.address
import capa.features.extractors.pefile
from capa.helpers import get_auto_format, log_unsupported_runtime_error
from capa.features.insn import API, Number
from capa.features.common import FORMAT_AUTO, FORMAT_FREEZE, DYNAMIC_FORMATS, String, Feature, is_global_feature
from capa.features.common import (
FORMAT_AUTO,
FORMAT_CAPE,
FORMAT_FREEZE,
DYNAMIC_FORMATS,
String,
Feature,
is_global_feature,
)
from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor, DynamicFeatureExtractor
logger = logging.getLogger("capa.show-features")
@@ -132,8 +140,11 @@ def main(argv=None):
extractor = capa.main.get_extractor(
args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace
)
except capa.exceptions.UnsupportedFormatError:
capa.helpers.log_unsupported_format_error()
except capa.exceptions.UnsupportedFormatError as e:
if format_ == FORMAT_CAPE:
capa.helpers.log_unsupported_cape_report_error(str(e))
else:
capa.helpers.log_unsupported_format_error()
return -1
except capa.exceptions.UnsupportedRuntimeError:
log_unsupported_runtime_error()
@@ -248,13 +259,13 @@ def print_static_features(functions, extractor: StaticFeatureExtractor):
def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
for p in processes:
print(f"proc: {p.inner['name']} (ppid={p.address.ppid}, pid={p.address.pid})")
print(f"proc: {p.inner.process_name} (ppid={p.address.ppid}, pid={p.address.pid})")
for feature, addr in extractor.extract_process_features(p):
if is_global_feature(feature):
continue
print(f" proc: {p.inner['name']}: {feature}")
print(f" proc: {p.inner.process_name}: {feature}")
for t in extractor.get_threads(p):
print(f" thread: {t.address.tid}")
@@ -283,7 +294,7 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
print(f" arguments=[{', '.join(arguments)}]")
for cid, api in apis:
print(f"call {cid}: {api}({', '.join(arguments)})")
print(f" call {cid}: {api}({', '.join(arguments)})")
def ida_main():