sequence: documentation and tests

sequence: add more tests
This commit is contained in:
Willi Ballenthin
2024-12-16 15:51:35 +00:00
committed by Willi Ballenthin
parent 86908c9025
commit 39319c57a4
4 changed files with 117 additions and 12 deletions

View File

@@ -18,13 +18,12 @@ import itertools
import collections
from dataclasses import dataclass
from capa.features.address import NO_ADDRESS
import capa.perf
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.common import Feature
from capa.features.address import _NoAddress
from capa.capabilities.common import Capabilities, find_file_capabilities
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor
@@ -137,9 +136,10 @@ def find_thread_capabilities(
if len(sequence_feature_sets) == SEQUENCE_SIZE:
overflowing_feature_set = sequence_feature_sets.popleft()
# these are the top-level features that will no longer have any associated addresses.
for feature, vas in overflowing_feature_set.items():
if vas == { NO_ADDRESS, }:
if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress):
# equivalent to `vas == { NO_ADDRESS }`, but without allocating a throwaway set for every comparison.
#
# ignore the common case of global features getting added/removed/trimmed repeatedly,
# like arch/os/format.
continue
@@ -238,7 +238,7 @@ def find_process_capabilities(
def find_dynamic_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress: bool = False
) -> Capabilities:
all_process_matches: MatchResults = collections.defaultdict(list)
all_thread_matches: MatchResults = collections.defaultdict(list)

View File

@@ -488,11 +488,11 @@ class DynamicFeatureExtractor:
raise NotImplementedError()
def ProcessFilter(extractor: DynamicFeatureExtractor, processes: set) -> DynamicFeatureExtractor:
def ProcessFilter(extractor: DynamicFeatureExtractor, pids: set[int]) -> DynamicFeatureExtractor:
original_get_processes = extractor.get_processes
def filtered_get_processes(self):
yield from (f for f in original_get_processes() if f.address.pid in processes)
yield from (f for f in original_get_processes() if f.address.pid in pids)
# we copy the original extractor object and replace the copy's get_processes() method with the filtered version.
# this leaves the original extractor object's get_processes() method untouched, in case it is used elsewhere in the code.
@@ -504,7 +504,7 @@ def ProcessFilter(extractor: DynamicFeatureExtractor, processes: set) -> Dynamic
return new_extractor
def ThreadFilter(extractor: DynamicFeatureExtractor, threads: set) -> DynamicFeatureExtractor:
def ThreadFilter(extractor: DynamicFeatureExtractor, threads: set[Address]) -> DynamicFeatureExtractor:
original_get_threads = extractor.get_threads
def filtered_get_threads(self, ph: ProcessHandle):

View File

@@ -643,7 +643,7 @@ def build_statements(d, scopes: Scopes):
)
elif key == "sequence":
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD)):
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD, Scope.SEQUENCE)):
raise InvalidRule("sequence subscope supported only for the process and thread scopes")
if len(d[key]) != 1:

View File

@@ -27,11 +27,14 @@
# ...
import textwrap
from typing import Iterator
from functools import lru_cache
import pytest
import fixtures
import capa.main
import capa.rules
import capa.capabilities.dynamic
from capa.features.extractors.base_extractor import ThreadFilter, DynamicFeatureExtractor
@@ -62,7 +65,7 @@ def get_0000a657_thread3064():
return extractor
def get_call_ids(matches):
def get_call_ids(matches) -> Iterator[int]:
for address, _ in matches:
yield address.id
@@ -96,7 +99,7 @@ def test_dynamic_call_scope():
assert 8 in get_call_ids(matches[r.name])
# match the first 5-tuple sequence.
# match the first sequence.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
@@ -147,7 +150,7 @@ def test_dynamic_sequence_scope():
# call 14: RtlAddVectoredExceptionHandler(1921490089, 0)
# call 15: GetSystemTime()
# call 16: NtAllocateVirtualMemory(no, 4, 786432, 4784128, 4294967295)
def test_dynamic_sequence_scope2():
def test_dynamic_sequence_scope_length():
extractor = get_0000a657_thread3064()
rule = textwrap.dedent(
@@ -178,6 +181,108 @@ def test_dynamic_sequence_scope2():
assert r.name not in capabilities.matches
# show that you can use a call subscope in sequence rules.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
# ...
# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll)
# ...
def test_dynamic_sequence_call_subscope():
# a `call` subscope nested inside a sequence-scoped rule should still match.
#
# NOTE(review): judging by the helper's name, the extractor is limited to thread 3064 of sample 0000a657 - confirm.
extractor = get_0000a657_thread3064()
# sequence-scoped rule whose only feature is a `call` subscope requiring (`and`) both the API name and the string.
rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- call:
- and:
- api: LdrGetProcedureAddress
- string: AddVectoredExceptionHandler
"""
)
r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
# the rule must match, and call id 11 must be among the ids of the match addresses.
assert r.name in capabilities.matches
assert 11 in get_call_ids(capabilities.matches[r.name])
# show that you can use a sequence subscope in sequence rules.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
# ...
# call 10: LdrGetDllHandle(1974337536, kernel32.dll)
# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll)
# call 12: LdrGetDllHandle(1974337536, kernel32.dll)
# call 13: LdrGetProcedureAddress(2010595072, 0, RemoveVectoredExceptionHandler, 1974337536, kernel32.dll)
# ...
def test_dynamic_sequence_scope_sequence_subscope():
# `sequence` subscopes nested inside a sequence-scoped rule should still match.
#
# NOTE(review): judging by the helper's name, the extractor is limited to thread 3064 of sample 0000a657 - confirm.
extractor = get_0000a657_thread3064()
# the rule requires (`and`) two `sequence` subscopes; each combines LdrGetDllHandle and
# LdrGetProcedureAddress with a different string (Add... vs. Remove...VectoredExceptionHandler).
rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- sequence:
- description: resolve add VEH # should match at 11
- and:
- api: LdrGetDllHandle
- api: LdrGetProcedureAddress
- string: AddVectoredExceptionHandler
- sequence:
- description: resolve remove VEH # should match at 13
- and:
- api: LdrGetDllHandle
- api: LdrGetProcedureAddress
- string: RemoveVectoredExceptionHandler
"""
)
r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
# the rule must match, and call id 13 must be among the ids of the match addresses.
assert r.name in capabilities.matches
assert 13 in get_call_ids(capabilities.matches[r.name])
# show that you can't use a thread subscope in sequence rules.
def test_dynamic_sequence_scope_thread_subscope():
# the rule declares the dynamic `sequence` scope but nests a `thread` subscope inside its features.
rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- thread:
- string: "foo"
"""
)
# parsing the rule is expected to raise InvalidRule; no extractor or matching is involved.
with pytest.raises(capa.rules.InvalidRule):
capa.rules.Rule.from_yaml(rule)
# show how you might use a sequence rule: to match a small window for a collection of features.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)