fix bugs and refactor code

This commit is contained in:
Yacine Elhamer
2023-06-19 23:40:09 +01:00
parent d6fa832d83
commit 3c8abab574
6 changed files with 65 additions and 63 deletions

View File

@@ -20,7 +20,7 @@ logger = logging.getLogger(__name__)
class CapeExtractor(DynamicExtractor):
def __init__(self, static: Dict, behavior: Dict, network: Dict):
def __init__(self, static: Dict, behavior: Dict):
super().__init__()
self.static = static
self.behavior = behavior
@@ -30,7 +30,7 @@ class CapeExtractor(DynamicExtractor):
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features
def get_file_features(self) -> Iterator[Tuple[Feature, Address]]:
def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.file.extract_features(self.static)
def get_processes(self) -> Iterator[ProcessHandle]:
@@ -39,19 +39,19 @@ class CapeExtractor(DynamicExtractor):
def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.process.extract_features(self.behavior, ph)
def get_threads(self, ph: ProcessHandle) -> Iterator[ProcessHandle]:
def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield from capa.features.extractors.cape.process.get_threads(self.behavior, ph)
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.thread.extract_features(self.behavior, ph, th)
@classmethod
def from_report(cls, report: Dict) -> "DynamicExtractor":
def from_report(cls, report: Dict) -> "CapeExtractor":
static = report["static"]
format_ = list(static.keys())[0]
static = static[format_]
static.update(report["target"])
static.update(report["behavior"].pop("summary"))
static.update(report["target"])
static.update({"processtree": report["behavior"]["processtree"]})
static.update({"strings": report["strings"]})
static.update({"format": format_})

View File

