From 294ff34a30083566d48617a999c2c9e00b8e2392 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Thu, 12 Dec 2024 15:13:10 +0000 Subject: [PATCH] sequence: only match first overlapping sequence also, for repeating behavior, match only the first instance. --- capa/capabilities/dynamic.py | 39 +++++++++++++++++++++++++++- tests/test_dynamic_sequence_scope.py | 21 +++++++++------ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index cb553e30..8ab14ea0 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -29,6 +29,13 @@ from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, Pr logger = logging.getLogger(__name__) +# The number of calls that make up a sequence. +# +# The larger this is, the more calls are grouped together to match rule logic. +# This means a longer chain can be recognized; however, it's a bit more expensive. +SEQUENCE_SIZE = 20 + + @dataclass class CallCapabilities: features: FeatureSet @@ -76,7 +83,8 @@ def find_thread_capabilities( ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle ) -> ThreadCapabilities: """ - find matches for the given rules within the given thread. + find matches for the given rules within the given thread, + which includes matches for all the sequences and calls within it. """ # all features found within this thread, # includes features found within calls. @@ -89,8 +97,18 @@ def find_thread_capabilities( # matches found at the sequence scope. sequence_matches: MatchResults = collections.defaultdict(list) + # We match sequences as the sliding window of calls with size SEQUENCE_SIZE. + # + # For each call, we consider the window of SEQUENCE_SIZE calls leading up to it, + # merging all their features and doing a match. + # Here's the primary data structure: a deque of those features found in the prior calls. 
+ # We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed. sequence: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE) + # the names of rules matched at the last sequence, + # so that we can deduplicate long strings of the same match. + last_sequence_matches: set[str] = set() + for ch in extractor.get_calls(ph, th): call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch) for feature, vas in call_capabilities.features.items(): @@ -99,7 +117,13 @@ def find_thread_capabilities( for rule_name, res in call_capabilities.matches.items(): call_matches[rule_name].extend(res) + # + # sequence scope matching + # + # as we add items to the end of the deque, the oldest items will overflow and get dropped. sequence.append(call_capabilities.features) + # collect all the features seen across the last SEQUENCE_SIZE calls, + # and match against them. sequence_features: FeatureSet = collections.defaultdict(set) for call in sequence: for feature, vas in call.items(): @@ -107,8 +131,21 @@ def find_thread_capabilities( _, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, ch.address) for rule_name, res in smatches.items(): + if rule_name in last_sequence_matches: + # don't emit match results for rules seen during the immediately preceding sequence. + # + # This means that we won't emit duplicate matches when there are multiple sequences + # that overlap a single matching event. + # It also handles the case of a tight loop containing matched logic; + # only the first match will be recorded. + # + # In theory, this means the result document doesn't have *every* possible match location, + # but in practice, humans will only be interested in the first handful anyways. 
+ continue sequence_matches[rule_name].extend(res) + last_sequence_matches = set(smatches.keys()) + for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): features[feature].add(va) diff --git a/tests/test_dynamic_sequence_scope.py b/tests/test_dynamic_sequence_scope.py index 810dc5b3..4b423fe0 100644 --- a/tests/test_dynamic_sequence_scope.py +++ b/tests/test_dynamic_sequence_scope.py @@ -134,7 +134,7 @@ def test_dynamic_sequence_scope(): assert 12 in get_call_ids(matches[r.name]) -# show the sequence is only 5 calls long, and doesn't match beyond that 5-tuple. +# show that when the sequence is only 5 calls long (for example), it doesn't match beyond that 5-tuple. # # proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) # thread: 3064 @@ -168,8 +168,14 @@ def test_dynamic_sequence_scope2(): r = capa.rules.Rule.from_yaml(rule) ruleset = capa.rules.RuleSet([r]) - matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) - assert r.name not in matches + # patch SEQUENCE_SIZE since we may use a much larger value in the real world. + from pytest import MonkeyPatch + + with MonkeyPatch.context() as m: + m.setattr(capa.capabilities.dynamic, "SEQUENCE_SIZE", 5) + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + + assert r.name not in capabilities.matches # show how you might use a sequence rule: to match a small window for a collection of features. @@ -215,7 +221,6 @@ def test_dynamic_sequence_example(): # show how sequences that overlap a single event are handled. -# TODO(williballenthin): but I think we really just want one match for this, not copies of the same thing. 
# # proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) # thread: 3064 @@ -250,7 +255,7 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event(): r = capa.rules.Rule.from_yaml(rule) ruleset = capa.rules.RuleSet([r]) - matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) - assert r.name in matches - assert [11, 12, 13, 14, 15] == list(get_call_ids(matches[r.name])) - + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + # we only match the first overlapping sequence + assert [11] == list(get_call_ids(capabilities.matches[r.name]))