mirror of https://github.com/mandiant/capa.git
synced 2026-01-23 17:59:01 -08:00

Compare commits: 1 commit (dependabot... / feature/se...)

| Author | SHA1 | Date |
|---|---|---|
|  | 120ed65b31 |  |
.github/dependabot.yml (vendored): 7 changes

@@ -4,13 +4,6 @@ updates:
    directory: "/"
    schedule:
      interval: "weekly"
    groups:
      vivisect:
        patterns:
          - "vivisect"
          - "pyasn1"
          - "pyasn1-modules"
          - "msgpack"
    ignore:
      - dependency-name: "*"
        update-types: ["version-update:semver-patch"]
.github/flake8.ini (vendored): 2 changes

@@ -33,6 +33,8 @@ per-file-ignores =
    scripts/*: T201
    # capa.exe is meant to print output
    capa/main.py: T201
    # IDA tests emit results to output window so need to print
    tests/test_ida_features.py: T201
    # utility used to find the Binary Ninja API via invoking python.exe
    capa/features/extractors/binja/find_binja_api.py: T201
@@ -136,6 +136,7 @@ repos:
          - "tests/"
          - "--ignore=tests/test_binja_features.py"
          - "--ignore=tests/test_ghidra_features.py"
          - "--ignore=tests/test_ida_features.py"
          - "--ignore=tests/test_viv_features.py"
          - "--ignore=tests/test_idalib_features.py"
          - "--ignore=tests/test_main.py"
@@ -20,7 +20,6 @@
### Bug Fixes
- Fixed insecure deserialization vulnerability in YAML loading @0x1622 (#2770)
- loader: gracefully handle ELF files with unsupported architectures kamranulhaq2002@gmail.com #2800
- lint: disable rule caching during linting @Maijin #2817

### capa Explorer Web
capa/engine.py: 101 changes

@@ -122,11 +122,18 @@ class And(Statement):
                    # short circuit
                    return Result(False, self, results)

            return Result(True, self, results)
            locations = set()
            for res in results:
                locations.update(res.locations)
            return Result(True, self, results, locations=locations)
        else:
            results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
            success = all(results)
            return Result(success, self, results)
            locations = set()
            if success:
                for res in results:
                    locations.update(res.locations)
            return Result(success, self, results, locations=locations)


class Or(Statement):
@@ -153,13 +160,17 @@ class Or(Statement):
                results.append(result)
                if result:
                    # short circuit as soon as we hit one match
                    return Result(True, self, results)
                    return Result(True, self, results, locations=result.locations)

            return Result(False, self, results)
        else:
            results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
            success = any(results)
            return Result(success, self, results)
            locations = set()
            for res in results:
                if res.success:
                    locations.update(res.locations)
            return Result(success, self, results, locations=locations)


class Not(Statement):
@@ -207,7 +218,11 @@ class Some(Statement):

                if satisfied_children_count >= self.count:
                    # short circuit as soon as we hit the threshold
                    return Result(True, self, results)
                    locations = set()
                    for res in results:
                        if res.success:
                            locations.update(res.locations)
                    return Result(True, self, results, locations=locations)

            return Result(False, self, results)
        else:
@@ -217,7 +232,12 @@ class Some(Statement):
            #
            # we can't use `if child is True` because the instance is not True.
            success = sum([1 for child in results if bool(child) is True]) >= self.count
            return Result(success, self, results)
            locations = set()
            if success:
                for res in results:
                    if res.success:
                        locations.update(res.locations)
            return Result(success, self, results, locations=locations)


class Range(Statement):
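The And/Or/Some hunks above all make the same change: when a statement matches, its Result now carries the union of the children's match locations rather than discarding them. Here is a minimal sketch of the observable effect, mirroring the `test_location_propagation` regression test added later in this diff; the addresses are illustrative:

```python
# minimal sketch: with this change, evaluating And/Or exposes the children's
# match locations on the returned Result object (per test_location_propagation).
import capa.features.address
from capa.engine import And, Or
from capa.features.insn import Number

ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401000)
ADDR2 = capa.features.address.AbsoluteVirtualAddress(0x401010)

result = And([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})
assert bool(result) is True
# both children contributed a location, so both are propagated:
assert result.locations == {ADDR1, ADDR2}

# Or short-circuits on the first matching child, so only that child's
# locations are propagated when short_circuit is left at its default:
assert Or([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {ADDR1}
```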
@@ -299,6 +319,75 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
            features[capa.features.common.MatchedRule(namespace)].update(locations)


class Sequence(Statement):
    """
    match if the children evaluate to True in increasing order of location.

    the order of evaluation is dictated by the property
    `Sequence.children` (type: list[Statement|Feature]).
    """

    def __init__(self, children, description=None):
        super().__init__(description=description)
        self.children = children

    def evaluate(self, features: FeatureSet, short_circuit=True):
        capa.perf.counters["evaluate.feature"] += 1
        capa.perf.counters["evaluate.feature.sequence"] += 1

        results = []
        min_location = None

        for child in self.children:
            result = child.evaluate(features, short_circuit=short_circuit)
            results.append(result)

            if not result:
                # all children must match
                return Result(False, self, results)

            # Check for location ordering.
            # We want to find *some* location in the child's locations that is greater than
            # the minimum location from the previous child.
            #
            # If this is the first child, we just take its minimum location.

            # The child might match at multiple locations.
            # We need to be careful to pick a location that allows subsequent children to match.
            # This is a greedy approach: we pick the smallest location that satisfies the constraint.
            # This maximizes the "room" for subsequent children.

            valid_locations = sorted(result.locations)
            if not valid_locations:
                # This should effectively never happen if `result.success` is True,
                # unless the feature has no associated location (e.g. global features).
                # If a feature has no location, we can't enforce order, so strict sequence fails?
                # OR we assume it "matches anywhere" and doesn't constrain order?
                #
                # For now, let's assume valid locations are required for sequence logic.
                # If a child has no locations, it fails the sequence constraint.
                return Result(False, self, results)

            if min_location is None:
                min_location = valid_locations[0]
                # Filter result to only include this location
                results[-1] = Result(True, child, result.children, locations={min_location})
            else:
                # Find the first location that is strictly greater than min_location
                found = False
                for loc in valid_locations:
                    if loc > min_location:
                        min_location = loc
                        found = True
                        results[-1] = Result(True, child, result.children, locations={min_location})
                        break

                if not found:
                    return Result(False, self, results)

        return Result(True, self, results, locations={next(iter(r.locations)) for r in results})


def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]:
    """
    match the given rules against the given features,
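The comments in `Sequence.evaluate` describe a greedy strategy: each child claims the smallest of its match locations that is strictly greater than the location claimed by the previous child, which leaves maximal room for the children that follow. Below is a standalone sketch of just that ordering check, outside the capa engine; the helper name `is_ordered` and the integer locations are illustrative, not part of the patch:

```python
# standalone sketch of the greedy ordering check described in the comments above.
# `candidate_locations` holds one set of match locations per child, in rule order.
def is_ordered(candidate_locations: list[set[int]]) -> bool:
    min_location = None
    for locations in candidate_locations:
        if not locations:
            # a child with no locations cannot satisfy the sequence constraint
            return False
        if min_location is None:
            # first child: claim its smallest location
            min_location = min(locations)
            continue
        # later children: claim the smallest location strictly after the previous claim
        later = [loc for loc in sorted(locations) if loc > min_location]
        if not later:
            return False
        min_location = later[0]
    return True


assert is_ordered([{10}, {5, 15}]) is True       # second child skips 5 and claims 15
assert is_ordered([{10, 20}, {15}]) is True      # first child claims 10, leaving room for 15
assert is_ordered([{10}, {15}, {12}]) is False   # 12 does not come after 15
```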
@@ -661,9 +661,7 @@ def get_rules_from_cli(args) -> RuleSet:
    raises:
      ShouldExitError: if the program is invoked incorrectly and should exit.
    """
    enable_cache: bool = getattr(args, "enable_cache", True)
    # this allows calling functions to easily disable rule caching, e.g., used by the rule linter to avoid

    enable_cache: bool = True
    try:
        if capa.helpers.is_running_standalone() and args.is_default_rules:
            cache_dir = get_default_root() / "cache"
@@ -167,7 +167,9 @@ class CompoundStatementType:
    AND = "and"
    OR = "or"
    NOT = "not"
    NOT = "not"
    OPTIONAL = "optional"
    SEQUENCE = "sequence"


class StatementModel(FrozenModel): ...

@@ -213,7 +215,7 @@ class StatementNode(FrozenModel):


def statement_from_capa(node: capa.engine.Statement) -> Statement:
    if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not)):
    if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not, capa.engine.Sequence)):
        return CompoundStatement(type=node.__class__.__name__.lower(), description=node.description)

    elif isinstance(node, capa.engine.Some):

@@ -280,6 +282,9 @@ def node_to_capa(
    elif node.statement.type == CompoundStatementType.OPTIONAL:
        return capa.engine.Some(description=node.statement.description, count=0, children=children)

    elif node.statement.type == CompoundStatementType.SEQUENCE:
        return capa.engine.Sequence(description=node.statement.description, children=children)

    else:
        assert_never(node.statement.type)
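For context on the hunks above: `statement_from_capa` lowers the class name to produce the statement type, so a `capa.engine.Sequence` serializes with type "sequence", and `node_to_capa` maps that type back to a `Sequence`. A small sketch of the expected round trip; the module path `capa.render.result_document` is an assumption, since this view does not show the filename:

```python
# sketch: a Sequence statement should serialize to the "sequence" compound type
# (assumes these functions live in capa.render.result_document; the diff omits the filename).
import capa.engine
import capa.render.result_document as rd

seq = capa.engine.Sequence([], description="example")
stmt = rd.statement_from_capa(seq)
assert stmt.type == rd.CompoundStatementType.SEQUENCE  # i.e. "sequence"
```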
@@ -635,6 +635,8 @@ def build_statements(d, scopes: Scopes):
        return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
    elif key == "or":
        return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
    elif key == "sequence":
        return ceng.Sequence(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
    elif key == "not":
        if len(d[key]) != 1:
            raise InvalidRule("not statement must have exactly one child statement")
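With the `build_statements` branch above, a rule can use a `sequence:` block directly in its features. A usage sketch mirroring `test_rule_yaml_sequence` added later in this diff; the rule name and addresses are illustrative:

```python
# sketch: loading and evaluating a rule that uses the new `sequence:` block
# (mirrors test_rule_yaml_sequence added later in this diff).
import textwrap

import capa.rules
import capa.features.address
from capa.features.insn import Number

r = capa.rules.Rule.from_yaml(
    textwrap.dedent(
        """
        rule:
            meta:
                name: sequence example
                scopes:
                    static: function
                    dynamic: process
            features:
                - sequence:
                    - number: 1
                    - number: 2
        """
    )
)

ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)
ADDR2 = capa.features.address.AbsoluteVirtualAddress(0x401002)

# Number(1) observed before Number(2): the sequence matches
assert bool(r.evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True
# reversed order: no match
assert bool(r.evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False
```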
@@ -1698,7 +1700,7 @@ class RuleSet:
            # feature is found N times
            return rec(rule_name, node.child)

        elif isinstance(node, ceng.And):
        elif isinstance(node, (ceng.And, ceng.Sequence)):
            # When evaluating an AND block, all of the children need to match.
            #
            # So when we index rules, we want to pick the most uncommon feature(s)
@@ -148,7 +148,7 @@ dev = [
    "black==25.12.0",
    "isort==7.0.0",
    "mypy==1.19.1",
    "mypy-protobuf==5.0.0",
    "mypy-protobuf==4.0.0",
    "PyGithub==2.8.1",
    "bump-my-version==1.2.4",
    # type stubs for mypy

@@ -165,7 +165,7 @@ build = [
    # we want all developer environments to be consistent.
    # These dependencies are not used in production environments
    # and should not conflict with other libraries/tooling.
    "pyinstaller==6.18.0",
    "pyinstaller==6.17.0",
    "setuptools==80.9.0",
    "build==1.4.0"
]
@@ -18,14 +18,14 @@ ida-settings==3.2.2
intervaltree==3.2.1
markdown-it-py==4.0.0
mdurl==0.1.2
msgpack==1.1.2
msgpack==1.0.8
networkx==3.4.2
pefile==2024.8.26
pip==25.3
protobuf==6.33.1
pyasn1==0.6.2
pyasn1-modules==0.4.2
pycparser==3.0
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.23
pydantic==2.12.4
# pydantic pins pydantic-core,
# but dependabot updates these separately (which is broken) and is annoying,

@@ -44,6 +44,6 @@ setuptools==80.9.0
six==1.17.0
sortedcontainers==2.4.0
viv-utils==0.8.0
vivisect==1.3.0
vivisect==1.2.1
msgspec==0.20.0
bump-my-version==1.2.4
@@ -1229,7 +1229,6 @@ def main(argv=None):

    time0 = time.time()

    args.enable_cache = False
    try:
        rules = capa.main.get_rules_from_cli(args)
    except capa.main.ShouldExitError as e:
@@ -13,7 +13,7 @@
# limitations under the License.

import capa.features.address
from capa.engine import Or, And, Not, Some, Range
from capa.engine import Or, And, Not, Some, Range, Sequence
from capa.features.insn import Number

ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)

@@ -155,3 +155,145 @@ def test_eval_order():

    assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement == Number(2)
    assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement != Number(1)


def test_sequence():
    # 1 before 2
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True
    # 2 before 1 (fail)
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False
    # 1 same as 2 (fail)
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR1}})) is False

    # 1 before 2 before 3
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR2}, Number(3): {ADDR3}}
            )
        )
        is True
    )

    # 1 before 2 before 3 (fail, 3 is early)
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}
            )
        )
        is False
    )

    # 1 before 2 before 3 (fail, 2 is late)
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}
            )
        )
        is False
    )

    # multiple locations for matches
    # 1 at 1, 2 at 2 (match)
    # 1 also at 3
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1, ADDR3}, Number(2): {ADDR2}})) is True

    # greedy matching?
    # 1 at 2, 2 at 3
    # 1 matches at 2, so min_loc becomes 2.
    # 2 matches at 3, > 2. Match.
    # But wait, 1 also matches at 4.
    # If we picked 4, 1 > 2 would fail? No.
    # The heuristic is: pick the *smallest* location for the current child (that satisfies previous constraint).

    # CASE:
    # 1 matches at 10.
    # 2 matches at 5 and 15.
    # if 2 picks 5, 5 > 10 is False.
    # if 2 picks 15, 15 > 10 is True. Match.
    assert (
        bool(
            Sequence([Number(1), Number(2)]).evaluate(
                {
                    Number(1): {capa.features.address.AbsoluteVirtualAddress(10)},
                    Number(2): {
                        capa.features.address.AbsoluteVirtualAddress(5),
                        capa.features.address.AbsoluteVirtualAddress(15),
                    },
                }
            )
        )
        is True
    )

    # CASE:
    # 1 matches at 10 and 20.
    # 2 matches at 15.
    # 1 should pick 10. 10 < 15. Match.
    assert (
        bool(
            Sequence([Number(1), Number(2)]).evaluate(
                {
                    Number(1): {
                        capa.features.address.AbsoluteVirtualAddress(10),
                        capa.features.address.AbsoluteVirtualAddress(20),
                    },
                    Number(2): {capa.features.address.AbsoluteVirtualAddress(15)},
                }
            )
        )
        is True
    )

    # CASE:
    # 1 matched at 10.
    # 2 matched at 15.
    # 3 matched at 12.
    # 1 -> 10.
    # 2 -> 15 (> 10).
    # 3 -> 12 (not > 15).
    # Fail.
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {
                    Number(1): {capa.features.address.AbsoluteVirtualAddress(10)},
                    Number(2): {capa.features.address.AbsoluteVirtualAddress(15)},
                    Number(3): {capa.features.address.AbsoluteVirtualAddress(12)},
                }
            )
        )
        is False
    )


def test_location_propagation():
    # regression tests for issue where Or/And/Some statements
    # failed to propagate match locations to their results,
    # causing Sequence evaluation to fail.

    # Or
    assert Or([Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
    assert Or([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
        ADDR1
    }  # short_circuit=True returns first match
    assert Or([Number(1), Number(2)]).evaluate(
        {Number(1): {ADDR1}, Number(2): {ADDR2}}, short_circuit=False
    ).locations == {ADDR1, ADDR2}

    # And
    assert And([Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
    assert And([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {ADDR1, ADDR2}

    # Some
    assert Some(1, [Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
    assert Some(1, [Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
        ADDR1
    }  # short_circuit=True returns first sufficient set
    assert Some(2, [Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
        ADDR1,
        ADDR2,
    }
tests/test_ida_features.py (new file): 187 lines
@@ -0,0 +1,187 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
run this script from within IDA to test the IDA feature extractor.
you must have loaded a file referenced by a test case in order
for this to do anything meaningful. for example, mimikatz.exe from testfiles.

you can invoke from the command line like this:

    & 'C:\\Program Files\\IDA Pro 8.2\\idat.exe' \
      -S"C:\\Exclusions\\code\\capa\\tests\\test_ida_features.py --CAPA_AUTOEXIT=true" \
      -A \
      -Lidalog \
      'C:\\Exclusions\\code\\capa\\tests\\data\\mimikatz.exe_'

if you invoke from the command line, and provide the script argument `--CAPA_AUTOEXIT=true`,
then the script will exit IDA after running the tests.

the output (in idalog) will look like this:

```
Loading processor module C:\\Program Files\\IDA Pro 8.2\\procs\\pc.dll for metapc...Initializing processor module metapc...OK
Loading type libraries...
Autoanalysis subsystem has been initialized.
Database for file 'mimikatz.exe_' has been loaded.
--------------------------------------------------------------------------------
PASS: test_ida_feature_counts/mimikatz-function=0x40E5C2-basic block-7
PASS: test_ida_feature_counts/mimikatz-function=0x4702FD-characteristic(calls from)-0
SKIP: test_ida_features/294b8d...-function=0x404970,bb=0x404970,insn=0x40499F-string(\r\n\x00:ht)-False
SKIP: test_ida_features/64d9f-function=0x10001510,bb=0x100015B0-offset(0x4000)-True
...
SKIP: test_ida_features/pma16-01-function=0x404356,bb=0x4043B9-arch(i386)-True
PASS: test_ida_features/mimikatz-file-import(cabinet.FCIAddFile)-True
DONE
C:\\Exclusions\\code\\capa\\tests\\test_ida_features.py: Traceback (most recent call last):
  File "C:\\Program Files\\IDA Pro 8.2\\python\\3\\ida_idaapi.py", line 588, in IDAPython_ExecScript
    exec(code, g)
  File "C:/Exclusions/code/capa/tests/test_ida_features.py", line 120, in <module>
    sys.exit(0)
SystemExit: 0
-> OK
Flushing buffers, please wait...ok
```

Look for lines that start with "FAIL" to identify test failures.
"""
import io
import sys
import inspect
import logging
import traceback
from pathlib import Path

import pytest

try:
    sys.path.append(str(Path(__file__).parent))
    import fixtures
finally:
    sys.path.pop()


logger = logging.getLogger("test_ida_features")


def check_input_file(wanted):
    import idautils

    # some versions (7.4) of IDA return a truncated version of the MD5.
    # https://github.com/idapython/bin/issues/11
    try:
        found = idautils.GetInputFileMD5()[:31].decode("ascii").lower()
    except UnicodeDecodeError:
        # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
        # rather than the hex digest
        found = bytes.hex(idautils.GetInputFileMD5()[:15]).lower()

    if not wanted.startswith(found):
        raise RuntimeError(f"please run the tests against sample with MD5: `{wanted}`")


def get_ida_extractor(_path):
    # have to import this inline so pytest doesn't bail outside of IDA
    import capa.features.extractors.ida.extractor

    return capa.features.extractors.ida.extractor.IdaFeatureExtractor()


def nocollect(f):
    "don't collect the decorated function as a pytest test"
    f.__test__ = False
    return f


# although these look like pytest tests, they're not, because they don't run within pytest
# (the runner is below) and they use `yield`, which is deprecated.
@nocollect
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_features():
    # we're guaranteed to be in a function here, so there's a stack frame
    this_name = inspect.currentframe().f_code.co_name  # type: ignore
    for sample, scope, feature, expected in fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_PRESENCE_TESTS_IDA:
        id = fixtures.make_test_id((sample, scope, feature, expected))

        try:
            check_input_file(fixtures.get_sample_md5_by_name(sample))
        except RuntimeError:
            yield this_name, id, "skip", None
            continue

        scope = fixtures.resolve_scope(scope)
        sample = fixtures.resolve_sample(sample)

        try:
            fixtures.do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
        except Exception:
            f = io.StringIO()
            traceback.print_exc(file=f)
            yield this_name, id, "fail", f.getvalue()
        else:
            yield this_name, id, "pass", None


@nocollect
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_feature_counts():
    # we're guaranteed to be in a function here, so there's a stack frame
    this_name = inspect.currentframe().f_code.co_name  # type: ignore
    for sample, scope, feature, expected in fixtures.FEATURE_COUNT_TESTS:
        id = fixtures.make_test_id((sample, scope, feature, expected))

        try:
            check_input_file(fixtures.get_sample_md5_by_name(sample))
        except RuntimeError:
            yield this_name, id, "skip", None
            continue

        scope = fixtures.resolve_scope(scope)
        sample = fixtures.resolve_sample(sample)

        try:
            fixtures.do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
        except Exception:
            f = io.StringIO()
            traceback.print_exc(file=f)
            yield this_name, id, "fail", f.getvalue()
        else:
            yield this_name, id, "pass", None


if __name__ == "__main__":
    import idc
    import ida_auto

    ida_auto.auto_wait()

    print("-" * 80)

    # invoke all functions in this module that start with `test_`
    for name in dir(sys.modules[__name__]):
        if not name.startswith("test_"):
            continue

        test = getattr(sys.modules[__name__], name)
        logger.debug("invoking test: %s", name)
        sys.stderr.flush()
        for name, id, state, info in test():
            print(f"{state.upper()}: {name}/{id}")
            if info:
                print(info)

    print("DONE")

    if "--CAPA_AUTOEXIT=true" in idc.ARGV:
        sys.exit(0)
@@ -80,6 +80,28 @@ def test_rule_yaml():
    assert bool(r.evaluate({Number(0): {ADDR1}, Number(1): {ADDR1}, Number(2): {ADDR1}, Number(3): {ADDR1}})) is True


def test_rule_yaml_sequence():
    rule = textwrap.dedent(
        """
        rule:
            meta:
                name: test rule
                scopes:
                    static: function
                    dynamic: process
            features:
                - sequence:
                    - number: 1
                    - number: 2
        """
    )
    r = capa.rules.Rule.from_yaml(rule)
    # 1 before 2 -> Match
    assert bool(r.evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True
    # 2 before 1 -> No match
    assert bool(r.evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False


def test_rule_yaml_complex():
    rule = textwrap.dedent(
        """
@@ -1653,3 +1675,70 @@ def test_circular_dependency():
    ]
    with pytest.raises(capa.rules.InvalidRule):
        list(capa.rules.get_rules_and_dependencies(rules, rules[0].name))


def test_rule_yaml_sequence_with_subscope():
    # This test mimics the dynamic analysis flow to verify Sequence with subscopes.
    rule_yaml = textwrap.dedent(
        """
        rule:
            meta:
                name: test sequence subscope
                scopes:
                    static: function
                    dynamic: span of calls
            features:
                - sequence:
                    - call:
                        - number: 1
                    - number: 2
        """
    )
    # 1. Load rules (triggers subscope extraction)
    rules = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_yaml)])

    # 2. Identify the extracted subscope rule (call scope) and the main rule (span of calls)
    call_rules = rules.rules_by_scope[capa.rules.Scope.CALL]
    span_rules = rules.rules_by_scope[capa.rules.Scope.SPAN_OF_CALLS]
    assert len(call_rules) == 1
    assert len(span_rules) == 1

    main_rule = span_rules[0]
    subscope_rule = call_rules[0]

    # 3. Simulate features
    # Call 1: Number(1) -> Matches subscope rule
    # Call 2: Number(2) -> Matches second part of sequence

    # Address setup
    thread = capa.features.address.ThreadAddress(capa.features.address.ProcessAddress(1), 1)
    call1_addr = capa.features.address.DynamicCallAddress(thread, 1)
    call2_addr = capa.features.address.DynamicCallAddress(thread, 2)

    features: capa.engine.FeatureSet = {Number(1): {call1_addr}, Number(2): {call2_addr}}

    # 4. Match Call Scope Rules (Simulate find_call_capabilities)
    # Match subscope rule against Call 1
    # We need to filter features to just Call 1 for this rule?
    # Actually, RuleSet.match takes features.

    # Match at Call 1
    _, matches1 = rules.match(capa.rules.Scope.CALL, features, call1_addr)
    # Should match subscope rule
    assert subscope_rule.name in matches1

    # Index the match
    capa.engine.index_rule_matches(features, subscope_rule, [call1_addr])

    # 5. Match Span Scope Rules (Simulate find_span_capabilities)
    # Now features contains MatchedRule(subscope_rule).
    # Sequence should see:
    #   - call: matches subscope_rule at call1_addr
    #   - number: 2 at call2_addr
    # call1_addr (id=1) < call2_addr (id=2). Sequence matches.

    _, matches_span = rules.match(
        capa.rules.Scope.SPAN_OF_CALLS, features, call1_addr
    )  # addr doesn't matter much for span match logic itself, but passed to result

    assert main_rule.name in matches_span
web/explorer/package-lock.json (generated): 9 changes

@@ -2756,11 +2756,10 @@
      }
    },
    "node_modules/lodash": {
      "version": "4.17.23",
      "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.23.tgz",
      "integrity": "sha512-LgVTMpQtIopCi79SJeDiP0TfWi5CNEc/L/aRdTh3yIvmZXTnheWpKjSZhnvMl8iXbC1tFg9gdHHDMLoV7CnG+w==",
      "dev": true,
      "license": "MIT"
      "version": "4.17.21",
      "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
      "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==",
      "dev": true
    },
    "node_modules/lodash.merge": {
      "version": "4.6.2",