cape: use pydantic model

This commit is contained in:
Willi Ballenthin
2023-08-16 11:12:05 +00:00
committed by GitHub
parent e943a71dff
commit 6f7bf96776
6 changed files with 141 additions and 170 deletions

View File

@@ -7,29 +7,24 @@
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Tuple, Iterator
from typing import Tuple, Iterator
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.helpers import assert_never
from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.extractors.cape.models import Call
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
def extract_call_features(
behavior: Dict, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> Iterator[Tuple[Feature, Address]]:
def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
"""
this method extracts the given call's features (api name and arguments),
this method extracts the given call's features (such as API name and arguments),
and returns them as API, Number, and String features.
args:
behavior: a dictionary of behavioral artifacts extracted by the sandbox
ph: process handle (for defining the extraction scope)
th: thread handle (for defining the extraction scope)
ch: call handle (for defining the extraction scope)
@@ -37,27 +32,29 @@ def extract_call_features(
yields:
Feature, address; where Feature is either: API, Number, or String.
"""
# TODO(yelhamer): find the correct base address used at runtime.
# the runtime base may differ from the one in the PE header; the actual base might be read from procdump.pe.imagebase or similar.
# https://github.com/mandiant/capa/issues/1618
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
calls: List[Dict[str, Any]] = process["calls"]
call = calls[ch.address.id]
assert call["thread_id"] == str(th.address.tid)
call: Call = ch.inner
# list similar to disassembly: arguments right-to-left, call
for arg in call["arguments"][::-1]:
try:
yield Number(int(arg["value"], 16)), ch.address
except ValueError:
yield String(arg["value"]), ch.address
yield API(call["api"]), ch.address
for arg in reversed(call.arguments):
if isinstance(arg, list) and len(arg) == 0:
# CAPE may capture an argument as an empty list (reason unknown); skip it.
continue
elif isinstance(arg, str):
yield String(arg), ch.address
elif isinstance(arg, int):
yield Number(arg), ch.address
else:
assert_never(arg)
yield API(call.api), ch.address
def extract_features(
behavior: Dict, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> Iterator[Tuple[Feature, Address]]:
def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in CALL_HANDLERS:
for feature, addr in handler(behavior, ph, th, ch):
for feature, addr in handler(ph, th, ch):
yield feature, addr

View File

@@ -14,8 +14,10 @@ import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress, _NoAddress
from capa.exceptions import UnsupportedFormatError
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.cape.models import CapeReport
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
@@ -26,26 +28,26 @@ from capa.features.extractors.base_extractor import (
logger = logging.getLogger(__name__)
TESTED_VERSIONS = ("2.2-CAPE",)
TESTED_VERSIONS = {"2.2-CAPE", "2.4-CAPE"}
class CapeExtractor(DynamicFeatureExtractor):
def __init__(self, cape_version: str, static: Dict, behavior: Dict):
def __init__(self, report: CapeReport):
super().__init__()
self.cape_version = cape_version
self.static = static
self.behavior = behavior
self.report: CapeReport = report
self.sample_hashes = SampleHashes(
md5=static["file"]["md5"].lower(),
sha1=static["file"]["sha1"].lower(),
sha256=static["file"]["sha256"].lower(),
md5=self.report.target.file.md5.lower(),
sha1=self.report.target.file.sha1.lower(),
sha256=self.report.target.file.sha256.lower(),
)
self.global_features = capa.features.extractors.cape.global_.extract_features(self.static)
self.global_features = capa.features.extractors.cape.global_.extract_features(self.report)
def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
# value according to the PE header, the actual trace may use a different imagebase
return AbsoluteVirtualAddress(self.static["pe"]["imagebase"])
assert self.report.static is not None and self.report.static.pe is not None
return AbsoluteVirtualAddress(self.report.static.pe.imagebase)
def get_sample_hashes(self) -> SampleHashes:
return self.sample_hashes
@@ -54,44 +56,43 @@ class CapeExtractor(DynamicFeatureExtractor):
yield from self.global_features
def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.file.extract_features(self.static)
yield from capa.features.extractors.cape.file.extract_features(self.report)
def get_processes(self) -> Iterator[ProcessHandle]:
yield from capa.features.extractors.cape.file.get_processes(self.behavior)
yield from capa.features.extractors.cape.file.get_processes(self.report)
def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.process.extract_features(self.behavior, ph)
yield from capa.features.extractors.cape.process.extract_features(ph)
def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield from capa.features.extractors.cape.process.get_threads(self.behavior, ph)
yield from capa.features.extractors.cape.process.get_threads(ph)
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.thread.extract_features(self.behavior, ph, th)
if False:
# force this routine to be a generator,
# but we don't actually have any elements to generate.
yield Characteristic("never"), NO_ADDRESS
return
def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
yield from capa.features.extractors.cape.thread.get_calls(self.behavior, ph, th)
yield from capa.features.extractors.cape.thread.get_calls(ph, th)
def extract_call_features(
self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.call.extract_features(self.behavior, ph, th, ch)
yield from capa.features.extractors.cape.call.extract_features(ph, th, ch)
@classmethod
def from_report(cls, report: Dict) -> "CapeExtractor":
cape_version = report["info"]["version"]
if cape_version not in TESTED_VERSIONS:
logger.warning("CAPE version '%s' not tested/supported yet", cape_version)
cr = CapeReport.model_validate(report)
static = report["static"]
format_ = list(static.keys())[0]
static = static[format_]
static.update(report["behavior"].pop("summary"))
static.update(report["target"])
static.update({"processtree": report["behavior"]["processtree"]})
static.update({"strings": report["strings"]})
static.update({"format": format_})
if cr.info.version not in TESTED_VERSIONS:
logger.warning("CAPE version '%s' not tested/supported yet", cr.info.version)
behavior = report.pop("behavior")
behavior["network"] = report.pop("network")
if cr.static is None:
raise UnsupportedFormatError("CAPE report missing static analysis")
return cls(cape_version, static, behavior)
if cr.static.pe is None:
raise UnsupportedFormatError("CAPE report missing static analysis")
return cls(cr)

View File

@@ -7,106 +7,98 @@
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, Tuple, Iterator
from typing import Tuple, Iterator
from capa.features.file import Export, Import, Section
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress
from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.cape.models import CapeReport
from capa.features.extractors.base_extractor import ProcessHandle
logger = logging.getLogger(__name__)
def get_processes(static: Dict) -> Iterator[ProcessHandle]:
def get_processes(report: CapeReport) -> Iterator[ProcessHandle]:
"""
get all the created processes for a sample
"""
def rec(process):
address: ProcessAddress = ProcessAddress(pid=process["pid"], ppid=process["parent_id"])
inner: Dict[str, str] = {"name": process["name"]}
yield ProcessHandle(address=address, inner=inner)
for child in process["children"]:
yield from rec(child)
for process in static["processtree"]:
yield from rec(process)
for process in report.behavior.processes:
addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id)
yield ProcessHandle(address=addr, inner=process)
def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
def extract_import_names(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
"""
extract imported function names
"""
imports = static["imports"]
assert report.static is not None and report.static.pe is not None
imports = report.static.pe.imports
"""
2.2-CAPE
"imports": [
{
"dll": "RPCRT4.dll",
"imports": [{"address": "0x40504c","name": "NdrSimpleTypeUnmarshall"}, ...]
},
...
]
2.4-CAPE
"imports": {
"ADVAPI32": {
"dll": "ADVAPI32.dll",
"imports": [{"address": "0x522000", "name": "OpenSCManagerA"}, ...],
...
},
...
}
"""
if isinstance(imports, dict):
imports = imports.values()
imports = list(imports.values())
assert isinstance(imports, list)
for library in imports:
for function in library["imports"]:
addr = int(function["address"], 16)
for name in generate_symbols(library["dll"], function["name"]):
yield Import(name), AbsoluteVirtualAddress(addr)
for function in library.imports:
for name in generate_symbols(library.dll, function.name):
yield Import(name), AbsoluteVirtualAddress(function.address)
def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for function in static["exports"]:
name, address = function["name"], int(function["address"], 16)
yield Export(name), AbsoluteVirtualAddress(address)
def extract_export_names(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
assert report.static is not None and report.static.pe is not None
for function in report.static.pe.exports:
yield Export(function.name), AbsoluteVirtualAddress(function.address)
def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
# be consistent with static extractors and use section VA
base = int(static["imagebase"], 16)
for section in static["sections"]:
name, address = section["name"], int(section["virtual_address"], 16)
yield Section(name), AbsoluteVirtualAddress(base + address)
def extract_section_names(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
assert report.static is not None and report.static.pe is not None
for section in report.static.pe.sections:
yield Section(section.name), AbsoluteVirtualAddress(section.virtual_address)
def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for string_ in static["strings"]:
yield String(string_), NO_ADDRESS
def extract_file_strings(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
if report.strings is not None:
for string in report.strings:
yield String(string), NO_ADDRESS
def extract_used_regkeys(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for regkey in static["keys"]:
def extract_used_regkeys(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
for regkey in report.behavior.summary.keys:
yield String(regkey), NO_ADDRESS
def extract_used_files(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for filename in static["files"]:
yield String(filename), NO_ADDRESS
def extract_used_files(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
for file in report.behavior.summary.files:
yield String(file), NO_ADDRESS
def extract_used_mutexes(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for mutex in static["mutexes"]:
def extract_used_mutexes(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
for mutex in report.behavior.summary.mutexes:
yield String(mutex), NO_ADDRESS
def extract_features(static: Dict) -> Iterator[Tuple[Feature, Address]]:
def extract_used_commands(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
for cmd in report.behavior.summary.executed_commands:
yield String(cmd), NO_ADDRESS
def extract_used_apis(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
for symbol in report.behavior.summary.resolved_apis:
yield String(symbol), NO_ADDRESS
def extract_used_services(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
for svc in report.behavior.summary.created_services:
yield String(svc), NO_ADDRESS
for svc in report.behavior.summary.started_services:
yield String(svc), NO_ADDRESS
def extract_features(report: CapeReport) -> Iterator[Tuple[Feature, Address]]:
for handler in FILE_HANDLERS:
for feature, addr in handler(static):
for feature, addr in handler(report):
yield feature, addr
@@ -118,4 +110,6 @@ FILE_HANDLERS = (
extract_used_regkeys,
extract_used_files,
extract_used_mutexes,
extract_used_apis,
extract_used_services,
)

View File

@@ -63,6 +63,10 @@ EmptyDict: TypeAlias = BaseModel
EmptyList: TypeAlias = List[Any]
class Info(FlexibleModel):
version: str
class ImportedSymbol(ExactModel):
address: HexInt
name: str
@@ -251,7 +255,7 @@ class ProcessFile(File):
class Argument(ExactModel):
name: str
# unsure why empty list is provided here
value: Union[HexInt, str, EmptyList]
value: Union[HexInt, int, str, EmptyList]
pretty_value: Optional[str] = None
@@ -359,6 +363,8 @@ class CAPE(ExactModel):
class CapeReport(FlexibleModel):
# the input file, I think
target: Target
# info about the processing job, like machine and distributed metadata.
info: Info
#
# static analysis results
@@ -397,8 +403,6 @@ class CapeReport(FlexibleModel):
# screenshot hash values
deduplicated_shots: Skip = None
# info about the processing job, like machine and distributed metadata.
info: Skip = None
# k-v pairs describing the time it took to run each stage.
statistics: Skip = None
# k-v pairs of ATT&CK ID to signature name or similar.

View File

@@ -7,50 +7,41 @@
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, List, Tuple, Iterator
from typing import List, Tuple, Iterator
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import String, Feature
from capa.features.address import Address, ThreadAddress
from capa.features.extractors.cape.models import Process
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]:
def get_threads(ph: ProcessHandle) -> Iterator[ThreadHandle]:
"""
get the threads associated with a given process
"""
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
threads: List = process["threads"]
process: Process = ph.inner
threads: List[int] = process.threads
for thread in threads:
address: ThreadAddress = ThreadAddress(process=ph.address, tid=int(thread))
address: ThreadAddress = ThreadAddress(process=ph.address, tid=thread)
yield ThreadHandle(address=address, inner={})
def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_environ_strings(ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
"""
extract strings from a process' provided environment variables.
"""
process: Process = ph.inner
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
environ: Dict[str, str] = process["environ"]
if not environ:
return
for value in (value for value in environ.values() if value):
for value in (value for value in process.environ.values() if value):
yield String(value), ph.address
def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_features(ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in PROCESS_HANDLERS:
for feature, addr in handler(behavior, ph):
for feature, addr in handler(ph):
yield feature, addr

View File

@@ -7,38 +7,22 @@
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Tuple, Iterator
from typing import Iterator
import capa.features.extractors.cape.helpers
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address, DynamicCallAddress
from capa.features.address import DynamicCallAddress
from capa.features.extractors.cape.models import Process
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
def get_calls(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
calls: List[Dict[str, Any]] = process["calls"]
def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
process: Process = ph.inner
tid = str(th.address.tid)
for call in calls:
if call["thread_id"] != tid:
tid = th.address.tid
for call_index, call in enumerate(process.calls):
if call.thread_id != tid:
continue
addr = DynamicCallAddress(thread=th.address, id=call["id"])
ch = CallHandle(address=addr, inner={})
yield ch
def extract_thread_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
yield from ((Feature(0), NO_ADDRESS),)
def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in THREAD_HANDLERS:
for feature, addr in handler(behavior, ph, th):
yield feature, addr
THREAD_HANDLERS = (extract_thread_features,)
addr = DynamicCallAddress(thread=th.address, id=call_index)
yield CallHandle(address=addr, inner=call)