Merge pull request #1569 from yelhamer/static-extractor

add a StaticFeatureExtractor class
This commit is contained in:
Willi Ballenthin
2023-06-28 11:13:46 +02:00
committed by GitHub
18 changed files with 115 additions and 49 deletions

View File

@@ -11,6 +11,7 @@
### Breaking Changes
- Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat
- Rename the old FeatureExtractor class to StaticFeatureExtractor, and make FeatureExtractor a type alias for the union of the StaticFeatureExtractor and DynamicFeatureExtractor classes @yelhamer [#1567](https://github.com/mandiant/capa/issues/1567)
### New Rules (9)

View File

@@ -11,6 +11,8 @@ import dataclasses
from typing import Any, Dict, Tuple, Union, Iterator
from dataclasses import dataclass
from typing_extensions import TypeAlias
import capa.features.address
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
@@ -63,16 +65,18 @@ class InsnHandle:
inner: Any
class FeatureExtractor:
class StaticFeatureExtractor:
"""
FeatureExtractor defines the interface for fetching features from a sample.
StaticFeatureExtractor defines the interface for fetching features from a
sample without running it; extractors that rely on the execution trace of
a sample must implement the other sibling class, DynamicFeatureExtractor.
There may be multiple backends that support fetching features for capa.
For example, we use vivisect by default, but also want to support saving
and restoring features from a JSON file.
When we restore the features, we'd like to use exactly the same matching logic
to find matching rules.
Therefore, we can define a FeatureExtractor that provides features from the
Therefore, we can define a StaticFeatureExtractor that provides features from the
serialized JSON file and do matching without a binary analysis pass.
Also, this provides a way to hook in an IDA backend.
@@ -292,9 +296,11 @@ class ThreadHandle:
inner: Any
class DynamicExtractor(FeatureExtractor):
class DynamicFeatureExtractor:
"""
DynamicExtractor defines the interface for fetching features from a sandbox' analysis of a sample.
DynamicFeatureExtractor defines the interface for fetching features from a
sandbox's analysis of a sample; extractors that rely on statically analyzing
a sample must implement the sibling extractor, StaticFeatureExtractor.
Features are grouped mainly into threads that alongside their meta-features are also grouped into
processes (that also have their own features). Other scopes (such as function and file) may also apply
@@ -303,6 +309,38 @@ class DynamicExtractor(FeatureExtractor):
This class is not instantiated directly; it is the base class for other implementations.
"""
@abc.abstractmethod
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
"""
extract features found at every scope ("global").
example::
extractor = CapeExtractor.from_report(json.loads(buf))
for feature, addr in extractor.extract_global_features():
print(addr, feature)
yields:
Tuple[Feature, Address]: feature and its location
"""
raise NotImplementedError()
@abc.abstractmethod
def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
"""
extract file-scope features.
example::
extractor = CapeExtractor.from_report(json.loads(buf))
for feature, addr in extractor.extract_file_features():
print(addr, feature)
yields:
Tuple[Feature, Address]: feature and its location
"""
raise NotImplementedError()
@abc.abstractmethod
def get_processes(self) -> Iterator[ProcessHandle]:
"""
@@ -336,3 +374,6 @@ class DynamicExtractor(FeatureExtractor):
- network activity
"""
raise NotImplementedError()
FeatureExtractor: TypeAlias = Union[StaticFeatureExtractor, DynamicFeatureExtractor]

View File

@@ -17,10 +17,10 @@ import capa.features.extractors.binja.function
import capa.features.extractors.binja.basicblock
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
class BinjaFeatureExtractor(FeatureExtractor):
class BinjaFeatureExtractor(StaticFeatureExtractor):
def __init__(self, bv: binja.BinaryView):
super().__init__()
self.bv = bv

View File

@@ -13,13 +13,13 @@ import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import Feature
from capa.features.address import Address
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicExtractor
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor
logger = logging.getLogger(__name__)
class CapeExtractor(DynamicExtractor):
class CapeExtractor(DynamicFeatureExtractor):
def __init__(self, static: Dict, behavior: Dict):
super().__init__()
self.static = static

View File

@@ -14,7 +14,7 @@ import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicExtractor
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)

View File

@@ -21,7 +21,7 @@ import capa.features.extractors.dnfile.function
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress
from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
from capa.features.extractors.dnfile.helpers import (
get_dotnet_types,
get_dotnet_fields,
@@ -67,7 +67,7 @@ class DnFileFeatureExtractorCache:
return self.types.get(token, None)
class DnfileFeatureExtractor(FeatureExtractor):
class DnfileFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: str):
super().__init__()
self.pe: dnfile.dnPE = dnfile.dnPE(path)

View File

@@ -17,7 +17,7 @@ from capa.features.common import (
Feature,
)
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.base_extractor import StaticFeatureExtractor
logger = logging.getLogger(__name__)
@@ -73,7 +73,7 @@ GLOBAL_HANDLERS = (
)
class DnfileFeatureExtractor(FeatureExtractor):
class DnfileFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: str):
super().__init__()
self.path: str = path

View File

@@ -23,7 +23,7 @@ from capa.features.common import (
Characteristic,
)
from capa.features.address import NO_ADDRESS, Address, DNTokenAddress
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.base_extractor import StaticFeatureExtractor
from capa.features.extractors.dnfile.helpers import (
DnType,
iter_dotnet_table,
@@ -157,7 +157,7 @@ GLOBAL_HANDLERS = (
)
class DotnetFileFeatureExtractor(FeatureExtractor):
class DotnetFileFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: str):
super().__init__()
self.path: str = path

View File

@@ -15,7 +15,7 @@ import capa.features.extractors.common
from capa.features.file import Import, Section
from capa.features.common import OS, FORMAT_ELF, Arch, Format, Feature
from capa.features.address import NO_ADDRESS, FileOffsetAddress, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.base_extractor import StaticFeatureExtractor
logger = logging.getLogger(__name__)
@@ -106,7 +106,7 @@ GLOBAL_HANDLERS = (
)
class ElfFeatureExtractor(FeatureExtractor):
class ElfFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: str):
super().__init__()
self.path = path

View File

@@ -18,10 +18,10 @@ import capa.features.extractors.ida.function
import capa.features.extractors.ida.basicblock
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
class IdaFeatureExtractor(FeatureExtractor):
class IdaFeatureExtractor(StaticFeatureExtractor):
def __init__(self):
super().__init__()
self.global_features: List[Tuple[Feature, Address]] = []

View File

@@ -3,7 +3,7 @@ from dataclasses import dataclass
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
@dataclass
@@ -24,7 +24,7 @@ class FunctionFeatures:
@dataclass
class NullFeatureExtractor(FeatureExtractor):
class NullFeatureExtractor(StaticFeatureExtractor):
"""
An extractor that extracts some user-provided features.

View File

@@ -18,7 +18,7 @@ import capa.features.extractors.strings
from capa.features.file import Export, Import, Section
from capa.features.common import OS, ARCH_I386, FORMAT_PE, ARCH_AMD64, OS_WINDOWS, Arch, Format, Characteristic
from capa.features.address import NO_ADDRESS, FileOffsetAddress, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.base_extractor import StaticFeatureExtractor
logger = logging.getLogger(__name__)
@@ -172,7 +172,7 @@ GLOBAL_HANDLERS = (
)
class PefileFeatureExtractor(FeatureExtractor):
class PefileFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: str):
super().__init__()
self.path = path

