mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
add Address abstraction to handle various ways of identifing things in files
This commit is contained in:
@@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Set, Dict, List, Tuple, Mapping, Iterable
|
||||
import capa.perf
|
||||
import capa.features.common
|
||||
from capa.features.common import Result, Feature
|
||||
from capa.features.address import Address
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# circular import, otherwise
|
||||
@@ -26,7 +27,7 @@ if TYPE_CHECKING:
|
||||
# to collect the locations of a feature, do: `features[Number(0x10)]`
|
||||
#
|
||||
# aliased here so that the type can be documented and xref'd.
|
||||
FeatureSet = Dict[Feature, Set[int]]
|
||||
FeatureSet = Dict[Feature, Set[Address]]
|
||||
|
||||
|
||||
class Statement:
|
||||
@@ -257,10 +258,10 @@ class Subscope(Statement):
|
||||
# inspect(match_details)
|
||||
#
|
||||
# aliased here so that the type can be documented and xref'd.
|
||||
MatchResults = Mapping[str, List[Tuple[int, Result]]]
|
||||
MatchResults = Mapping[str, List[Tuple[Address, Result]]]
|
||||
|
||||
|
||||
def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[int]):
|
||||
def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[Address]):
|
||||
"""
|
||||
record into the given featureset that the given rule matched at the given locations.
|
||||
|
||||
@@ -277,7 +278,7 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
|
||||
namespace, _, _ = namespace.rpartition("/")
|
||||
|
||||
|
||||
def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tuple[FeatureSet, MatchResults]:
|
||||
def match(rules: List["capa.rules.Rule"], features: FeatureSet, addr: Address) -> Tuple[FeatureSet, MatchResults]:
|
||||
"""
|
||||
match the given rules against the given features,
|
||||
returning an updated set of features and the matches.
|
||||
@@ -315,10 +316,10 @@ def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tupl
|
||||
# sanity check
|
||||
assert bool(res) is True
|
||||
|
||||
results[rule.name].append((va, res))
|
||||
results[rule.name].append((addr, res))
|
||||
# we need to update the current `features`
|
||||
# because subsequent iterations of this loop may use newly added features,
|
||||
# such as rule or namespace matches.
|
||||
index_rule_matches(features, rule, [va])
|
||||
index_rule_matches(features, rule, [addr])
|
||||
|
||||
return (features, results)
|
||||
|
||||
68
capa/features/address.py
Normal file
68
capa/features/address.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import abc
|
||||
|
||||
from dncil.clr.token import Token
|
||||
|
||||
|
||||
class Address(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def __lt__(self, other):
|
||||
# implement < so that addresses can be sorted from low to high
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def __hash__(self):
|
||||
# implement hash so that addresses can be used in sets and dicts
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def __str__(self):
|
||||
# implement str so the address can be rendered in capa output
|
||||
...
|
||||
|
||||
|
||||
class AbsoluteVirtualAddress(int, Address):
|
||||
"""an absolute memory address"""
|
||||
def __new__(cls, v):
|
||||
assert v > 0
|
||||
return int.__new__(cls, v)
|
||||
|
||||
|
||||
class RelativeVirtualAddress(int, Address):
|
||||
"""a memory address relative to a base address"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class FileOffsetAddress(int, Address):
|
||||
"""an address relative to the start of a file"""
|
||||
def __new__(cls, v):
|
||||
assert v > 0
|
||||
return int.__new__(cls, v)
|
||||
|
||||
|
||||
class DNTokenAddress(Token, Address):
|
||||
"""a .NET token"""
|
||||
pass
|
||||
|
||||
|
||||
class DNTokenOffsetAddress(Address):
|
||||
"""an offset into an object specified by a .NET token"""
|
||||
|
||||
def __init__(self, token: Token, rva: int):
|
||||
assert rva > 0
|
||||
self.token = token
|
||||
self.rva = rva
|
||||
|
||||
|
||||
class _NoAddress(Address):
|
||||
def __lt__(self, other):
|
||||
return False
|
||||
|
||||
def __hash__(self):
|
||||
return hash(0)
|
||||
|
||||
def __str__(self):
|
||||
return "no address"
|
||||
|
||||
|
||||
NO_ADDRESS = _NoAddress()
|
||||
@@ -11,7 +11,7 @@ import abc
|
||||
import codecs
|
||||
import logging
|
||||
import collections
|
||||
from typing import TYPE_CHECKING, Set, Dict, List, Union
|
||||
from typing import TYPE_CHECKING, Set, Dict, List, Union, Optional, Sequence
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# circular import, otherwise
|
||||
@@ -20,6 +20,7 @@ if TYPE_CHECKING:
|
||||
import capa.perf
|
||||
import capa.features
|
||||
import capa.features.extractors.elf
|
||||
from capa.features.address import Address
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
MAX_BYTES_FEATURE_SIZE = 0x100
|
||||
@@ -70,20 +71,13 @@ class Result:
|
||||
success: bool,
|
||||
statement: Union["capa.engine.Statement", "Feature"],
|
||||
children: List["Result"],
|
||||
locations=None,
|
||||
locations: Optional[Set[Address]] = None,
|
||||
):
|
||||
"""
|
||||
args:
|
||||
success (bool)
|
||||
statement (capa.engine.Statement or capa.features.Feature)
|
||||
children (list[Result])
|
||||
locations (iterable[VA])
|
||||
"""
|
||||
super(Result, self).__init__()
|
||||
self.success = success
|
||||
self.statement = statement
|
||||
self.children = children
|
||||
self.locations = locations if locations is not None else ()
|
||||
self.locations = locations if locations is not None else set()
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, bool):
|
||||
@@ -137,10 +131,10 @@ class Feature(abc.ABC):
|
||||
def __repr__(self):
|
||||
return str(self)
|
||||
|
||||
def evaluate(self, ctx: Dict["Feature", Set[int]], **kwargs) -> Result:
|
||||
def evaluate(self, ctx: Dict["Feature", Set[Address]], **kwargs) -> Result:
|
||||
capa.perf.counters["evaluate.feature"] += 1
|
||||
capa.perf.counters["evaluate.feature." + self.name] += 1
|
||||
return Result(self in ctx, self, [], locations=ctx.get(self, []))
|
||||
return Result(self in ctx, self, [], locations=ctx.get(self, set()))
|
||||
|
||||
def freeze_serialize(self):
|
||||
return (self.__class__.__name__, [self.value])
|
||||
|
||||
@@ -63,6 +63,7 @@ from capa.features.common import (
|
||||
FORMAT_DOTNET,
|
||||
FORMAT_FREEZE,
|
||||
)
|
||||
from capa.features.address import NO_ADDRESS
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
|
||||
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
@@ -224,7 +225,7 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi
|
||||
|
||||
file_features.update(function_features)
|
||||
|
||||
_, matches = ruleset.match(Scope.FILE, file_features, 0x0)
|
||||
_, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
|
||||
return matches, len(file_features)
|
||||
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ import uuid
|
||||
import codecs
|
||||
import logging
|
||||
import binascii
|
||||
import functools
|
||||
import collections
|
||||
from enum import Enum
|
||||
|
||||
@@ -40,6 +39,7 @@ import capa.features.common
|
||||
import capa.features.basicblock
|
||||
from capa.engine import Statement, FeatureSet
|
||||
from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature
|
||||
from capa.features.address import Address
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -1284,7 +1284,7 @@ class RuleSet:
|
||||
break
|
||||
return RuleSet(list(rules_filtered))
|
||||
|
||||
def match(self, scope: Scope, features: FeatureSet, va: int) -> Tuple[FeatureSet, ceng.MatchResults]:
|
||||
def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[FeatureSet, ceng.MatchResults]:
|
||||
"""
|
||||
match rules from this ruleset at the given scope against the given features.
|
||||
|
||||
@@ -1316,7 +1316,7 @@ class RuleSet:
|
||||
# first, match against the set of rules that have at least one
|
||||
# feature shared with our feature set.
|
||||
candidate_rules = [self.rules[name] for name in candidate_rule_names]
|
||||
features2, easy_matches = ceng.match(candidate_rules, features, va)
|
||||
features2, easy_matches = ceng.match(candidate_rules, features, addr)
|
||||
|
||||
# note that we've stored the updated feature set in `features2`.
|
||||
# this contains a superset of the features in `features`;
|
||||
@@ -1335,7 +1335,7 @@ class RuleSet:
|
||||
# that we can't really make any guesses about.
|
||||
# these are rules with hard features, like substring/regex/bytes and match statements.
|
||||
hard_rules = [self.rules[name] for name in hard_rule_names]
|
||||
features3, hard_matches = ceng.match(hard_rules, features2, va)
|
||||
features3, hard_matches = ceng.match(hard_rules, features2, addr)
|
||||
|
||||
# note that above, we probably are skipping matching a bunch of
|
||||
# rules that definitely would never hit.
|
||||
|
||||
Reference in New Issue
Block a user