diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py index 01836fee..79be0b24 100644 --- a/capa/features/extractors/cape/extractor.py +++ b/capa/features/extractors/cape/extractor.py @@ -20,7 +20,7 @@ logger = logging.getLogger(__name__) class CapeExtractor(DynamicExtractor): - def __init__(self, static: Dict, behavior: Dict, network: Dict): + def __init__(self, static: Dict, behavior: Dict): super().__init__() self.static = static self.behavior = behavior @@ -30,7 +30,7 @@ class CapeExtractor(DynamicExtractor): def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: yield from self.global_features - def get_file_features(self) -> Iterator[Tuple[Feature, Address]]: + def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]: yield from capa.features.extractors.cape.file.extract_features(self.static) def get_processes(self) -> Iterator[ProcessHandle]: @@ -39,19 +39,19 @@ class CapeExtractor(DynamicExtractor): def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: yield from capa.features.extractors.cape.process.extract_features(self.behavior, ph) - def get_threads(self, ph: ProcessHandle) -> Iterator[ProcessHandle]: + def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]: yield from capa.features.extractors.cape.process.get_threads(self.behavior, ph) def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: yield from capa.features.extractors.cape.thread.extract_features(self.behavior, ph, th) @classmethod - def from_report(cls, report: Dict) -> "DynamicExtractor": + def from_report(cls, report: Dict) -> "CapeExtractor": static = report["static"] format_ = list(static.keys())[0] static = static[format_] - static.update(report["target"]) static.update(report["behavior"].pop("summary")) + static.update(report["target"]) static.update({"processtree": report["behavior"]["processtree"]}) static.update({"strings": report["strings"]}) static.update({"format": format_}) diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py index 12caad2b..fcace6d1 100644 --- a/capa/features/extractors/cape/file.py +++ b/capa/features/extractors/cape/file.py @@ -7,9 +7,9 @@ # See the License for the specific language governing permissions and limitations under the License. import logging -from typing import Any, Dict, List, Tuple, Iterator +from typing import Dict, Tuple, Iterator -from capa.features.file import Export, Import, Section, FunctionName +from capa.features.file import Export, Import, Section from capa.features.common import String, Feature from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress from capa.features.extractors.base_extractor import ProcessHandle @@ -21,13 +21,15 @@ def get_processes(static: Dict) -> Iterator[ProcessHandle]: """ get all the created processes for a sample """ + def rec(process): inner: Dict[str, str] = {"name": process["name"], "ppid": process["parent_id"]} yield ProcessHandle(pid=process["pid"], inner=inner) for child in process["children"]: - rec(child) + yield from rec(child) - yield from rec(static["processtree"]) + for process in static["processtree"]: + yield from rec(process) def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: @@ -35,20 +37,21 @@ def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: extract the names of imported library files, for example: USER32.dll """ for library in static["imports"]: - name, address = library["name"], int(library["virtual_address"], 16) - yield Import(name), address + for function in library["imports"]: + name, address = function["name"], int(function["address"], 16) + yield Import(name), AbsoluteVirtualAddress(address) def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: for function in static["exports"]: - name, address = function["name"], int(function["virtual_address"], 16) - yield Export(name), address + name, address = function["name"], int(function["address"], 16) + yield Export(name), AbsoluteVirtualAddress(address) def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: for section in static["sections"]: name, address = section["name"], int(section["virtual_address"], 16) - yield Section(name), address + yield Section(name), AbsoluteVirtualAddress(address) def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/cape/global_.py b/capa/features/extractors/cape/global_.py index 6479f109..70b5d2bf 100644 --- a/capa/features/extractors/cape/global_.py +++ b/capa/features/extractors/cape/global_.py @@ -32,51 +32,51 @@ logger = logging.getLogger(__name__) def guess_elf_os(file_output) -> Iterator[Tuple[Feature, Address]]: # operating systems recognized by the file command: https://github.com/file/file/blob/master/src/readelf.c#L609 if "Linux" in file_output: - return OS(OS_LINUX), NO_ADDRESS + yield OS(OS_LINUX), NO_ADDRESS elif "Hurd" in file_output: - return OS("hurd"), NO_ADDRESS + yield OS("hurd"), NO_ADDRESS elif "Solaris" in file_output: - return OS("solaris"), NO_ADDRESS + yield OS("solaris"), NO_ADDRESS elif "kFreeBSD" in file_output: - return OS("freebsd"), NO_ADDRESS + yield OS("freebsd"), NO_ADDRESS elif "kNetBSD" in file_output: - return OS("netbsd"), NO_ADDRESS + yield OS("netbsd"), NO_ADDRESS else: - return OS(OS_ANY), NO_ADDRESS + yield OS(OS_ANY), NO_ADDRESS def extract_arch(static) -> Iterator[Tuple[Feature, Address]]: - if "Intel 80386" in static["target"]["type"]: - return Arch(ARCH_I386), NO_ADDRESS - elif "x86-64" in static["target"]["type"]: - return Arch(ARCH_AMD64), NO_ADDRESS + if "Intel 80386" in static["file"]["type"]: + yield Arch(ARCH_I386), NO_ADDRESS + elif "x86-64" in static["file"]["type"]: + yield Arch(ARCH_AMD64), NO_ADDRESS else: - return Arch(ARCH_ANY) + yield Arch(ARCH_ANY), NO_ADDRESS def extract_format(static) -> Iterator[Tuple[Feature, Address]]: - if "PE" in static["target"]["type"]: - return Format(FORMAT_PE), NO_ADDRESS - elif "ELF" in static["target"]["type"]: - return Format(FORMAT_ELF), NO_ADDRESS + if "PE" in static["file"]["type"]: + yield Format(FORMAT_PE), NO_ADDRESS + elif "ELF" in static["file"]["type"]: + yield Format(FORMAT_ELF), NO_ADDRESS else: - logger.debug(f"unknown file format, file command output: {static['target']['type']}") - return Format(FORMAT_UNKNOWN), NO_ADDRESS + logger.debug(f"unknown file format, file command output: {static['file']['type']}") + yield Format(FORMAT_UNKNOWN), NO_ADDRESS def extract_os(static) -> Iterator[Tuple[Feature, Address]]: # this variable contains the output of the file command - file_command = static["target"]["type"] + file_command = static["file"]["type"] if "WINDOWS" in file_command: - return OS(OS_WINDOWS), NO_ADDRESS + yield OS(OS_WINDOWS), NO_ADDRESS elif "ELF" in file_command: # implement os guessing from the cape trace - return guess_elf_os(file_command) + yield from guess_elf_os(file_command) else: # the sample is shellcode logger.debug(f"unsupported file format, file command output: {file_command}") - return OS(OS_ANY), NO_ADDRESS + yield OS(OS_ANY), NO_ADDRESS def extract_features(static) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/cape/process.py b/capa/features/extractors/cape/process.py index efb11299..8139e4a3 100644 --- a/capa/features/extractors/cape/process.py +++ b/capa/features/extractors/cape/process.py @@ -19,37 +19,27 @@ from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, logger = logging.getLogger(__name__) -def get_processes(behavior: Dict) -> Iterator[ProcessHandle]: - """ - get all created processes for a sample - """ - for process in behavior["processes"]: - inner: Dict[str, str] = {"name": process["name"], "ppid": process["parent_id"]} - yield ProcessHandle(pid=process["process_id"], inner=inner) - - -def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: +def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]: """ get a thread's child processes """ - threads: List = None for process in behavior["processes"]: if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]: - threads = process["threads"] + threads: List = process["threads"] for thread in threads: - yield ThreadHandle(int(thread)) + yield ThreadHandle(int(thread), inner={}) def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: """ extract strings from a process' provided environment variables. """ - environ: Dict[str, str] = None + for process in behavior["processes"]: if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]: - environ = process["environ"] + environ: Dict[str, str] = process["environ"] if not environ: return diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index 9a4438d2..3a1217c9 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py @@ -11,7 +11,7 @@ from typing import Any, Dict, List, Tuple, Iterator from capa.features.insn import API, Number from capa.features.common import String, Feature -from capa.features.address import Address +from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle logger = logging.getLogger(__name__) @@ -31,17 +31,24 @@ def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) - Feature, address; where Feature is either: API, Number, or String. """ - calls: List[Dict] = None for process in behavior["processes"]: if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]: - calls: List[Dict] = process + calls: List[Dict] = process["calls"] tid = str(th.tid) for call in calls: if call["thread_id"] != tid: continue - yield Number(int(call["return"], 16)), int(call["caller"], 16) - yield API(call["api"]), int(call["caller"], 16) + + caller = int(call["caller"], 16) + caller = AbsoluteVirtualAddress(caller) + for arg in call["arguments"]: + try: + yield Number(int(arg["value"], 16)), caller + except ValueError: + continue + yield Number(int(call["return"], 16)), caller + yield API(call["api"]), caller def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/insn.py b/capa/features/insn.py index 1e977e5a..4f4a78d0 100644 --- a/capa/features/insn.py +++ b/capa/features/insn.py @@ -25,8 +25,8 @@ class API(Feature): if signature.isidentifier(): # api call is in the legacy format super().__init__(signature, description=description) - self.args = {} - self.ret = False + self.args: Dict[str, str] = {} + self.ret = "" else: # api call is in the strace format and therefore has to be parsed name, self.args, self.ret = self.parse_signature(signature) @@ -43,30 +43,32 @@ class API(Feature): return False assert isinstance(other, API) - if {} in (self.args, other.args) or False in (self.ret, other.ret): + if {} in (self.args, other.args) or "" in (self.ret, other.ret): # Legacy API feature return super().__eq__(other) # API call with arguments return super().__eq__(other) and self.args == other.args and self.ret == other.ret - def parse_signature(self, signature: str) -> Tuple[str, Optional[Dict[str, str]], Optional[str]]: + def parse_signature(self, signature: str) -> Tuple[str, Dict[str, str], str]: # todo: optimize this method and improve the code quality import re - args = ret = False + args: Dict[str, str] = {} + ret = "" match = re.findall(r"(.+\(.*\)) ?=? ?([^=]*)", signature) if not match: - return "", None, None + return "", {}, "" if len(match[0]) == 2: ret = match[0][1] match = re.findall(r"(.*)\((.*)\)", match[0][0]) if len(match[0]) == 2: - args = (match[0][1] + ", ").split(", ") + args_: Dict[str, str] = (match[0][1] + ", ").split(", ") map(lambda x: {f"arg{x[0]}": x[1]}, enumerate(args)) - args = [{} | arg for arg in args][0] + for num, arg in enumerate(args_): + args.update({f"arg {0}": arg}) return match[0][0], args, ret