View File

@@ -19,12 +19,12 @@ import capa.features.extractors.viv.function
import capa.features.extractors.viv.basicblock
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
logger = logging.getLogger(__name__)
class VivisectFeatureExtractor(FeatureExtractor):
class VivisectFeatureExtractor(StaticFeatureExtractor):
def __init__(self, vw, path, os):
super().__init__()
self.vw = vw

View File

@@ -23,9 +23,9 @@ import capa.features.insn
import capa.features.common
import capa.features.address
import capa.features.basicblock
import capa.features.extractors.base_extractor
from capa.helpers import assert_never
from capa.features.freeze.features import Feature, feature_from_capa
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor
logger = logging.getLogger(__name__)
@@ -226,7 +226,7 @@ class Freeze(BaseModel):
allow_population_by_field_name = True
def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -> str:
def dumps(extractor: StaticFeatureExtractor) -> str:
"""
serialize the given extractor to a string
"""
@@ -327,7 +327,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
return freeze.json()
def loads(s: str) -> capa.features.extractors.base_extractor.FeatureExtractor:
def loads(s: str) -> StaticFeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
import capa.features.extractors.null as null
@@ -363,8 +363,9 @@ def loads(s: str) -> capa.features.extractors.base_extractor.FeatureExtractor:
MAGIC = "capa0000".encode("ascii")
def dump(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -> bytes:
def dump(extractor: FeatureExtractor) -> bytes:
"""serialize the given extractor to a byte array."""
assert isinstance(extractor, StaticFeatureExtractor)
return MAGIC + zlib.compress(dumps(extractor).encode("utf-8"))
@@ -372,7 +373,7 @@ def is_freeze(buf: bytes) -> bool:
return buf[: len(MAGIC)] == MAGIC
def load(buf: bytes) -> capa.features.extractors.base_extractor.FeatureExtractor:
def load(buf: bytes) -> StaticFeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a byte array."""
if not is_freeze(buf):
raise ValueError("missing magic header")

View File

@@ -21,7 +21,7 @@ import textwrap
import itertools
import contextlib
import collections
from typing import Any, Dict, List, Tuple, Union, Callable
from typing import Any, Dict, List, Tuple, Union, Callable, cast
import halo
import tqdm
@@ -83,8 +83,9 @@ from capa.features.extractors.base_extractor import (
BBHandle,
InsnHandle,
FunctionHandle,
DynamicExtractor,
FeatureExtractor,
StaticFeatureExtractor,
DynamicFeatureExtractor,
)
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
@@ -126,7 +127,7 @@ def set_vivisect_log_level(level):
def find_instruction_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
) -> Tuple[FeatureSet, MatchResults]:
"""
find matches for the given rules for the given instruction.
@@ -153,7 +154,7 @@ def find_instruction_capabilities(
def find_basic_block_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle
ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle
) -> Tuple[FeatureSet, MatchResults, MatchResults]:
"""
find matches for the given rules within the given basic block.
@@ -193,7 +194,7 @@ def find_basic_block_capabilities(
def find_code_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, fh: FunctionHandle
ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle
) -> Tuple[MatchResults, MatchResults, MatchResults, int]:
"""
find matches for the given rules within the given function.
@@ -251,7 +252,9 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi
return matches, len(file_features)
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
def find_static_capabilities(
ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None
) -> Tuple[MatchResults, Any]:
all_function_matches = collections.defaultdict(list) # type: MatchResults
all_bb_matches = collections.defaultdict(list) # type: MatchResults
all_insn_matches = collections.defaultdict(list) # type: MatchResults
@@ -334,6 +337,17 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
return matches, meta
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, **kwargs) -> Tuple[MatchResults, Any]:
    """
    find matches for the given rules, dispatching on the flavor of the extractor.

    args:
      ruleset: the rules to match.
      extractor: either a StaticFeatureExtractor or a DynamicFeatureExtractor.
      **kwargs: forwarded unchanged, as keyword arguments, to the
        flavor-specific routine (e.g. `disable_progress` for
        `find_static_capabilities`).

    returns:
      Tuple[MatchResults, Any]: whatever the flavor-specific routine returns.

    raises:
      NotImplementedError: when given a DynamicFeatureExtractor (not supported yet).
      ValueError: when the extractor is neither static nor dynamic.
    """
    if isinstance(extractor, StaticFeatureExtractor):
        # unpack the keyword arguments: passing the dict positionally would bind
        # it to `disable_progress`, where any non-empty dict is truthy.
        return find_static_capabilities(ruleset, extractor, **kwargs)
    elif isinstance(extractor, DynamicFeatureExtractor):
        raise NotImplementedError()
    else:
        raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}")
