diff --git a/capa/features/extractors/cape/call.py b/capa/features/extractors/cape/call.py index 8e216730..97e235a9 100644 --- a/capa/features/extractors/cape/call.py +++ b/capa/features/extractors/cape/call.py @@ -7,29 +7,24 @@ # See the License for the specific language governing permissions and limitations under the License. import logging -from typing import Any, Dict, List, Tuple, Iterator +from typing import Tuple, Iterator -import capa.features.extractors.cape.file -import capa.features.extractors.cape.thread -import capa.features.extractors.cape.global_ -import capa.features.extractors.cape.process +from capa.helpers import assert_never from capa.features.insn import API, Number from capa.features.common import String, Feature from capa.features.address import Address +from capa.features.extractors.cape.models import Call from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle logger = logging.getLogger(__name__) -def extract_call_features( - behavior: Dict, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle -) -> Iterator[Tuple[Feature, Address]]: +def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]: """ - this method extrcts the given call's features (api name and arguments), + this method extracts the given call's features (such as API name and arguments), and returns them as API, Number, and String features. args: - behavior: a dictionary of behavioral artifacts extracted by the sandbox ph: process handle (for defining the extraction scope) th: thread handle (for defining the extraction scope) ch: call handle (for defining the extraction scope) @@ -37,27 +32,29 @@ def extract_call_features( yields: Feature, address; where Feature is either: API, Number, or String. """ - # TODO(yelhamer): find correct base address used at runtime. - # this address may vary from the PE header, may read actual base from procdump.pe.imagebase or similar. 
- # https://github.com/mandiant/capa/issues/1618 - process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) - calls: List[Dict[str, Any]] = process["calls"] - call = calls[ch.address.id] - assert call["thread_id"] == str(th.address.tid) + call: Call = ch.inner + # list similar to disassembly: arguments right-to-left, call - for arg in call["arguments"][::-1]: - try: - yield Number(int(arg["value"], 16)), ch.address - except ValueError: - yield String(arg["value"]), ch.address - yield API(call["api"]), ch.address + for arg in reversed(call.arguments): + if isinstance(arg, list) and len(arg) == 0: + # unsure why CAPE captures arguments as empty lists? + continue + + elif isinstance(arg, str): + yield String(arg), ch.address + + elif isinstance(arg, int): + yield Number(arg), ch.address + + else: + assert_never(arg) + + yield API(call.api), ch.address -def extract_features( - behavior: Dict, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle -) -> Iterator[Tuple[Feature, Address]]: +def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]: for handler in CALL_HANDLERS: - for feature, addr in handler(behavior, ph, th, ch): + for feature, addr in handler(ph, th, ch): yield feature, addr diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py index dda9228c..3374ee99 100644 --- a/capa/features/extractors/cape/extractor.py +++ b/capa/features/extractors/cape/extractor.py @@ -14,8 +14,10 @@ import capa.features.extractors.cape.file import capa.features.extractors.cape.thread import capa.features.extractors.cape.global_ import capa.features.extractors.cape.process -from capa.features.common import Feature -from capa.features.address import Address, AbsoluteVirtualAddress, _NoAddress +from capa.exceptions import UnsupportedFormatError +from capa.features.common import Feature, Characteristic +from capa.features.address import NO_ADDRESS, Address, 
AbsoluteVirtualAddress, _NoAddress +from capa.features.extractors.cape.models import CapeReport from capa.features.extractors.base_extractor import ( CallHandle, SampleHashes, @@ -26,26 +28,26 @@ from capa.features.extractors.base_extractor import ( logger = logging.getLogger(__name__) -TESTED_VERSIONS = ("2.2-CAPE",) +TESTED_VERSIONS = {"2.2-CAPE", "2.4-CAPE"} class CapeExtractor(DynamicFeatureExtractor): - def __init__(self, cape_version: str, static: Dict, behavior: Dict): + def __init__(self, report: CapeReport): super().__init__() - self.cape_version = cape_version - self.static = static - self.behavior = behavior + self.report: CapeReport = report + self.sample_hashes = SampleHashes( - md5=static["file"]["md5"].lower(), - sha1=static["file"]["sha1"].lower(), - sha256=static["file"]["sha256"].lower(), + md5=self.report.target.file.md5.lower(), + sha1=self.report.target.file.sha1.lower(), + sha256=self.report.target.file.sha256.lower(), ) - self.global_features = capa.features.extractors.cape.global_.extract_features(self.static) + self.global_features = capa.features.extractors.cape.global_.extract_features(self.report) def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]: # value according to the PE header, the actual trace may use a different imagebase - return AbsoluteVirtualAddress(self.static["pe"]["imagebase"]) + assert self.report.static is not None and self.report.static.pe is not None + return AbsoluteVirtualAddress(self.report.static.pe.imagebase) def get_sample_hashes(self) -> SampleHashes: return self.sample_hashes @@ -54,44 +56,43 @@ class CapeExtractor(DynamicFeatureExtractor): yield from self.global_features def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]: - yield from capa.features.extractors.cape.file.extract_features(self.static) + yield from capa.features.extractors.cape.file.extract_features(self.report) def get_processes(self) -> Iterator[ProcessHandle]: - yield from 
capa.features.extractors.cape.file.get_processes(self.behavior) + yield from capa.features.extractors.cape.file.get_processes(self.report) def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: - yield from capa.features.extractors.cape.process.extract_features(self.behavior, ph) + yield from capa.features.extractors.cape.process.extract_features(ph) def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]: - yield from capa.features.extractors.cape.process.get_threads(self.behavior, ph) + yield from capa.features.extractors.cape.process.get_threads(ph) def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: - yield from capa.features.extractors.cape.thread.extract_features(self.behavior, ph, th) + if False: + # force this routine to be a generator, + # but we don't actually have any elements to generate. + yield Characteristic("never"), NO_ADDRESS + return def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: - yield from capa.features.extractors.cape.thread.get_calls(self.behavior, ph, th) + yield from capa.features.extractors.cape.thread.get_calls(ph, th) def extract_call_features( self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle ) -> Iterator[Tuple[Feature, Address]]: - yield from capa.features.extractors.cape.call.extract_features(self.behavior, ph, th, ch) + yield from capa.features.extractors.cape.call.extract_features(ph, th, ch) @classmethod def from_report(cls, report: Dict) -> "CapeExtractor": - cape_version = report["info"]["version"] - if cape_version not in TESTED_VERSIONS: - logger.warning("CAPE version '%s' not tested/supported yet", cape_version) + cr = CapeReport.model_validate(report) - static = report["static"] - format_ = list(static.keys())[0] - static = static[format_] - static.update(report["behavior"].pop("summary")) - static.update(report["target"]) - static.update({"processtree": 
report["behavior"]["processtree"]}) - static.update({"strings": report["strings"]}) - static.update({"format": format_}) + if cr.info.version not in TESTED_VERSIONS: + logger.warning("CAPE version '%s' not tested/supported yet", cr.info.version) - behavior = report.pop("behavior") - behavior["network"] = report.pop("network") + if cr.static is None: + raise UnsupportedFormatError("CAPE report missing static analysis") - return cls(cape_version, static, behavior) + if cr.static.pe is None: + raise UnsupportedFormatError("CAPE report missing static analysis") + + return cls(cr) diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py index 61a8c790..34821975 100644 --- a/capa/features/extractors/cape/file.py +++ b/capa/features/extractors/cape/file.py @@ -7,106 +7,98 @@ # See the License for the specific language governing permissions and limitations under the License. import logging -from typing import Dict, Tuple, Iterator +from typing import Tuple, Iterator from capa.features.file import Export, Import, Section from capa.features.common import String, Feature from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress from capa.features.extractors.helpers import generate_symbols +from capa.features.extractors.cape.models import CapeReport from capa.features.extractors.base_extractor import ProcessHandle logger = logging.getLogger(__name__) -def get_processes(static: Dict) -> Iterator[ProcessHandle]: +def get_processes(report: CapeReport) -> Iterator[ProcessHandle]: """ get all the created processes for a sample """ - - def rec(process): - address: ProcessAddress = ProcessAddress(pid=process["pid"], ppid=process["parent_id"]) - inner: Dict[str, str] = {"name": process["name"]} - yield ProcessHandle(address=address, inner=inner) - for child in process["children"]: - yield from rec(child) - - for process in static["processtree"]: - yield from rec(process) + for process in report.behavior.processes: + 
addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id) + yield ProcessHandle(address=addr, inner=process) -def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: +def extract_import_names(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: """ extract imported function names """ - imports = static["imports"] + assert report.static is not None and report.static.pe is not None + imports = report.static.pe.imports - """ - 2.2-CAPE - "imports": [ - { - "dll": "RPCRT4.dll", - "imports": [{"address": "0x40504c","name": "NdrSimpleTypeUnmarshall"}, ...] - }, - ... - ] - - 2.4-CAPE - "imports": { - "ADVAPI32": { - "dll": "ADVAPI32.dll", - "imports": [{"address": "0x522000", "name": "OpenSCManagerA"}, ...], - ... - }, - ... - } - """ if isinstance(imports, dict): - imports = imports.values() + imports = list(imports.values()) + + assert isinstance(imports, list) for library in imports: - for function in library["imports"]: - addr = int(function["address"], 16) - for name in generate_symbols(library["dll"], function["name"]): - yield Import(name), AbsoluteVirtualAddress(addr) + for function in library.imports: + for name in generate_symbols(library.dll, function.name): + yield Import(name), AbsoluteVirtualAddress(function.address) -def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: - for function in static["exports"]: - name, address = function["name"], int(function["address"], 16) - yield Export(name), AbsoluteVirtualAddress(address) +def extract_export_names(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + assert report.static is not None and report.static.pe is not None + for function in report.static.pe.exports: + yield Export(function.name), AbsoluteVirtualAddress(function.address) -def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: - # be consistent with static extractors and use section VA - base = int(static["imagebase"], 16) - for section in static["sections"]: 
- name, address = section["name"], int(section["virtual_address"], 16) - yield Section(name), AbsoluteVirtualAddress(base + address) +def extract_section_names(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + assert report.static is not None and report.static.pe is not None + for section in report.static.pe.sections: + yield Section(section.name), AbsoluteVirtualAddress(section.virtual_address) -def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]: - for string_ in static["strings"]: - yield String(string_), NO_ADDRESS +def extract_file_strings(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + if report.strings is not None: + for string in report.strings: + yield String(string), NO_ADDRESS -def extract_used_regkeys(static: Dict) -> Iterator[Tuple[Feature, Address]]: - for regkey in static["keys"]: +def extract_used_regkeys(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + for regkey in report.behavior.summary.keys: yield String(regkey), NO_ADDRESS -def extract_used_files(static: Dict) -> Iterator[Tuple[Feature, Address]]: - for filename in static["files"]: - yield String(filename), NO_ADDRESS +def extract_used_files(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + for file in report.behavior.summary.files: + yield String(file), NO_ADDRESS -def extract_used_mutexes(static: Dict) -> Iterator[Tuple[Feature, Address]]: - for mutex in static["mutexes"]: +def extract_used_mutexes(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + for mutex in report.behavior.summary.mutexes: yield String(mutex), NO_ADDRESS -def extract_features(static: Dict) -> Iterator[Tuple[Feature, Address]]: +def extract_used_commands(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + for cmd in report.behavior.summary.executed_commands: + yield String(cmd), NO_ADDRESS + + +def extract_used_apis(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + for symbol in report.behavior.summary.resolved_apis: + yield 
String(symbol), NO_ADDRESS + + +def extract_used_services(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: + for svc in report.behavior.summary.created_services: + yield String(svc), NO_ADDRESS + for svc in report.behavior.summary.started_services: + yield String(svc), NO_ADDRESS + + +def extract_features(report: CapeReport) -> Iterator[Tuple[Feature, Address]]: for handler in FILE_HANDLERS: - for feature, addr in handler(static): + for feature, addr in handler(report): yield feature, addr @@ -118,4 +110,6 @@ FILE_HANDLERS = ( extract_used_regkeys, extract_used_files, extract_used_mutexes, + extract_used_apis, + extract_used_services, ) diff --git a/capa/features/extractors/cape/models.py b/capa/features/extractors/cape/models.py index d6219e25..9d5b7ace 100644 --- a/capa/features/extractors/cape/models.py +++ b/capa/features/extractors/cape/models.py @@ -63,6 +63,10 @@ EmptyDict: TypeAlias = BaseModel EmptyList: TypeAlias = List[Any] +class Info(FlexibleModel): + version: str + + class ImportedSymbol(ExactModel): address: HexInt name: str @@ -251,7 +255,7 @@ class ProcessFile(File): class Argument(ExactModel): name: str # unsure why empty list is provided here - value: Union[HexInt, str, EmptyList] + value: Union[HexInt, int, str, EmptyList] pretty_value: Optional[str] = None @@ -359,6 +363,8 @@ class CAPE(ExactModel): class CapeReport(FlexibleModel): # the input file, I think target: Target + # info about the processing job, like machine and distributed metadata. + info: Info # # static analysis results @@ -397,8 +403,6 @@ class CapeReport(FlexibleModel): # screenshot hash values deduplicated_shots: Skip = None - # info about the processing job, like machine and distributed metadata. - info: Skip = None # k-v pairs describing the time it took to run each stage. statistics: Skip = None # k-v pairs of ATT&CK ID to signature name or similar. 
diff --git a/capa/features/extractors/cape/process.py b/capa/features/extractors/cape/process.py index e94c43dd..909a9637 100644 --- a/capa/features/extractors/cape/process.py +++ b/capa/features/extractors/cape/process.py @@ -7,50 +7,41 @@ # See the License for the specific language governing permissions and limitations under the License. import logging -from typing import Dict, List, Tuple, Iterator +from typing import List, Tuple, Iterator -import capa.features.extractors.cape.file -import capa.features.extractors.cape.thread -import capa.features.extractors.cape.global_ -import capa.features.extractors.cape.process from capa.features.common import String, Feature from capa.features.address import Address, ThreadAddress +from capa.features.extractors.cape.models import Process from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle logger = logging.getLogger(__name__) -def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]: +def get_threads(ph: ProcessHandle) -> Iterator[ThreadHandle]: """ get the threads associated with a given process """ - - process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) - threads: List = process["threads"] + process: Process = ph.inner + threads: List[int] = process.threads for thread in threads: - address: ThreadAddress = ThreadAddress(process=ph.address, tid=int(thread)) + address: ThreadAddress = ThreadAddress(process=ph.address, tid=thread) yield ThreadHandle(address=address, inner={}) -def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: +def extract_environ_strings(ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: """ extract strings from a process' provided environment variables. 
""" + process: Process = ph.inner - process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) - environ: Dict[str, str] = process["environ"] - - if not environ: - return - - for value in (value for value in environ.values() if value): + for value in (value for value in process.environ.values() if value): yield String(value), ph.address -def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: +def extract_features(ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: for handler in PROCESS_HANDLERS: - for feature, addr in handler(behavior, ph): + for feature, addr in handler(ph): yield feature, addr diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index dc509a8d..24c2d3b2 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py @@ -7,38 +7,22 @@ # See the License for the specific language governing permissions and limitations under the License. 
import logging -from typing import Any, Dict, List, Tuple, Iterator +from typing import Iterator -import capa.features.extractors.cape.helpers -from capa.features.common import Feature -from capa.features.address import NO_ADDRESS, Address, DynamicCallAddress +from capa.features.address import DynamicCallAddress +from capa.features.extractors.cape.models import Process from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle logger = logging.getLogger(__name__) -def get_calls(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: - process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) - calls: List[Dict[str, Any]] = process["calls"] +def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: + process: Process = ph.inner - tid = str(th.address.tid) - for call in calls: - if call["thread_id"] != tid: + tid = th.address.tid + for call_index, call in enumerate(process.calls): + if call.thread_id != tid: continue - addr = DynamicCallAddress(thread=th.address, id=call["id"]) - ch = CallHandle(address=addr, inner={}) - yield ch - - -def extract_thread_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: - yield from ((Feature(0), NO_ADDRESS),) - - -def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: - for handler in THREAD_HANDLERS: - for feature, addr in handler(behavior, ph, th): - yield feature, addr - - -THREAD_HANDLERS = (extract_thread_features,) + addr = DynamicCallAddress(thread=th.address, id=call_index) + yield CallHandle(address=addr, inner=call)