mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
rules: ruleset: add optimized match routine
This commit is contained in:
142
capa/rules.py
142
capa/rules.py
@@ -23,7 +23,7 @@ except ImportError:
|
||||
# https://github.com/python/mypy/issues/1153
|
||||
from backports.functools_lru_cache import lru_cache # type: ignore
|
||||
|
||||
from typing import Any, Dict, List, Union, Iterator
|
||||
from typing import Any, Set, Dict, List, Tuple, Union, Iterator
|
||||
|
||||
import yaml
|
||||
import ruamel.yaml
|
||||
@@ -974,6 +974,15 @@ class RuleSet:
|
||||
self.rules = {rule.name: rule for rule in rules}
|
||||
self.rules_by_namespace = index_rules_by_namespace(rules)
|
||||
|
||||
# unstable
|
||||
(self._easy_file_rules_by_feature, self._hard_file_rules) = self._index_rules_by_feature(self.file_rules)
|
||||
(self._easy_function_rules_by_feature, self._hard_function_rules) = self._index_rules_by_feature(
|
||||
self.function_rules
|
||||
)
|
||||
(self._easy_basic_block_rules_by_feature, self._hard_basic_block_rules) = self._index_rules_by_feature(
|
||||
self.basic_block_rules
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.rules)
|
||||
|
||||
@@ -983,6 +992,88 @@ class RuleSet:
|
||||
def __contains__(self, rulename):
|
||||
return rulename in self.rules
|
||||
|
||||
@staticmethod
|
||||
def _index_rules_by_feature(rules) -> Tuple[Dict[Feature, Set[str]], List[str]]:
|
||||
"""
|
||||
split the given rules into to structures:
|
||||
- "easy rules" are indexed by feature,
|
||||
such that you can quickly find the rules that contain a given feature.
|
||||
- "hard rules" are those that contain substring/regex/bytes features or match statements.
|
||||
these continue to be ordered topologically.
|
||||
|
||||
a rule evaluator can use the "easy rule" index to restrict the
|
||||
candidate rules that might match a given set of features.
|
||||
|
||||
at this time, a rule evaluator can't do anything special with
|
||||
the "hard rules". it must still do a full top-down match of each
|
||||
rule, in topological order.
|
||||
"""
|
||||
|
||||
# we'll do a couple phases:
|
||||
#
|
||||
# 1. recursively visit all nodes in all rules,
|
||||
# a. indexing all features
|
||||
# b. recording the types of features found per rule
|
||||
# 2. compute the easy and hard rule sets
|
||||
# 3. remove hard rules from the rules-by-feature index
|
||||
# 4. construct the topologically ordered list of hard rules
|
||||
rules_with_easy_features: Set[str] = set()
|
||||
rules_with_hard_features: Set[str] = set()
|
||||
rules_by_feature: Dict[Feature, Set[str]] = collections.defaultdict(set)
|
||||
|
||||
def rec(rule: str, node: Union[Feature, Statement]):
|
||||
if isinstance(
|
||||
node,
|
||||
(
|
||||
# these are the "hard features"
|
||||
# substring: scanning feature
|
||||
capa.features.common.Substring,
|
||||
# regex: scanning feature
|
||||
capa.features.common.Regex,
|
||||
# bytes: scanning feature
|
||||
capa.features.common.Bytes,
|
||||
# match: dependency on another rule,
|
||||
# which we have to evaluate first,
|
||||
# and is therefore tricky.
|
||||
capa.features.common.MatchedRule,
|
||||
),
|
||||
):
|
||||
# hard feature: requires scan or match lookup
|
||||
rules_with_hard_features.add(rule)
|
||||
elif isinstance(node, capa.features.common.Feature):
|
||||
# easy feature: hash lookup
|
||||
rules_with_easy_features.add(rule)
|
||||
rules_by_feature[node].add(rule)
|
||||
elif isinstance(node, (ceng.Not, ceng.Range)):
|
||||
return rec(rule, node.child)
|
||||
elif isinstance(node, (ceng.And, ceng.Or, ceng.Some)):
|
||||
for child in node.children:
|
||||
rec(rule, child)
|
||||
else:
|
||||
# programming error
|
||||
raise Exception("programming error: unexpected node type: %s" % (node))
|
||||
|
||||
for rule in rules:
|
||||
rec(rule.meta["name"], rule.statement)
|
||||
|
||||
# if a rule has a hard feature,
|
||||
# dont consider it easy, and therefore,
|
||||
# don't index any of its features.
|
||||
#
|
||||
# otherwise, its an easy rule, and index its features
|
||||
for rules_with_feature in rules_by_feature.values():
|
||||
rules_with_feature.difference_update(rules_with_hard_features)
|
||||
easy_rules_by_feature = rules_by_feature
|
||||
|
||||
# `rules` is already topologically ordered,
|
||||
# so extract our hard set into the topological ordering.
|
||||
hard_rules = []
|
||||
for rule in rules:
|
||||
if rule.meta["name"] in rules_with_hard_features:
|
||||
hard_rules.append(rule.meta["name"])
|
||||
|
||||
return (easy_rules_by_feature, hard_rules)
|
||||
|
||||
@staticmethod
|
||||
def _get_rules_for_scope(rules, scope):
|
||||
"""
|
||||
@@ -1045,3 +1136,52 @@ class RuleSet:
|
||||
rules_filtered.update(set(capa.rules.get_rules_and_dependencies(rules, rule.name)))
|
||||
break
|
||||
return RuleSet(list(rules_filtered))
|
||||
|
||||
def match(self, scope: Scope, features: FeatureSet, va: int) -> Tuple[FeatureSet, ceng.MatchResults]:
|
||||
"""
|
||||
match rules from this ruleset at the given scope against the given features.
|
||||
|
||||
this routine should act just like `capa.engine.match`,
|
||||
except that it may be more performant.
|
||||
"""
|
||||
if scope == scope.FILE:
|
||||
easy_rules_by_feature = self._easy_file_rules_by_feature
|
||||
hard_rule_names = self._hard_file_rules
|
||||
elif scope == scope.FUNCTION:
|
||||
easy_rules_by_feature = self._easy_function_rules_by_feature
|
||||
hard_rule_names = self._hard_function_rules
|
||||
elif scope == scope.BASIC_BLOCK:
|
||||
easy_rules_by_feature = self._easy_basic_block_rules_by_feature
|
||||
hard_rule_names = self._hard_basic_block_rules
|
||||
else:
|
||||
raise Exception("programming error: unexpected scope")
|
||||
|
||||
candidate_rule_names = set()
|
||||
for feature in features:
|
||||
easy_rules = easy_rules_by_feature.get(feature)
|
||||
if easy_rules:
|
||||
candidate_rule_names.update(easy_rules)
|
||||
|
||||
# first, match against the set of rules that have at least one
|
||||
# feature shared with our feature set.
|
||||
candidate_rules = [self.rules[name] for name in candidate_rule_names]
|
||||
features2, easy_matches = ceng.match(candidate_rules, features, va)
|
||||
|
||||
# now, match against (topologically ordered) list of rules
|
||||
# that we can't really make any guesses about.
|
||||
# these are rules with hard features, like substring/regex/bytes and match statements.
|
||||
hard_rules = [self.rules[name] for name in hard_rule_names]
|
||||
features3, hard_matches = ceng.match(hard_rules, features2, va)
|
||||
|
||||
# note that above, we ideally skipping matching a bunch of
|
||||
# rules that probably would never hit.
|
||||
# specifically, "easy rules" that don't share any features with
|
||||
# feature set.
|
||||
|
||||
# MatchResults doesn't technically have an .update() method
|
||||
# but a dict does.
|
||||
matches = {} # type: ignore
|
||||
matches.update(easy_matches)
|
||||
matches.update(hard_matches)
|
||||
|
||||
return (features3, matches)
|
||||
|
||||
Reference in New Issue
Block a user