mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
sequence: only match first overlapping sequence
also, for repeating behavior, match only the first instance.
This commit is contained in:
committed by
Willi Ballenthin
parent
b06fea130c
commit
294ff34a30
@@ -29,6 +29,13 @@ from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, Pr
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# The number of calls that make up a sequence.
|
||||
#
|
||||
# The larger this is, the more calls are grouped together to match rule logic.
|
||||
# This means a longer chain can be recognized; however, it's a bit more expensive.
|
||||
SEQUENCE_SIZE = 20
|
||||
|
||||
|
||||
@dataclass
|
||||
class CallCapabilities:
|
||||
features: FeatureSet
|
||||
@@ -76,7 +83,8 @@ def find_thread_capabilities(
|
||||
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
|
||||
) -> ThreadCapabilities:
|
||||
"""
|
||||
find matches for the given rules within the given thread.
|
||||
find matches for the given rules within the given thread,
|
||||
which includes matches for all the sequences and calls within it.
|
||||
"""
|
||||
# all features found within this thread,
|
||||
# includes features found within calls.
|
||||
@@ -89,8 +97,18 @@ def find_thread_capabilities(
|
||||
# matches found at the sequence scope.
|
||||
sequence_matches: MatchResults = collections.defaultdict(list)
|
||||
|
||||
# We match sequences as the sliding window of calls with size SEQUENCE_SIZE.
|
||||
#
|
||||
# For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
|
||||
# merging all their features and doing a match.
|
||||
# Here's the primary data structure: a deque of those features found in the prior calls.
|
||||
# We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
|
||||
sequence: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
|
||||
|
||||
# the names of rules matched at the last sequence,
|
||||
# so that we can deduplicate long strings of the same match.
|
||||
last_sequence_matches: set[str] = set()
|
||||
|
||||
for ch in extractor.get_calls(ph, th):
|
||||
call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
|
||||
for feature, vas in call_capabilities.features.items():
|
||||
@@ -99,7 +117,13 @@ def find_thread_capabilities(
|
||||
for rule_name, res in call_capabilities.matches.items():
|
||||
call_matches[rule_name].extend(res)
|
||||
|
||||
#
|
||||
# sequence scope matching
|
||||
#
|
||||
# as we add items to the end of the deque, the oldest items will overflow and get dropped.
|
||||
sequence.append(call_capabilities.features)
|
||||
# collect all the features seen across the last SEQUENCE_SIZE calls,
|
||||
# and match against them.
|
||||
sequence_features: FeatureSet = collections.defaultdict(set)
|
||||
for call in sequence:
|
||||
for feature, vas in call.items():
|
||||
@@ -107,8 +131,21 @@ def find_thread_capabilities(
|
||||
|
||||
_, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, ch.address)
|
||||
for rule_name, res in smatches.items():
|
||||
if rule_name in last_sequence_matches:
|
||||
# don't emit match results for rules seen during the immediately preceding sequence.
|
||||
#
|
||||
# This means that we won't emit duplicate matches when there are multiple sequences
|
||||
# that overlap a single matching event.
|
||||
# It also handles the case of a tight loop containing matched logic;
|
||||
# only the first match will be recorded.
|
||||
#
|
||||
# In theory, this means the result document doesn't have *every* possible match location,
|
||||
# but in practice, humans will only be interested in the first handful anyways.
|
||||
continue
|
||||
sequence_matches[rule_name].extend(res)
|
||||
|
||||
last_sequence_matches = set(smatches.keys())
|
||||
|
||||
for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
|
||||
features[feature].add(va)
|
||||
|
||||
|
||||
@@ -134,7 +134,7 @@ def test_dynamic_sequence_scope():
|
||||
assert 12 in get_call_ids(matches[r.name])
|
||||
|
||||
|
||||
# show the sequence is only 5 calls long, and doesn't match beyond that 5-tuple.
|
||||
# show that when the sequence is only 5 calls long (for example), it doesn't match beyond that 5-tuple.
|
||||
#
|
||||
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
|
||||
# thread: 3064
|
||||
@@ -168,8 +168,14 @@ def test_dynamic_sequence_scope2():
|
||||
r = capa.rules.Rule.from_yaml(rule)
|
||||
ruleset = capa.rules.RuleSet([r])
|
||||
|
||||
matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
|
||||
assert r.name not in matches
|
||||
# patch SEQUENCE_SIZE since we may use a much larger value in the real world.
|
||||
from pytest import MonkeyPatch
|
||||
|
||||
with MonkeyPatch.context() as m:
|
||||
m.setattr(capa.capabilities.dynamic, "SEQUENCE_SIZE", 5)
|
||||
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
|
||||
|
||||
assert r.name not in capabilities.matches
|
||||
|
||||
|
||||
# show how you might use a sequence rule: to match a small window for a collection of features.
|
||||
@@ -215,7 +221,6 @@ def test_dynamic_sequence_example():
|
||||
|
||||
|
||||
# show how sequences that overlap a single event are handled.
|
||||
# TODO(williballenthin): but I think we really just want one match for this, not copies of the same thing.
|
||||
#
|
||||
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
|
||||
# thread: 3064
|
||||
@@ -250,7 +255,7 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event():
|
||||
r = capa.rules.Rule.from_yaml(rule)
|
||||
ruleset = capa.rules.RuleSet([r])
|
||||
|
||||
matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
|
||||
assert r.name in matches
|
||||
assert [11, 12, 13, 14, 15] == list(get_call_ids(matches[r.name]))
|
||||
|
||||
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
|
||||
assert r.name in capabilities.matches
|
||||
# we only match the first overlapping sequence
|
||||
assert [11] == list(get_call_ids(capabilities.matches[r.name]))
|
||||
|
||||
Reference in New Issue
Block a user