# TODO move all to helpers?
def has_rule_with_namespace(rules, capabilities, rule_cat):
for rule_name in capabilities.keys():
@@ -766,12 +780,13 @@ def collect_metadata(
format_: str,
os_: str,
rules_path: List[str],
extractor: capa.features.extractors.base_extractor.FeatureExtractor,
extractor: FeatureExtractor,
) -> rdoc.Metadata:
md5 = hashlib.md5()
sha1 = hashlib.sha1()
sha256 = hashlib.sha256()
assert isinstance(extractor, StaticFeatureExtractor)
with open(sample_path, "rb") as f:
buf = f.read()
@@ -1247,7 +1262,8 @@ def main(argv=None):
if format_ == FORMAT_FREEZE:
# freeze format deserializes directly into an extractor
with open(args.sample, "rb") as f:
extractor = frz.load(f.read())
extractor: FeatureExtractor = frz.load(f.read())
assert isinstance(extractor, StaticFeatureExtractor)
else:
# all other formats we must create an extractor,
# such as viv, binary ninja, etc. workspaces

View File

@@ -46,6 +46,7 @@ import capa.helpers
import capa.features
import capa.features.common
import capa.features.freeze
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor
logger = logging.getLogger("capa.profile")
@@ -103,12 +104,14 @@ def main(argv=None):
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
extractor: FeatureExtractor = capa.features.freeze.load(f.read())
assert isinstance(extractor, StaticFeatureExtractor)
else:
extractor = capa.main.get_extractor(
args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False
)
assert isinstance(extractor, StaticFeatureExtractor)
with tqdm.tqdm(total=args.number * args.repeat) as pbar:
def do_iteration():

