mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
layout: capture call names
so that they can be rendered to output
This commit is contained in:
@@ -456,5 +456,15 @@ class DynamicFeatureExtractor:
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> str:
|
||||
"""
|
||||
Returns the human-readable name for the given call,
|
||||
such as a rendered API log entry, like:
|
||||
|
||||
Foo(1, "two", b"\x00\x11") -> -1
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
FeatureExtractor: TypeAlias = Union[StaticFeatureExtractor, DynamicFeatureExtractor]
|
||||
|
||||
@@ -17,7 +17,7 @@ import capa.features.extractors.cape.process
|
||||
from capa.exceptions import EmptyReportError, UnsupportedFormatError
|
||||
from capa.features.common import Feature, Characteristic
|
||||
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
|
||||
from capa.features.extractors.cape.models import Static, Process, CapeReport
|
||||
from capa.features.extractors.cape.models import Call, Static, Process, CapeReport
|
||||
from capa.features.extractors.base_extractor import (
|
||||
CallHandle,
|
||||
SampleHashes,
|
||||
@@ -82,6 +82,43 @@ class CapeExtractor(DynamicFeatureExtractor):
|
||||
) -> Iterator[Tuple[Feature, Address]]:
|
||||
yield from capa.features.extractors.cape.call.extract_features(ph, th, ch)
|
||||
|
||||
def get_call_name(self, ph, th, ch) -> str:
    """
    Render the given call as a single API log line of the form:

        Api(arg1=value1, arg2=value2) -> return

    Each argument uses its pretty value when one is present; otherwise
    integers are shown in hex and strings are wrapped in double quotes.
    List-valued arguments are rendered with an empty value.
    The return value follows the same rule: pretty form first, hex otherwise.
    """
    call: Call = ch.inner

    rendered_arguments = []
    for argument in call.arguments:
        raw = argument.value

        if argument.pretty_value:
            text = argument.pretty_value
        elif isinstance(raw, int):
            text = hex(raw)
        elif isinstance(raw, str):
            text = '"' + raw + '"'
        else:
            # list values contribute no text; anything else is unexpected
            text = ""
            if not isinstance(raw, list):
                capa.helpers.assert_never(raw)

        rendered_arguments.append("".join((argument.name, "=", text)))

    # joining with the separator avoids having to strip a trailing comma
    argument_list = ", ".join(rendered_arguments)

    result = call.pretty_return if call.pretty_return else hex(call.return_)

    return "".join((call.api, "(", argument_list, ")", " -> ", result))
|
||||
|
||||
@classmethod
|
||||
def from_report(cls, report: Dict) -> "CapeExtractor":
|
||||
cr = CapeReport.model_validate(report)
|
||||
|
||||
@@ -97,6 +97,7 @@ class NullStaticFeatureExtractor(StaticFeatureExtractor):
|
||||
|
||||
@dataclass
|
||||
class CallFeatures:
|
||||
name: str
|
||||
features: List[Tuple[Address, Feature]]
|
||||
|
||||
|
||||
@@ -162,5 +163,8 @@ class NullDynamicFeatureExtractor(DynamicFeatureExtractor):
|
||||
for address, feature in self.processes[ph.address].threads[th.address].calls[ch.address].features:
|
||||
yield feature, address
|
||||
|
||||
def get_call_name(self, ph, th, ch) -> str:
    """
    Return the stored name of the call identified by the given
    process, thread, and call handles (looked up by their addresses).
    """
    process = self.processes[ph.address]
    thread = process.threads[th.address]
    return thread.calls[ch.address].name
|
||||
|
||||
|
||||
NullFeatureExtractor: TypeAlias = Union[NullStaticFeatureExtractor, NullDynamicFeatureExtractor]
|
||||
|
||||
@@ -289,6 +289,7 @@ class FunctionFeatures(BaseModel):
|
||||
|
||||
class CallFeatures(BaseModel):
|
||||
address: Address
|
||||
name: str
|
||||
features: Tuple[CallFeature, ...]
|
||||
|
||||
|
||||
@@ -490,6 +491,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
|
||||
calls = []
|
||||
for call in extractor.get_calls(p, t):
|
||||
caddr = Address.from_capa(call.address)
|
||||
cname = extractor.get_call_name(p, t, call)
|
||||
cfeatures = [
|
||||
CallFeature(
|
||||
call=caddr,
|
||||
@@ -502,6 +504,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
|
||||
calls.append(
|
||||
CallFeatures(
|
||||
address=caddr,
|
||||
name=cname,
|
||||
features=tuple(cfeatures),
|
||||
)
|
||||
)
|
||||
@@ -605,7 +608,8 @@ def loads_dynamic(s: str) -> DynamicFeatureExtractor:
|
||||
features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in t.features],
|
||||
calls={
|
||||
c.address.to_capa(): null.CallFeatures(
|
||||
features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in c.features]
|
||||
name=c.name,
|
||||
features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in c.features],
|
||||
)
|
||||
for c in t.calls
|
||||
},
|
||||
|
||||
54
capa/main.py
54
capa/main.py
@@ -20,7 +20,7 @@ import textwrap
|
||||
import itertools
|
||||
import contextlib
|
||||
import collections
|
||||
from typing import Any, Dict, List, Tuple, Callable, Optional
|
||||
from typing import Any, Set, Dict, List, Tuple, Callable, Optional
|
||||
from pathlib import Path
|
||||
|
||||
import halo
|
||||
@@ -1050,7 +1050,7 @@ def collect_metadata(
|
||||
)
|
||||
|
||||
|
||||
def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabilities) -> rdoc.DynamicLayout:
|
||||
def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabilities: MatchResults) -> rdoc.DynamicLayout:
|
||||
"""
|
||||
compute a metadata structure that links threads
|
||||
to the processes in which they're found.
|
||||
@@ -1060,23 +1060,43 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti
|
||||
a large amount of un-referenced data.
|
||||
"""
|
||||
assert isinstance(extractor, DynamicFeatureExtractor)
|
||||
|
||||
matched_threads: Set[Address] = set()
|
||||
for rule_name, matches in capabilities.items():
|
||||
rule = rules[rule_name]
|
||||
if capa.rules.Scope.THREAD in rule.scopes:
|
||||
for addr, _ in matches:
|
||||
matched_threads.add(addr)
|
||||
|
||||
matched_calls: Set[Address] = set()
|
||||
|
||||
def result_rec(result: capa.features.common.Result):
|
||||
for loc in result.locations:
|
||||
if isinstance(loc, capa.features.address.DynamicCallAddress):
|
||||
matched_calls.add(loc)
|
||||
for child in result.children:
|
||||
result_rec(child)
|
||||
|
||||
for matches in capabilities.values():
|
||||
for _, result in matches:
|
||||
result_rec(result)
|
||||
|
||||
processes_by_thread: Dict[Address, Address] = {}
|
||||
threads_by_processes: Dict[Address, List[Address]] = {}
|
||||
names_by_process: Dict[Address, str] = {}
|
||||
calls_by_thread: Dict[Address, List[Address]] = {}
|
||||
names_by_call: Dict[Address, str] = {}
|
||||
for p in extractor.get_processes():
|
||||
threads_by_processes[p.address] = []
|
||||
names_by_process[p.address] = extractor.get_process_name(p)
|
||||
for t in extractor.get_threads(p):
|
||||
processes_by_thread[t.address] = p.address
|
||||
threads_by_processes[p.address].append(t.address)
|
||||
|
||||
matched_threads = set()
|
||||
for rule_name, matches in capabilities.items():
|
||||
rule = rules[rule_name]
|
||||
if capa.rules.Scope.THREAD in rule.scopes:
|
||||
for addr, _ in matches:
|
||||
assert addr in processes_by_thread
|
||||
matched_threads.add(addr)
|
||||
calls_by_thread[t.address] = []
|
||||
for c in extractor.get_calls(p, t):
|
||||
calls_by_thread[t.address].append(c.address)
|
||||
if c.address in matched_calls:
|
||||
names_by_call[c.address] = extractor.get_call_name(p, t, c)
|
||||
|
||||
layout = rdoc.DynamicLayout(
|
||||
processes=tuple(
|
||||
@@ -1084,7 +1104,19 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti
|
||||
address=frz.Address.from_capa(p),
|
||||
name=names_by_process[p],
|
||||
matched_threads=tuple(
|
||||
rdoc.ThreadLayout(address=frz.Address.from_capa(t)) for t in threads if t in matched_threads
|
||||
rdoc.ThreadLayout(
|
||||
address=frz.Address.from_capa(t),
|
||||
matched_calls=tuple(
|
||||
rdoc.CallLayout(
|
||||
address=frz.Address.from_capa(c),
|
||||
name=names_by_call[c],
|
||||
)
|
||||
for c in calls_by_thread[t]
|
||||
if c in matched_calls
|
||||
),
|
||||
)
|
||||
for t in threads
|
||||
if t in matched_threads
|
||||
) # this object is open to extension in the future,
|
||||
# such as with the function name, etc.
|
||||
)
|
||||
|
||||
@@ -225,7 +225,19 @@ def dynamic_analysis_to_pb2(analysis: rd.DynamicAnalysis) -> capa_pb2.DynamicAna
|
||||
capa_pb2.ProcessLayout(
|
||||
address=addr_to_pb2(p.address),
|
||||
name=p.name,
|
||||
matched_threads=[capa_pb2.ThreadLayout(address=addr_to_pb2(t.address)) for t in p.matched_threads],
|
||||
matched_threads=[
|
||||
capa_pb2.ThreadLayout(
|
||||
address=addr_to_pb2(t.address),
|
||||
matched_calls=[
|
||||
capa_pb2.CallLayout(
|
||||
address=addr_to_pb2(c.address),
|
||||
name=c.name,
|
||||
)
|
||||
for c in t.matched_calls
|
||||
],
|
||||
)
|
||||
for t in p.matched_threads
|
||||
],
|
||||
)
|
||||
for p in analysis.layout.processes
|
||||
]
|
||||
@@ -708,7 +720,18 @@ def dynamic_analysis_from_pb2(analysis: capa_pb2.DynamicAnalysis) -> rd.DynamicA
|
||||
address=addr_from_pb2(p.address),
|
||||
name=p.name,
|
||||
matched_threads=tuple(
|
||||
[rd.ThreadLayout(address=addr_from_pb2(t.address)) for t in p.matched_threads]
|
||||
[
|
||||
rd.ThreadLayout(
|
||||
address=addr_from_pb2(t.address),
|
||||
matched_calls=tuple(
|
||||
[
|
||||
rd.CallLayout(address=addr_from_pb2(c.address), name=c.name)
|
||||
for c in t.matched_calls
|
||||
]
|
||||
),
|
||||
)
|
||||
for t in p.matched_threads
|
||||
]
|
||||
),
|
||||
)
|
||||
for p in analysis.layout.processes
|
||||
|
||||
@@ -430,8 +430,14 @@ message SubstringFeature {
|
||||
optional string description = 3;
|
||||
}
|
||||
|
||||
message CallLayout {
|
||||
Address address = 1;
|
||||
string name = 2;
|
||||
}
|
||||
|
||||
message ThreadLayout {
|
||||
Address address = 1;
|
||||
repeated CallLayout matched_calls = 2;
|
||||
}
|
||||
|
||||
message Addresses { repeated Address address = 1; }
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -1632,19 +1632,43 @@ class SubstringFeature(google.protobuf.message.Message):
|
||||
global___SubstringFeature = SubstringFeature
|
||||
|
||||
@typing_extensions.final
|
||||
class ThreadLayout(google.protobuf.message.Message):
|
||||
class CallLayout(google.protobuf.message.Message):
|
||||
DESCRIPTOR: google.protobuf.descriptor.Descriptor
|
||||
|
||||
ADDRESS_FIELD_NUMBER: builtins.int
|
||||
NAME_FIELD_NUMBER: builtins.int
|
||||
@property
|
||||
def address(self) -> global___Address: ...
|
||||
name: builtins.str
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
address: global___Address | None = ...,
|
||||
name: builtins.str = ...,
|
||||
) -> None: ...
|
||||
def HasField(self, field_name: typing_extensions.Literal["address", b"address"]) -> builtins.bool: ...
|
||||
def ClearField(self, field_name: typing_extensions.Literal["address", b"address"]) -> None: ...
|
||||
def ClearField(self, field_name: typing_extensions.Literal["address", b"address", "name", b"name"]) -> None: ...
|
||||
|
||||
global___CallLayout = CallLayout
|
||||
|
||||
@typing_extensions.final
|
||||
class ThreadLayout(google.protobuf.message.Message):
|
||||
DESCRIPTOR: google.protobuf.descriptor.Descriptor
|
||||
|
||||
ADDRESS_FIELD_NUMBER: builtins.int
|
||||
MATCHED_CALLS_FIELD_NUMBER: builtins.int
|
||||
@property
|
||||
def address(self) -> global___Address: ...
|
||||
@property
|
||||
def matched_calls(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___CallLayout]: ...
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
address: global___Address | None = ...,
|
||||
matched_calls: collections.abc.Iterable[global___CallLayout] | None = ...,
|
||||
) -> None: ...
|
||||
def HasField(self, field_name: typing_extensions.Literal["address", b"address"]) -> builtins.bool: ...
|
||||
def ClearField(self, field_name: typing_extensions.Literal["address", b"address", "matched_calls", b"matched_calls"]) -> None: ...
|
||||
|
||||
global___ThreadLayout = ThreadLayout
|
||||
|
||||
|
||||
@@ -49,8 +49,14 @@ class FunctionLayout(Model):
|
||||
matched_basic_blocks: Tuple[BasicBlockLayout, ...]
|
||||
|
||||
|
||||
class CallLayout(Model):
|
||||
address: frz.Address
|
||||
name: str
|
||||
|
||||
|
||||
class ThreadLayout(Model):
|
||||
address: frz.Address
|
||||
matched_calls: Tuple[CallLayout, ...]
|
||||
|
||||
|
||||
class ProcessLayout(Model):
|
||||
|
||||
@@ -34,6 +34,25 @@ def _get_process_name(layout: rd.DynamicLayout, addr: frz.Address) -> str:
|
||||
return ""
|
||||
|
||||
|
||||
def _get_call_name(layout: rd.DynamicLayout, addr: frz.Address) -> str:
    """
    Find the name recorded in the layout for the call at `addr`.

    The layout is searched process -> matched thread -> matched call.
    When no entry matches, a debug message is logged and the empty
    string is returned.
    """
    call = addr.to_capa()
    assert isinstance(call, capa.features.address.DynamicCallAddress)

    thread = frz.Address.from_capa(call.thread)
    process = frz.Address.from_capa(call.thread.process)

    # danger: nested linear scans over processes, threads, and calls
    matching_names = (
        call_layout.name
        for process_layout in layout.processes
        if process_layout.address == process
        for thread_layout in process_layout.matched_threads
        if thread_layout.address == thread
        for call_layout in thread_layout.matched_calls
        if call_layout.address == addr
    )
    for name in matching_names:
        # the first match wins, in layout order
        return name

    logger.debug("name not found for call: %s", addr)
    return ""
|
||||
|
||||
|
||||
def render_process(layout: rd.DynamicLayout, addr: frz.Address) -> str:
|
||||
process = addr.to_capa()
|
||||
assert isinstance(process, capa.features.address.ProcessAddress)
|
||||
@@ -51,8 +70,10 @@ def render_thread(layout: rd.DynamicLayout, addr: frz.Address) -> str:
|
||||
def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str:
|
||||
call = addr.to_capa()
|
||||
assert isinstance(call, capa.features.address.DynamicCallAddress)
|
||||
name = _get_process_name(layout, frz.Address.from_capa(call.thread.process))
|
||||
return f"{name}[{call.thread.process.pid}:{call.thread.tid}] XXX[{call.id}](A, B, C)"
|
||||
|
||||
pname = _get_process_name(layout, frz.Address.from_capa(call.thread.process))
|
||||
cname = _get_call_name(layout, addr)
|
||||
return f"{pname}[{call.thread.process.pid}:{call.thread.tid}][{call.id}] {cname}"
|
||||
|
||||
|
||||
def render_locations(ostream, layout: rd.Layout, locations: Iterable[frz.Address]):
|
||||
|
||||
@@ -54,6 +54,7 @@ EXTRACTOR = capa.features.extractors.null.NullDynamicFeatureExtractor(
|
||||
DynamicCallAddress(
|
||||
thread=ThreadAddress(ProcessAddress(pid=1), tid=1), id=1
|
||||
): capa.features.extractors.null.CallFeatures(
|
||||
name="CreateFile(12)",
|
||||
features=[
|
||||
(
|
||||
DynamicCallAddress(thread=ThreadAddress(ProcessAddress(pid=1), tid=1), id=1),
|
||||
@@ -68,6 +69,7 @@ EXTRACTOR = capa.features.extractors.null.NullDynamicFeatureExtractor(
|
||||
DynamicCallAddress(
|
||||
thread=ThreadAddress(ProcessAddress(pid=1), tid=1), id=2
|
||||
): capa.features.extractors.null.CallFeatures(
|
||||
name="WriteFile()",
|
||||
features=[
|
||||
(
|
||||
DynamicCallAddress(thread=ThreadAddress(ProcessAddress(pid=1), tid=1), id=2),
|
||||
|
||||
@@ -158,6 +158,8 @@ def test_render_vverbose_feature(feature, expected):
|
||||
captures={},
|
||||
)
|
||||
|
||||
capa.render.vverbose.render_feature(ostream, matches, feature, indent=0)
|
||||
layout = capa.render.result_document.StaticLayout(functions=())
|
||||
|
||||
capa.render.vverbose.render_feature(ostream, layout, matches, feature, indent=0)
|
||||
|
||||
assert ostream.getvalue().strip() == expected
|
||||
|
||||
Reference in New Issue
Block a user