@@ -7,9 +7,9 @@
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Tuple, Iterator
from typing import Dict, Tuple, Iterator
from capa.features.file import Export, Import, Section, FunctionName
from capa.features.file import Export, Import, Section
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ProcessHandle
@@ -21,13 +21,15 @@ def get_processes(static: Dict) -> Iterator[ProcessHandle]:
"""
get all the created processes for a sample
"""
def rec(process):
inner: Dict[str, str] = {"name": process["name"], "ppid": process["parent_id"]}
yield ProcessHandle(pid=process["pid"], inner=inner)
for child in process["children"]:
rec(child)
yield from rec(child)
yield from rec(static["processtree"])
for process in static["processtree"]:
yield from rec(process)
def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
@@ -35,20 +37,21 @@ def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
extract the names of imported library files, for example: USER32.dll
"""
for library in static["imports"]:
name, address = library["name"], int(library["virtual_address"], 16)
yield Import(name), address
for function in library["imports"]:
name, address = function["name"], int(function["address"], 16)
yield Import(name), AbsoluteVirtualAddress(address)
def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
    """
    extract the names of the functions exported by the sample.

    args:
      static: dictionary with an "exports" list; every entry is read for its
        "name" and its hexadecimal "address" string.
        NOTE(review): an earlier variant of this block read "virtual_address"
        instead of "address" - confirm the key against the report schema.

    yields:
      (Export, AbsoluteVirtualAddress) pairs, one per exported function.
    """
    for function in static["exports"]:
        # addresses are stored as hexadecimal strings
        address = int(function["address"], 16)
        # wrap the integer so that the yielded pair matches the annotated Address type
        yield Export(function["name"]), AbsoluteVirtualAddress(address)
def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
    """
    extract the names of the sample's sections.

    args:
      static: dictionary with a "sections" list; every entry is read for its
        "name" and its hexadecimal "virtual_address" string.

    yields:
      (Section, AbsoluteVirtualAddress) pairs, one per section.
    """
    for section in static["sections"]:
        # addresses are stored as hexadecimal strings
        address = int(section["virtual_address"], 16)
        # yield each section exactly once, with a typed address rather than a bare int
        yield Section(section["name"]), AbsoluteVirtualAddress(address)
def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -32,51 +32,51 @@ logger = logging.getLogger(__name__)
def guess_elf_os(file_output) -> Iterator[Tuple[Feature, Address]]:
    """
    guess the operating system of an ELF sample from the output of the file command.

    args:
      file_output: the text that is searched for an operating system marker.

    yields:
      exactly one (OS, NO_ADDRESS) pair: the first marker found in the order below,
      or OS_ANY when none of them is present.
    """
    # operating systems recognized by the file command: https://github.com/file/file/blob/master/src/readelf.c#L609
    # the order matters: the first marker found wins
    markers = (
        ("Linux", OS_LINUX),
        ("Hurd", "hurd"),
        ("Solaris", "solaris"),
        ("kFreeBSD", "freebsd"),
        ("kNetBSD", "netbsd"),
    )

    for marker, os_name in markers:
        if marker in file_output:
            # this is a generator: yield (rather than return) the pair so callers can iterate it
            yield OS(os_name), NO_ADDRESS
            return

    yield OS(OS_ANY), NO_ADDRESS
def extract_arch(static) -> Iterator[Tuple[Feature, Address]]:
    """
    derive the architecture of the sample from its file type description.

    args:
      static: mapping from which static["file"]["type"] is read.
        NOTE(review): an earlier variant of this block read static["target"]["type"] -
        confirm which key the caller provides.

    yields:
      exactly one (Arch, NO_ADDRESS) pair; ARCH_ANY when no known marker is found.
    """
    file_type = static["file"]["type"]

    if "Intel 80386" in file_type:
        yield Arch(ARCH_I386), NO_ADDRESS
    elif "x86-64" in file_type:
        yield Arch(ARCH_AMD64), NO_ADDRESS
    else:
        # the fallback also needs the address element to form a (Feature, Address) pair
        yield Arch(ARCH_ANY), NO_ADDRESS
def extract_format(static) -> Iterator[Tuple[Feature, Address]]:
    """
    derive the file format of the sample from its file type description.

    args:
      static: mapping from which static["file"]["type"] is read.
        NOTE(review): an earlier variant of this block read static["target"]["type"] -
        confirm which key the caller provides.

    yields:
      exactly one (Format, NO_ADDRESS) pair; FORMAT_UNKNOWN (with a debug log entry)
      when neither "PE" nor "ELF" appears in the description.
    """
    file_type = static["file"]["type"]

    if "PE" in file_type:
        yield Format(FORMAT_PE), NO_ADDRESS
    elif "ELF" in file_type:
        yield Format(FORMAT_ELF), NO_ADDRESS
    else:
        logger.debug("unknown file format, file command output: %s", file_type)
        yield Format(FORMAT_UNKNOWN), NO_ADDRESS
def extract_os(static) -> Iterator[Tuple[Feature, Address]]:
    """
    derive the operating system of the sample from its file type description.

    args:
      static: mapping from which static["file"]["type"] is read.
        NOTE(review): an earlier variant of this block read static["target"]["type"] -
        confirm which key the caller provides.

    yields:
      (OS, NO_ADDRESS) pairs: OS_WINDOWS for Windows samples, the result of
      guess_elf_os() for ELF samples, and OS_ANY otherwise.
    """
    # this variable contains the output of the file command
    file_command = static["file"]["type"]

    # compare case-insensitively: the file command typically spells this "for MS Windows",
    # which an all-uppercase "WINDOWS" test would never match
    if "windows" in file_command.lower():
        yield OS(OS_WINDOWS), NO_ADDRESS
    elif "ELF" in file_command:
        # implement os guessing from the cape trace
        # guess_elf_os is a generator, so delegate to it instead of returning it
        yield from guess_elf_os(file_command)
    else:
        # the sample is shellcode
        logger.debug("unsupported file format, file command output: %s", file_command)
        yield OS(OS_ANY), NO_ADDRESS
def extract_features(static) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -19,37 +19,27 @@ from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle,
logger = logging.getLogger(__name__)
def get_processes(behavior: Dict) -> Iterator[ProcessHandle]:
    """
    enumerate the processes recorded for a sample.

    args:
      behavior: dictionary with a "processes" list; every entry is read for its
        "name", "parent_id" and "process_id" values.

    yields:
      one ProcessHandle per entry, carrying the process id as pid and the process
      name and parent id (under the "name" and "ppid" keys) as inner.
    """
    for entry in behavior["processes"]:
        yield ProcessHandle(
            pid=entry["process_id"],
            inner={"name": entry["name"], "ppid": entry["parent_id"]},
        )
def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]:
    """
    get the threads of a process.

    args:
      behavior: dictionary with a "processes" list; every entry is read for its
        "process_id", "parent_id" and "threads" values.
      ph: handle of the process whose threads are wanted; it is matched on both
        its pid and the "ppid" stored in its inner dictionary.

    yields:
      one ThreadHandle per thread id of the matching process; nothing when no
      process matches.
    """
    # start from an empty list so that a handle without a matching process
    # yields nothing instead of failing on an unset (or None) value
    threads: List = []
    for process in behavior["processes"]:
        if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
            # when several entries match, the last one wins
            threads = process["threads"]

    for thread in threads:
        yield ThreadHandle(int(thread), inner={})
def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
"""
extract strings from a process' provided environment variables.
"""
environ: Dict[str, str] = None
for process in behavior["processes"]:
if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
environ = process["environ"]
environ: Dict[str, str] = process["environ"]
if not environ:
return

View File

@@ -11,7 +11,7 @@ from typing import Any, Dict, List, Tuple, Iterator
from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
@@ -31,17 +31,24 @@ def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -
Feature, address; where Feature is either: API, Number, or String.
"""
calls: List[Dict] = None
for process in behavior["processes"]:
if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
calls: List[Dict] = process
calls: List[Dict] = process["calls"]
tid = str(th.tid)
for call in calls:
if call["thread_id"] != tid:
continue
yield Number(int(call["return"], 16)), int(call["caller"], 16)
yield API(call["api"]), int(call["caller"], 16)
caller = int(call["caller"], 16)
caller = AbsoluteVirtualAddress(caller)
for arg in call["arguments"]:
try:
yield Number(int(arg["value"], 16)), caller
except ValueError:
continue
yield Number(int(call["return"], 16)), caller
yield API(call["api"]), caller
def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -25,8 +25,8 @@ class API(Feature):
if signature.isidentifier():
# api call is in the legacy format
super().__init__(signature, description=description)
self.args = {}
self.ret = False
self.args: Dict[str, str] = {}
self.ret = ""
else:
# api call is in the strace format and therefore has to be parsed
name, self.args, self.ret = self.parse_signature(signature)
@@ -43,30 +43,32 @@ class API(Feature):
return False
assert isinstance(other, API)
if {} in (self.args, other.args) or False in (self.ret, other.ret):
if {} in (self.args, other.args) or "" in (self.ret, other.ret):
# Legacy API feature
return super().__eq__(other)
# API call with arguments
return super().__eq__(other) and self.args == other.args and self.ret == other.ret
def parse_signature(self, signature: str) -> Tuple[str, Dict[str, str], str]:
    """
    split an strace-style call signature into its name, arguments and return value.

    for example, "open(a, b) = 3" is parsed into
    ("open", {"arg 0": "a", "arg 1": "b"}, "3").

    args:
      signature: text of the form "name(arg, arg, ...)" optionally followed by
        "= return value".

    returns:
      a (name, args, ret) tuple. args maps "arg <position>" to the argument text
      and is empty when the call has no arguments; ret is "" when the signature
      has no return value. ("", {}, "") is returned when the signature does not
      contain a parenthesized call.
    """
    # todo: optimize this method and improve the code quality
    # imported locally, as in the original method
    import re

    match = re.findall(r"(.+\(.*\)) ?=? ?([^=]*)", signature)
    if not match:
        return "", {}, ""

    # the pattern has two groups, so every result is a (call, return value) pair
    call, ret = match[0]

    # the call text always contains a parenthesized part here, so this cannot come back empty
    name, raw_args = re.findall(r"(.*)\((.*)\)", call)[0]

    args: Dict[str, str] = {}
    if raw_args:
        # key every argument by its own position; formatting a constant index
        # would make all arguments overwrite the same entry
        for num, arg in enumerate(raw_args.split(", ")):
            args[f"arg {num}"] = arg

    return name, args, ret