View File

@@ -70,6 +70,7 @@ import capa.render.result_document as rd
from capa.helpers import get_file_taste
from capa.features.common import FORMAT_AUTO
from capa.features.freeze import Address
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor
logger = logging.getLogger("capa.show-capabilities-by-function")
@@ -160,7 +161,7 @@ def main(argv=None):
if (args.format == "freeze") or (args.format == FORMAT_AUTO and capa.features.freeze.is_freeze(taste)):
format_ = "freeze"
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
extractor: FeatureExtractor = capa.features.freeze.load(f.read())
else:
format_ = args.format
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
@@ -169,6 +170,7 @@ def main(argv=None):
extractor = capa.main.get_extractor(
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
)
assert isinstance(extractor, StaticFeatureExtractor)
except capa.exceptions.UnsupportedFormatError:
capa.helpers.log_unsupported_format_error()
return -1

View File

@@ -82,7 +82,7 @@ import capa.features.freeze
import capa.features.address
from capa.helpers import get_auto_format, log_unsupported_runtime_error
from capa.features.common import FORMAT_AUTO, FORMAT_FREEZE, DYNAMIC_FORMATS, is_global_feature
from capa.features.extractors.base_extractor import DynamicExtractor, FeatureExtractor
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
logger = logging.getLogger("capa.show-features")
@@ -120,7 +120,7 @@ def main(argv=None):
# this should be moved above the previous if clause after implementing
# feature freeze for the dynamic analysis flavor
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
extractor: FeatureExtractor = capa.features.freeze.load(f.read())
else:
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
@@ -135,14 +135,16 @@ def main(argv=None):
return -1
if format_ in DYNAMIC_FORMATS:
print_dynamic_analysis(cast(DynamicExtractor, extractor), args)
assert isinstance(extractor, DynamicFeatureExtractor)
print_dynamic_analysis(extractor, args)
else:
assert isinstance(extractor, StaticFeatureExtractor)
print_static_analysis(extractor, args)
return 0
def print_static_analysis(extractor: FeatureExtractor, args):
def print_static_analysis(extractor: StaticFeatureExtractor, args):
for feature, addr in extractor.extract_global_features():
print(f"global: {format_address(addr)}: {feature}")
@@ -170,7 +172,7 @@ def print_static_analysis(extractor: FeatureExtractor, args):
print_static_features(function_handles, extractor)
def print_dynamic_analysis(extractor: DynamicExtractor, args):
def print_dynamic_analysis(extractor: DynamicFeatureExtractor, args):
for feature, addr in extractor.extract_global_features():
print(f"global: {format_address(addr)}: {feature}")
@@ -189,7 +191,7 @@ def print_dynamic_analysis(extractor: DynamicExtractor, args):
print_dynamic_features(process_handles, extractor)
def print_static_features(functions, extractor: FeatureExtractor):
def print_static_features(functions, extractor: StaticFeatureExtractor):
for f in functions:
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
@@ -235,7 +237,7 @@ def print_static_features(functions, extractor: FeatureExtractor):
continue
def print_dynamic_features(processes, extractor: DynamicExtractor):
def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
for p in processes:
print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})")