mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 23:59:48 -08:00
type: capa.main
This commit is contained in:
@@ -8,7 +8,7 @@
|
||||
|
||||
import copy
|
||||
import collections
|
||||
from typing import TYPE_CHECKING, Set, Dict, List, Union
|
||||
from typing import TYPE_CHECKING, Set, Dict, List, Tuple, Union, Mapping
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from capa.rules import Rule
|
||||
@@ -16,6 +16,10 @@ if TYPE_CHECKING:
|
||||
import capa.features.common
|
||||
from capa.features.common import Feature
|
||||
|
||||
# a collection of features and the locations at which they are found.
|
||||
# used throughout matching as the context in which features are searched.
|
||||
FeatureSet = Dict[Feature, Set[int]]
|
||||
|
||||
|
||||
class Statement:
|
||||
"""
|
||||
@@ -38,7 +42,7 @@ class Statement:
|
||||
def __repr__(self):
|
||||
return str(self)
|
||||
|
||||
def evaluate(self, ctx):
|
||||
def evaluate(self, features: FeatureSet) -> "Result":
|
||||
"""
|
||||
classes that inherit `Statement` must implement `evaluate`
|
||||
|
||||
@@ -204,7 +208,11 @@ class Subscope(Statement):
|
||||
raise ValueError("cannot evaluate a subscope directly!")
|
||||
|
||||
|
||||
def match(rules: List["Rule"], features: Dict[Feature, Set[int]], va: int):
|
||||
# mapping from rule name to list of: (location of match, result object)
|
||||
MatchResults = Mapping[str, List[Tuple[int, Result]]]
|
||||
|
||||
|
||||
def match(rules: List["Rule"], features: FeatureSet, va: int) -> Tuple[FeatureSet, MatchResults]:
|
||||
"""
|
||||
Args:
|
||||
rules (List[capa.rules.Rule]): these must already be ordered topologically by dependency.
|
||||
@@ -212,11 +220,11 @@ def match(rules: List["Rule"], features: Dict[Feature, Set[int]], va: int):
|
||||
va (int): location of the features
|
||||
|
||||
Returns:
|
||||
Tuple[List[capa.features.Feature], Dict[str, Tuple[int, capa.engine.Result]]]: two-tuple with entries:
|
||||
- list of features used for matching (which may be greater than argument, due to rule match features), and
|
||||
Tuple[FeatureSet, Dict[str, Tuple[int, Result]]]: two-tuple with entries:
|
||||
- set of features used for matching (which may be greater than argument, due to rule match features), and
|
||||
- mapping from rule name to (location of match, result object)
|
||||
"""
|
||||
results = collections.defaultdict(list)
|
||||
results = collections.defaultdict(list) # type: MatchResults
|
||||
|
||||
# copy features so that we can modify it
|
||||
# without affecting the caller (keep this function pure)
|
||||
|
||||
47
capa/main.py
47
capa/main.py
@@ -21,6 +21,7 @@ import textwrap
|
||||
import itertools
|
||||
import contextlib
|
||||
import collections
|
||||
from typing import Any, List, Tuple
|
||||
|
||||
import halo
|
||||
import tqdm
|
||||
@@ -37,7 +38,10 @@ import capa.features.freeze
|
||||
import capa.render.vverbose
|
||||
import capa.features.extractors
|
||||
import capa.features.extractors.pefile
|
||||
from capa.rules import Rule, RuleSet
|
||||
from capa.engine import FeatureSet, MatchResults
|
||||
from capa.helpers import get_file_taste
|
||||
from capa.features.extractors.base_extractor import FunctionHandle, FeatureExtractor
|
||||
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
SUPPORTED_FILE_MAGIC = set([b"MZ"])
|
||||
@@ -51,7 +55,7 @@ logger = logging.getLogger("capa")
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def timing(msg):
|
||||
def timing(msg: str):
|
||||
t0 = time.time()
|
||||
yield
|
||||
t1 = time.time()
|
||||
@@ -67,12 +71,12 @@ def set_vivisect_log_level(level):
|
||||
logging.getLogger("envi.codeflow").setLevel(level)
|
||||
|
||||
|
||||
def find_function_capabilities(ruleset, extractor, f):
|
||||
def find_function_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle):
|
||||
# contains features from:
|
||||
# - insns
|
||||
# - function
|
||||
function_features = collections.defaultdict(set)
|
||||
bb_matches = collections.defaultdict(list)
|
||||
function_features = collections.defaultdict(set) # type: FeatureSet
|
||||
bb_matches = collections.defaultdict(list) # type: MatchResults
|
||||
|
||||
for feature, va in extractor.extract_function_features(f):
|
||||
function_features[feature].add(va)
|
||||
@@ -103,8 +107,8 @@ def find_function_capabilities(ruleset, extractor, f):
|
||||
return function_matches, bb_matches, len(function_features)
|
||||
|
||||
|
||||
def find_file_capabilities(ruleset, extractor, function_features):
|
||||
file_features = collections.defaultdict(set)
|
||||
def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
|
||||
file_features = collections.defaultdict(set) # type: FeatureSet
|
||||
|
||||
for feature, va in extractor.extract_file_features():
|
||||
# not all file features may have virtual addresses.
|
||||
@@ -124,9 +128,9 @@ def find_file_capabilities(ruleset, extractor, function_features):
|
||||
return matches, len(file_features)
|
||||
|
||||
|
||||
def find_capabilities(ruleset, extractor, disable_progress=None):
|
||||
all_function_matches = collections.defaultdict(list)
|
||||
all_bb_matches = collections.defaultdict(list)
|
||||
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
|
||||
all_function_matches = collections.defaultdict(list) # type: MatchResults
|
||||
all_bb_matches = collections.defaultdict(list) # type: MatchResults
|
||||
|
||||
meta = {
|
||||
"feature_counts": {
|
||||
@@ -179,7 +183,7 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
|
||||
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
|
||||
meta["feature_counts"]["file"] = feature_count
|
||||
|
||||
matches = {}
|
||||
matches = {} # type: MatchResults
|
||||
matches.update(all_bb_matches)
|
||||
matches.update(all_function_matches)
|
||||
matches.update(all_file_matches)
|
||||
@@ -194,15 +198,15 @@ def has_rule_with_namespace(rules, capabilities, rule_cat):
|
||||
return False
|
||||
|
||||
|
||||
def is_internal_rule(rule):
|
||||
def is_internal_rule(rule: Rule) -> bool:
|
||||
return rule.meta.get("namespace", "").startswith("internal/")
|
||||
|
||||
|
||||
def is_file_limitation_rule(rule):
|
||||
def is_file_limitation_rule(rule: Rule) -> bool:
|
||||
return rule.meta.get("namespace", "") == "internal/limitation/file"
|
||||
|
||||
|
||||
def has_file_limitation(rules, capabilities, is_standalone=True):
|
||||
def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
|
||||
file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values()))
|
||||
|
||||
for file_limitation_rule in file_limitation_rules:
|
||||
@@ -224,7 +228,7 @@ def has_file_limitation(rules, capabilities, is_standalone=True):
|
||||
return False
|
||||
|
||||
|
||||
def is_supported_file_type(sample):
|
||||
def is_supported_file_type(sample: str) -> bool:
|
||||
"""
|
||||
Return if this is a supported file based on magic header values
|
||||
"""
|
||||
@@ -329,7 +333,10 @@ def register_flirt_signature_analyzers(vw, sigpaths):
|
||||
viv_utils.flirt.addFlirtFunctionAnalyzer(vw, analyzer)
|
||||
|
||||
|
||||
def get_default_signatures():
|
||||
def get_default_signatures() -> List[str]:
|
||||
"""
|
||||
compute a list of file system paths to the default FLIRT signatures.
|
||||
"""
|
||||
if hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS"):
|
||||
logger.debug("detected running under PyInstaller")
|
||||
sigs_path = os.path.join(sys._MEIPASS, "sigs")
|
||||
@@ -401,7 +408,9 @@ class UnsupportedRuntimeError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def get_extractor(path, format, backend, sigpaths, disable_progress=False):
|
||||
def get_extractor(
|
||||
path: str, format: str, backend: str, sigpaths: List[str], disable_progress=False
|
||||
) -> FeatureExtractor:
|
||||
"""
|
||||
raises:
|
||||
UnsupportedFormatError:
|
||||
@@ -439,7 +448,7 @@ def get_extractor(path, format, backend, sigpaths, disable_progress=False):
|
||||
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
|
||||
|
||||
|
||||
def is_nursery_rule_path(path):
|
||||
def is_nursery_rule_path(path: str) -> bool:
|
||||
"""
|
||||
The nursery is a spot for rules that have not yet been fully polished.
|
||||
For example, they may not have references to public example of a technique.
|
||||
@@ -452,7 +461,7 @@ def is_nursery_rule_path(path):
|
||||
return "nursery" in path
|
||||
|
||||
|
||||
def get_rules(rule_path, disable_progress=False):
|
||||
def get_rules(rule_path: str, disable_progress=False) -> List[Rule]:
|
||||
if not os.path.exists(rule_path):
|
||||
raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)
|
||||
|
||||
@@ -479,7 +488,7 @@ def get_rules(rule_path, disable_progress=False):
|
||||
rule_path = os.path.join(root, file)
|
||||
rule_paths.append(rule_path)
|
||||
|
||||
rules = []
|
||||
rules = [] # type: List[Rule]
|
||||
|
||||
pbar = tqdm.tqdm
|
||||
if disable_progress:
|
||||
|
||||
@@ -31,6 +31,7 @@ import capa.features.file
|
||||
import capa.features.insn
|
||||
import capa.features.common
|
||||
import capa.features.basicblock
|
||||
from capa.engine import Statement, FeatureSet
|
||||
from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -430,7 +431,7 @@ def second(s: List[Any]) -> Any:
|
||||
|
||||
|
||||
class Rule:
|
||||
def __init__(self, name, scope, statement, meta, definition=""):
|
||||
def __init__(self, name: str, scope: str, statement: Statement, meta, definition=""):
|
||||
super(Rule, self).__init__()
|
||||
self.name = name
|
||||
self.scope = scope
|
||||
@@ -551,7 +552,7 @@ class Rule:
|
||||
for new_rule in self._extract_subscope_rules_rec(self.statement):
|
||||
yield new_rule
|
||||
|
||||
def evaluate(self, features):
|
||||
def evaluate(self, features: FeatureSet):
|
||||
return self.statement.evaluate(features)
|
||||
|
||||
@classmethod
|
||||
|
||||
Reference in New Issue
Block a user