mirror of https://github.com/mandiant/capa.git
synced 2026-01-21 17:03:24 -08:00

Compare commits
7 Commits: feature/se ... 0ba5f9664a

| SHA1 |
|---|
| 0ba5f9664a |
| 98873c8570 |
| 3687bb95e9 |
| 7175714f9e |
| 32c201d9b1 |
| 784e0346d9 |
| be1ccb0776 |
1 .github/pyinstaller/pyinstaller.spec (vendored)

@@ -74,7 +74,6 @@ a = Analysis(
        # only be installed locally.
        "binaryninja",
        "ida",
        "ghidra",
        # remove once https://github.com/mandiant/capa/issues/2681 has
        # been addressed by PyInstaller
        "pkg_resources",
23 .github/workflows/build.yml (vendored)

@@ -28,11 +28,6 @@ jobs:
            artifact_name: capa
            asset_name: linux
            python_version: '3.10'
            # for Ghidra
            java-version: '21'
            ghidra-version: '12.0'
            public-version: 'PUBLIC_20251205'
            ghidra-sha256: 'af43e8cfb2fa4490cf6020c3a2bde25c159d83f45236a0542688a024e8fc1941'
          - os: ubuntu-22.04-arm
            artifact_name: capa
            asset_name: linux-arm64
@@ -111,24 +106,6 @@ jobs:
        run: |
          7z e "tests/data/dynamic/cape/v2.2/d46900384c78863420fb3e297d0a2f743cd2b6b3f7f82bf64059a168e07aceb7.json.gz"
          dist/capa -d "d46900384c78863420fb3e297d0a2f743cd2b6b3f7f82bf64059a168e07aceb7.json"
      - name: Set up Java ${{ matrix.java-version }}
        if: matrix.os == 'ubuntu-22.04' && matrix.python_version == '3.10'
        uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0
        with:
          distribution: 'temurin'
          java-version: ${{ matrix.java-version }}
      - name: Install Ghidra ${{ matrix.ghidra-version }}
        if: matrix.os == 'ubuntu-22.04' && matrix.python_version == '3.10'
        run: |
          mkdir ./.github/ghidra
          wget "https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${{ matrix.ghidra-version }}_build/ghidra_${{ matrix.ghidra-version }}_${{ matrix.public-version }}.zip" -O ./.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip
          echo "${{ matrix.ghidra-sha256 }} ./.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip" | sha256sum -c -
          unzip .github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip -d .github/ghidra/
      - name: Does it run (Ghidra)?
        if: matrix.os == 'ubuntu-22.04' && matrix.python_version == '3.10'
        env:
          GHIDRA_INSTALL_DIR: ${{ github.workspace }}/.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC
        run: dist/capa -b ghidra -d "tests/data/Practical Malware Analysis Lab 01-01.dll_"
      - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1
        with:
          name: ${{ matrix.asset_name }}
59 .github/workflows/tests.yml (vendored)

@@ -174,8 +174,7 @@ jobs:
        python-version: ["3.10", "3.13"]
        java-version: ["21"]
        ghidra-version: ["12.0"]
        public-version: ["PUBLIC_20251205"] # for ghidra releases
        ghidra-sha256: ['af43e8cfb2fa4490cf6020c3a2bde25c159d83f45236a0542688a024e8fc1941']
        public-version: ["PUBLIC_20251205"] # for ghidra releases
    steps:
      - name: Checkout capa with submodules
        uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
@@ -194,66 +193,14 @@ jobs:
        run: |
          mkdir ./.github/ghidra
          wget "https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${{ matrix.ghidra-version }}_build/ghidra_${{ matrix.ghidra-version }}_${{ matrix.public-version }}.zip" -O ./.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip
          echo "${{ matrix.ghidra-sha256 }} ./.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip" | sha256sum -c -
          unzip .github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip -d .github/ghidra/
      - name: Install pyyaml
        run: sudo apt-get install -y libyaml-dev
      - name: Install capa with Ghidra extra
      - name: Install capa
        run: |
          pip install -e .[dev,ghidra]
          pip install -e .[dev]
      - name: Run tests
        env:
          GHIDRA_INSTALL_DIR: ${{ github.workspace }}/.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC
        run: pytest -v tests/test_ghidra_features.py

  idalib-tests:
    name: IDA ${{ matrix.ida.version }} tests for ${{ matrix.python-version }}
    runs-on: ubuntu-22.04
    needs: [tests]
    env:
      IDA_LICENSE_ID: ${{ secrets.IDA_LICENSE_ID }}
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.10", "3.13"]
        ida:
          - version: 9.0
            slug: "release/9.0/ida-essential/ida-essential_90_x64linux.run"
          - version: 9.1
            slug: "release/9.1/ida-essential/ida-essential_91_x64linux.run"
          - version: 9.2
            slug: "release/9.2/ida-essential/ida-essential_92_x64linux.run"
    steps:
      - name: Checkout capa with submodules
        # only run if IDA_LICENSE_ID is available; we have to do this in every step, see https://github.com/orgs/community/discussions/26726#discussioncomment-3253118
        if: ${{ env.IDA_LICENSE_ID != 0 }}
        uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
        with:
          submodules: recursive
      - name: Set up Python ${{ matrix.python-version }}
        if: ${{ env.IDA_LICENSE_ID != 0 }}
        uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0
        with:
          python-version: ${{ matrix.python-version }}
      - name: Setup uv
        if: ${{ env.IDA_LICENSE_ID != 0 }}
        uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7.2.0
      - name: Install dependencies
        if: ${{ env.IDA_LICENSE_ID != 0 }}
        run: sudo apt-get install -y libyaml-dev
      - name: Install capa
        if: ${{ env.IDA_LICENSE_ID != 0 }}
        run: |
          pip install -r requirements.txt
          pip install -e .[dev,scripts]
          pip install idapro
      - name: Install IDA ${{ matrix.ida.version }}
        if: ${{ env.IDA_LICENSE_ID != 0 }}
        run: |
          uv run hcli --disable-updates ida install --download-id ${{ matrix.ida.slug }} --license-id ${{ secrets.IDA_LICENSE_ID }} --set-default --yes
        env:
          HCLI_API_KEY: ${{ secrets.HCLI_API_KEY }}
          IDA_LICENSE_ID: ${{ secrets.IDA_LICENSE_ID }}
      - name: Run tests
        if: ${{ env.IDA_LICENSE_ID != 0 }}
        run: pytest -v tests/test_idalib_features.py # explicitly refer to the idalib tests for performance. other tests run above.
@@ -138,7 +138,6 @@ repos:
        - "--ignore=tests/test_ghidra_features.py"
        - "--ignore=tests/test_ida_features.py"
        - "--ignore=tests/test_viv_features.py"
        - "--ignore=tests/test_idalib_features.py"
        - "--ignore=tests/test_main.py"
        - "--ignore=tests/test_scripts.py"
    always_run: true
@@ -8,18 +8,16 @@

### Breaking Changes

### New Rules (5)
### New Rules (4)

- nursery/run-as-nodejs-native-module mehunhoff@google.com
- nursery/inject-shellcode-using-thread-pool-work-insertion-with-tp_io still@teamt5.org
- nursery/inject-shellcode-using-thread-pool-work-insertion-with-tp_timer still@teamt5.org
- nursery/inject-shellcode-using-thread-pool-work-insertion-with-tp_work still@teamt5.org
- data-manipulation/encryption/hc-256/encrypt-data-using-hc-256 wballenthin@hex-rays.com
-

### Bug Fixes
- Fixed insecure deserialization vulnerability in YAML loading @0x1622 (#2770)
- loader: gracefully handle ELF files with unsupported architectures kamranulhaq2002@gmail.com #2800

### capa Explorer Web

@@ -58,7 +56,6 @@ Additionally a Binary Ninja bug has been fixed. Released binaries now include AR
### New Features

- ci: add support for arm64 binary releases
- tests: run tests against IDA via idalib @williballenthin #2742

### Breaking Changes
@@ -291,17 +291,11 @@ It also uses your local changes to the .idb to extract better features, such as


# Ghidra integration

capa supports using Ghidra (via [PyGhidra](https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra)) as a feature extraction backend. This allows you to run capa against binaries using Ghidra's analysis engine.

You can run and view capa results in the Ghidra UI using [capa explorer for Ghidra](https://github.com/mandiant/capa/tree/master/capa/ghidra/plugin).
If you use Ghidra, then you can use the [capa + Ghidra integration](/capa/ghidra/) to run capa's analysis directly on your Ghidra database and render the results in Ghidra's user interface.

<img src="https://github.com/mandiant/capa/assets/66766340/eeae33f4-99d4-42dc-a5e8-4c1b8c661492" width=300>

You can also run capa from the command line using the [Ghidra backend](https://github.com/mandiant/capa/tree/master/capa/ghidra).
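For example, from a shell (the sample name is illustrative, taken from capa's test data):

```bash
capa -b ghidra "Practical Malware Analysis Lab 01-01.exe_"
```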

# blog posts
- [Riding Dragons: capa Harnesses Ghidra](https://www.mandiant.com/resources/blog/capa-harnesses-ghidra)
- [Dynamic capa: Exploring Executable Run-Time Behavior with the CAPE Sandbox](https://www.mandiant.com/resources/blog/dynamic-capa-executable-behavior-cape-sandbox)
- [capa v4: casting a wider .NET](https://www.mandiant.com/resources/blog/capa-v4-casting-wider-net) (.NET support)
- [ELFant in the Room – capa v3](https://www.mandiant.com/resources/elfant-in-the-room-capa-v3) (ELF support)
101 capa/engine.py

@@ -122,18 +122,11 @@ class And(Statement):
                    # short circuit
                    return Result(False, self, results)

            locations = set()
            for res in results:
                locations.update(res.locations)
            return Result(True, self, results, locations=locations)
            return Result(True, self, results)
        else:
            results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
            success = all(results)
            locations = set()
            if success:
                for res in results:
                    locations.update(res.locations)
            return Result(success, self, results, locations=locations)
            return Result(success, self, results)


class Or(Statement):
@@ -160,17 +153,13 @@ class Or(Statement):
                results.append(result)
                if result:
                    # short circuit as soon as we hit one match
                    return Result(True, self, results, locations=result.locations)
                    return Result(True, self, results)

            return Result(False, self, results)
        else:
            results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
            success = any(results)
            locations = set()
            for res in results:
                if res.success:
                    locations.update(res.locations)
            return Result(success, self, results, locations=locations)
            return Result(success, self, results)


class Not(Statement):
@@ -218,11 +207,7 @@ class Some(Statement):

            if satisfied_children_count >= self.count:
                # short circuit as soon as we hit the threshold
                locations = set()
                for res in results:
                    if res.success:
                        locations.update(res.locations)
                return Result(True, self, results, locations=locations)
                return Result(True, self, results)

            return Result(False, self, results)
        else:
@@ -232,12 +217,7 @@ class Some(Statement):
            #
            # we can't use `if child is True` because the instance is not True.
            success = sum([1 for child in results if bool(child) is True]) >= self.count
            locations = set()
            if success:
                for res in results:
                    if res.success:
                        locations.update(res.locations)
            return Result(success, self, results, locations=locations)
            return Result(success, self, results)


class Range(Statement):
@@ -319,75 +299,6 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
    features[capa.features.common.MatchedRule(namespace)].update(locations)


class Sequence(Statement):
    """
    match if the children evaluate to True in increasing order of location.

    the order of evaluation is dictated by the property
    `Sequence.children` (type: list[Statement|Feature]).
    """

    def __init__(self, children, description=None):
        super().__init__(description=description)
        self.children = children

    def evaluate(self, features: FeatureSet, short_circuit=True):
        capa.perf.counters["evaluate.feature"] += 1
        capa.perf.counters["evaluate.feature.sequence"] += 1

        results = []
        min_location = None

        for child in self.children:
            result = child.evaluate(features, short_circuit=short_circuit)
            results.append(result)

            if not result:
                # all children must match
                return Result(False, self, results)

            # Check for location ordering.
            # We want to find *some* location in the child's locations that is greater than
            # the minimum location from the previous child.
            #
            # If this is the first child, we just take its minimum location.

            # The child might match at multiple locations.
            # We need to be careful to pick a location that allows subsequent children to match.
            # This is a greedy approach: we pick the smallest location that satisfies the constraint.
            # This maximizes the "room" for subsequent children.

            valid_locations = sorted(result.locations)
            if not valid_locations:
                # This should effectively never happen if `result.success` is True,
                # unless the feature has no associated location (e.g. global features).
                # If a feature has no location, we can't enforce order, so strict sequence fails?
                # OR we assume it "matches anywhere" and doesn't constrain order?
                #
                # For now, let's assume valid locations are required for sequence logic.
                # If a child has no locations, it fails the sequence constraint.
                return Result(False, self, results)

            if min_location is None:
                min_location = valid_locations[0]
                # Filter result to only include this location
                results[-1] = Result(True, child, result.children, locations={min_location})
            else:
                # Find the first location that is strictly greater than min_location
                found = False
                for loc in valid_locations:
                    if loc > min_location:
                        min_location = loc
                        found = True
                        results[-1] = Result(True, child, result.children, locations={min_location})
                        break

                if not found:
                    return Result(False, self, results)

        return Result(True, self, results, locations={next(iter(r.locations)) for r in results})
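As an aside, the greedy ordering check above can be illustrated in isolation. The following sketch is not part of the diff; it re-implements the same constraint over plain integer locations instead of capa `Result` objects:

```python
def sequence_matches(children_locations: list[set[int]]) -> bool:
    """greedy check: each child must match strictly after the previous pick."""
    min_location = None
    for locations in children_locations:
        # the smallest location that still satisfies the ordering constraint;
        # picking the smallest leaves the most room for subsequent children
        candidates = sorted(loc for loc in locations if min_location is None or loc > min_location)
        if not candidates:
            return False
        min_location = candidates[0]
    return True


assert sequence_matches([{0x10, 0x30}, {0x20}, {0x40}])  # picks 0x10 < 0x20 < 0x40
assert not sequence_matches([{0x30}, {0x20}])  # no location after 0x30
```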

def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]:
    """
    match the given rules against the given features,

@@ -35,7 +35,7 @@ from capa.features.extractors.base_extractor import (

logger = logging.getLogger(__name__)

TESTED_VERSIONS = {"2.2-CAPE", "2.4-CAPE", "2.5-CAPE"}
TESTED_VERSIONS = {"2.2-CAPE", "2.4-CAPE"}


class CapeExtractor(DynamicFeatureExtractor):

@@ -16,14 +16,6 @@ from typing import Optional


class GhidraContext:
    """
    State holder for the Ghidra backend to avoid passing state to every function.

    PyGhidra uses a context manager to set up the Ghidra environment (program, transaction, etc.).
    We store the relevant objects here to allow easy access throughout the extractor
    without needing to pass them as arguments to every feature extraction method.
    """

    def __init__(self, program, flat_api, monitor):
        self.program = program
        self.flat_api = flat_api
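The hunk is truncated here. For orientation, later hunks on this page show how this context is populated and read; the following sketch is assembled from those calls and runs only inside a Ghidra/PyGhidra session:

```python
# from capa/main.py and capa_explorer.py below: populate the context once...
capa.features.extractors.ghidra.context.set_context(program, FlatProgramAPI(program), monitor)

# ...then helpers read it anywhere in the extractor, without parameter plumbing
flat_api = capa.features.extractors.ghidra.helpers.get_flat_api()
base = capa.features.extractors.ghidra.helpers.get_current_program().getImageBase()
```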

@@ -19,7 +19,6 @@ from typing import Iterator
import capa.features.extractors.ghidra.file
import capa.features.extractors.ghidra.insn
import capa.features.extractors.ghidra.global_
import capa.features.extractors.ghidra.helpers as ghidra_helpers
import capa.features.extractors.ghidra.function
import capa.features.extractors.ghidra.basicblock
from capa.features.common import Feature
@@ -37,6 +36,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
    def __init__(self, ctx_manager=None, tmpdir=None):
        self.ctx_manager = ctx_manager
        self.tmpdir = tmpdir
        import capa.features.extractors.ghidra.helpers as ghidra_helpers

        super().__init__(
            SampleHashes(
@@ -66,6 +66,8 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
        weakref.finalize(self, cleanup, self.ctx_manager, self.tmpdir)

    def get_base_address(self):
        import capa.features.extractors.ghidra.helpers as ghidra_helpers

        return AbsoluteVirtualAddress(ghidra_helpers.get_current_program().getImageBase().getOffset())

    def extract_global_features(self):
@@ -75,6 +77,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
        yield from capa.features.extractors.ghidra.file.extract_features()

    def get_functions(self) -> Iterator[FunctionHandle]:
        import capa.features.extractors.ghidra.helpers as ghidra_helpers

        for fhandle in ghidra_helpers.get_function_symbols():
            fh: FunctionHandle = FunctionHandle(
@@ -86,6 +89,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):

    @staticmethod
    def get_function(addr: int) -> FunctionHandle:
        import capa.features.extractors.ghidra.helpers as ghidra_helpers

        func = ghidra_helpers.get_flat_api().getFunctionContaining(ghidra_helpers.get_flat_api().toAddr(addr))
        return FunctionHandle(address=AbsoluteVirtualAddress(func.getEntryPoint().getOffset()), inner=func)
@@ -94,6 +98,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
        yield from capa.features.extractors.ghidra.function.extract_features(fh)

    def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
        import capa.features.extractors.ghidra.helpers as ghidra_helpers

        yield from ghidra_helpers.get_function_blocks(fh)

@@ -101,6 +106,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
        yield from capa.features.extractors.ghidra.basicblock.extract_features(fh, bbh)

    def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]:
        import capa.features.extractors.ghidra.helpers as ghidra_helpers

        yield from ghidra_helpers.get_insn_in_range(bbh)
@@ -86,11 +86,7 @@ def extract_file_embedded_pe() -> Iterator[tuple[Feature, Address]]:

        for off, _ in find_embedded_pe(capa.features.extractors.ghidra.helpers.get_block_bytes(block), mz_xor):
            # add offset back to block start
            ea_addr = block.getStart().add(off)
            ea = ea_addr.getOffset()
            f_offset = capa.features.extractors.ghidra.helpers.get_file_offset(ea_addr)
            if f_offset != -1:
                ea = f_offset
            ea: int = block.getStart().add(off).getOffset()

            yield Characteristic("embedded pe"), FileOffsetAddress(ea)

@@ -231,3 +227,14 @@ FILE_HANDLERS = (
    extract_file_function_names,
    extract_file_format,
)


def main():
    """ """
    import pprint

    pprint.pprint(list(extract_features()))  # noqa: T203


if __name__ == "__main__":
    main()
@@ -44,7 +44,7 @@ def extract_function_loop(fh: FunctionHandle):
        dests = block.getDestinations(capa.features.extractors.ghidra.helpers.get_monitor())
        s_addrs = block.getStartAddresses()

        while dests.hasNext():
        while dests.hasNext():  # For loop throws Python TypeError
            for addr in s_addrs:
                edges.append((addr.getOffset(), dests.next().getDestinationAddress().getOffset()))

@@ -61,9 +61,25 @@ def extract_recursive_call(fh: FunctionHandle):


def extract_features(fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]:
    """extract function features"""
    for function_handler in FUNCTION_HANDLERS:
        for feature, addr in function_handler(fh):
            yield feature, addr


FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call)


def main():
    """ """
    features = []
    for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols():
        features.extend(list(extract_features(fhandle)))

    import pprint

    pprint.pprint(features)  # noqa: T203


if __name__ == "__main__":
    main()
@@ -62,19 +62,6 @@ def find_byte_sequence(addr: "ghidra.program.model.address.Address", seq: bytes)
    yield from eas


def get_file_offset(addr: "ghidra.program.model.address.Address") -> int:
    """get file offset for an address"""
    block = get_current_program().getMemory().getBlock(addr)
    if not block:
        return -1

    for info in block.getSourceInfos():
        if info.contains(addr):
            return info.getFileBytesOffset(addr)

    return -1


def get_bytes(addr: "ghidra.program.model.address.Address", length: int) -> bytes:
    """yield length bytes at addr
@@ -488,3 +488,22 @@ INSTRUCTION_HANDLERS = (
    extract_function_calls_from,
    extract_function_indirect_call_characteristic_features,
)


def main():
    """ """
    features = []
    from capa.features.extractors.ghidra.extractor import GhidraFeatureExtractor

    for fh in GhidraFeatureExtractor().get_functions():
        for bb in capa.features.extractors.ghidra.helpers.get_function_blocks(fh):
            for insn in capa.features.extractors.ghidra.helpers.get_insn_in_range(bb):
                features.extend(list(extract_features(fh, bb, insn)))

    import pprint

    pprint.pprint(features)  # noqa: T203


if __name__ == "__main__":
    main()
@@ -18,7 +18,6 @@ import idaapi
import idautils

import capa.features.extractors.ida.helpers
from capa.features.file import FunctionName
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors import loops
@@ -51,39 +50,10 @@ def extract_recursive_call(fh: FunctionHandle):
        yield Characteristic("recursive call"), fh.address


def extract_function_name(fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]:
    ea = fh.inner.start_ea
    name = idaapi.get_name(ea)
    if name.startswith("sub_"):
        # skip default names, like "sub_401000"
        return

    yield FunctionName(name), fh.address
    if name.startswith("_"):
        # some linkers may prefix linked routines with a `_` to avoid name collisions.
        # extract features for both the mangled and un-mangled representations.
        # e.g. `_fwrite` -> `fwrite`
        # see: https://stackoverflow.com/a/2628384/87207
        yield FunctionName(name[1:]), fh.address


def extract_function_alternative_names(fh: FunctionHandle):
    """Get all alternative names for an address."""

    for aname in capa.features.extractors.ida.helpers.get_function_alternative_names(fh.inner.start_ea):
        yield FunctionName(aname), fh.address


def extract_features(fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]:
    for func_handler in FUNCTION_HANDLERS:
        for feature, addr in func_handler(fh):
            yield feature, addr


FUNCTION_HANDLERS = (
    extract_function_calls_to,
    extract_function_loop,
    extract_recursive_call,
    extract_function_name,
    extract_function_alternative_names,
)
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call)
@@ -20,7 +20,6 @@ import idaapi
import ida_nalt
import idautils
import ida_bytes
import ida_funcs
import ida_segment

from capa.features.address import AbsoluteVirtualAddress
@@ -437,16 +436,3 @@ def is_basic_block_return(bb: idaapi.BasicBlock) -> bool:
def has_sib(oper: idaapi.op_t) -> bool:
    # via: https://reverseengineering.stackexchange.com/a/14300
    return oper.specflag1 == 1


def find_alternative_names(cmt: str):
    for line in cmt.split("\n"):
        if line.startswith("Alternative name is '") and line.endswith("'"):
            name = line[len("Alternative name is '") : -1]  # Extract name between quotes
            yield name


def get_function_alternative_names(fva: int):
    """Get all alternative names for an address."""
    yield from find_alternative_names(ida_bytes.get_cmt(fva, False) or "")
    yield from find_alternative_names(ida_funcs.get_func_cmt(idaapi.get_func(fva), False) or "")
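For reference, a minimal illustration of the repeatable-comment format these helpers parse. The function name here is hypothetical; the prefix string is taken from `find_alternative_names` above:

```python
cmt = "some function comment\nAlternative name is 'CreateFileA'"
assert list(find_alternative_names(cmt)) == ["CreateFileA"]
```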

@@ -22,7 +22,6 @@ import idautils

import capa.features.extractors.helpers
import capa.features.extractors.ida.helpers
from capa.features.file import FunctionName
from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
@@ -130,8 +129,8 @@ def extract_insn_api_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle)
        # not a function (start)
        return

    name = idaapi.get_name(target_func.start_ea)
    if target_func.flags & idaapi.FUNC_LIB or not name.startswith("sub_"):
    if target_func.flags & idaapi.FUNC_LIB:
        name = idaapi.get_name(target_func.start_ea)
        yield API(name), ih.address
        if name.startswith("_"):
            # some linkers may prefix linked routines with a `_` to avoid name collisions.
@@ -140,10 +139,6 @@ def extract_insn_api_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle)
            # see: https://stackoverflow.com/a/2628384/87207
            yield API(name[1:]), ih.address

    for altname in capa.features.extractors.ida.helpers.get_function_alternative_names(target_func.start_ea):
        yield FunctionName(altname), ih.address
        yield API(altname), ih.address


def extract_insn_number_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
@@ -1,75 +1,17 @@
# capa analysis using Ghidra
<div align="center">
<img src="../../doc/img/ghidra_backend_logo.png" width=240 height=125>
</div>

capa supports using Ghidra (via [PyGhidra](https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra)) as a feature extraction backend. This enables you to run capa against binaries using Ghidra's analysis engine.
# capa + Ghidra

```bash
$ capa -b ghidra Practical\ Malware\ Analysis\ Lab\ 01-01.exe_
┌──────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ md5 │ bb7425b82141a1c0f7d60e5106676bb1 │
│ sha1 │ │
│ sha256 │ 58898bd42c5bd3bf9b1389f0eee5b39cd59180e8370eb9ea838a0b327bd6fe47 │
│ analysis │ static │
│ os │ windows │
│ format │ pe │
│ arch │ i386 │
│ path │ ~/Documents/capa/tests/data/Practical Malware Analysis Lab 01-01.exe_ │
└──────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ATT&CK Tactic ┃ ATT&CK Technique ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ DISCOVERY │ File and Directory Discovery [T1083] │
└────────────────────────────────────┴─────────────────────────────────────────────────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ MBC Objective ┃ MBC Behavior ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ DISCOVERY │ File and Directory Discovery [E1083] │
│ FILE SYSTEM │ Copy File [C0045] │
│ │ Read File [C0051] │
│ PROCESS │ Terminate Process [C0018] │
└────────────────────────────────────┴─────────────────────────────────────────────────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Capability ┃ Namespace ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ copy file │ host-interaction/file-system/copy │
│ enumerate files recursively │ host-interaction/file-system/files/list │
│ read file via mapping (2 matches) │ host-interaction/file-system/read │
│ terminate process (2 matches) │ host-interaction/process/terminate │
│ resolve function by parsing PE exports │ load-code/pe │
└────────────────────────────────────────────────┴─────────────────────────────────────────────────┘
```
[capa](https://github.com/mandiant/capa) is the FLARE team’s open-source tool that detects capabilities in executable files. [Ghidra](https://github.com/NationalSecurityAgency/ghidra) is an open-source software reverse engineering framework. capa + Ghidra brings capa’s detection capabilities to Ghidra using [PyGhidra](https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra).

## getting started
## Prerequisites

### requirements
- Ghidra >= 12.0 must be installed and available to PyGhidra (e.g. set `GHIDRA_INSTALL_DIR` environment variable)

- [Ghidra](https://github.com/NationalSecurityAgency/ghidra) >= 12.0 must be installed and available via the `GHIDRA_INSTALL_DIR` environment variable.
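For example, on Linux (the install path is illustrative):

```bash
export GHIDRA_INSTALL_DIR=/opt/ghidra_12.0_PUBLIC
```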

#### standalone binary (recommended)

The capa [standalone binary](https://github.com/mandiant/capa/releases) is the preferred way to run capa with the Ghidra backend.
Although the binary does not bundle the Java environment or Ghidra itself, it will dynamically load them at runtime.

#### python package

You can also use the Ghidra backend with the capa Python package by installing `flare-capa` with the `ghidra` extra.

```bash
$ pip install "flare-capa[ghidra]"
```

### usage

To use the Ghidra backend, specify it with the `-b` or `--backend` flag:
## Usage

```bash
$ capa -b ghidra /path/to/sample
```

capa will:
1. Initialize a headless Ghidra instance.
2. Create a temporary project.
3. Import and analyze the sample.
4. Extract features and match rules.
5. Clean up the temporary project.

**Note:** The first time you run this, it may take a few moments to initialize the Ghidra environment.
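For readers curious what this looks like programmatically, here is a minimal PyGhidra sketch of the same flow — illustrative only, not capa's implementation — assuming `pyghidra` is installed and `GHIDRA_INSTALL_DIR` is set:

```python
import pyghidra

pyghidra.start()  # locates Ghidra via GHIDRA_INSTALL_DIR and starts the JVM

# imports the sample into a Ghidra project, runs auto-analysis,
# and yields a FlatProgramAPI for the analyzed program
with pyghidra.open_program("/path/to/sample") as flat_api:
    program = flat_api.getCurrentProgram()
    print(program.getImageBase())
```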

@@ -40,10 +40,6 @@ def get_flat_api():
    return ghidra_context.get_context().flat_api


def get_monitor():
    return ghidra_context.get_context().monitor


class GHIDRAIO:
    """
    An object that acts as a file-like object,

@@ -1,54 +0,0 @@
<div align="center">
<img src="https://github.com/mandiant/capa/blob/master/doc/img/ghidra_backend_logo.png" width=240 height=125>
</div>

# capa explorer for Ghidra

capa explorer for Ghidra brings capa’s detection capabilities directly to Ghidra’s user interface, helping speed up your reverse engineering tasks by identifying what parts of a program suggest interesting behavior, such as setting a registry value. You can execute (via [PyGhidra](https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra)) the script [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/plugin/capa_explorer.py) using Ghidra’s Script Manager window to run capa’s analysis and view the results in Ghidra.

## ui integration
[capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) renders capa results in Ghidra's UI to help you quickly navigate them. This includes adding matched functions to Ghidra’s Symbol Tree and Bookmarks windows and adding comments to functions that indicate matched capabilities and features. You can execute this script using Ghidra’s Script Manager window.

### symbol tree window
Matched functions are added to Ghidra's Symbol Tree window under a custom namespace that maps to the capabilities' [capa namespace](https://github.com/mandiant/capa-rules/blob/master/doc/format.md#rule-namespace).
<div align="center">
<img src="https://github.com/mandiant/capa/assets/66766340/eeae33f4-99d4-42dc-a5e8-4c1b8c661492" width=300>
</div>

### comments

Comments are added at the beginning of matched functions indicating matched capabilities, and inline comments are added to functions indicating matched features. You can view these comments in Ghidra’s Disassembly Listing and Decompile windows.
<div align="center">
<img src="https://github.com/mandiant/capa/assets/66766340/bb2b4170-7fd4-45fc-8c7b-ff8f2e2f101b" width=1000>
</div>

### bookmarks

Bookmarks are added to functions that matched a capability that is mapped to a MITRE ATT&CK and/or Malware Behavior Catalog (MBC) technique. You can view these bookmarks in Ghidra's Bookmarks window.
<div align="center">
<img src="https://github.com/mandiant/capa/assets/66766340/7f9a66a9-7be7-4223-91c6-4b8fc4651336" width=825>
</div>

# getting started

## requirements

- [Ghidra](https://github.com/NationalSecurityAgency/ghidra) >= 12.0 must be installed.
- [flare-capa](https://pypi.org/project/flare-capa/) >= 10.0 must be installed (virtual environment recommended) with the `ghidra` extra (e.g., `pip install "flare-capa[ghidra]"`).
- [capa rules](https://github.com/mandiant/capa-rules) must be downloaded for the version of capa you are using.

## execution

### 1. run Ghidra with PyGhidra
You must start Ghidra using the `pyghidraRun` script provided in the support directory of your Ghidra installation to ensure the Python environment is correctly loaded. You should execute `pyghidraRun` from within the Python environment that you used to install capa.

```bash
<ghidra_install>/support/pyghidraRun
```
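For example, with a virtual environment (paths illustrative):

```bash
source ~/venvs/capa/bin/activate   # environment where flare-capa[ghidra] is installed
~/ghidra_12.0_PUBLIC/support/pyghidraRun
```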

### 2. run capa_explorer.py
1. Open your Ghidra project and CodeBrowser.
2. Open the Script Manager.
3. Add [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/plugin/capa_explorer.py) to the script directories.
4. Filter for capa and run the script.
5. When prompted, select the directory containing the downloaded capa rules.
@@ -1,463 +0,0 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Run capa against loaded Ghidra database and render results in Ghidra UI

# @author Colton Gabertan (gabertan.colton@gmail.com)
# @category capa
# @runtime PyGhidra

import json
import logging
import pathlib
from typing import Any

from java.util import ArrayList
from ghidra.util import Msg
from ghidra.app.cmd.label import AddLabelCmd, CreateNamespacesCmd
from ghidra.util.exception import CancelledException
from ghidra.program.flatapi import FlatProgramAPI
from ghidra.program.model.symbol import Namespace, SourceType, SymbolType

import capa
import capa.main
import capa.rules
import capa.version
import capa.render.json
import capa.ghidra.helpers
import capa.capabilities.common
import capa.features.extractors.ghidra.context
import capa.features.extractors.ghidra.extractor

logger = logging.getLogger("capa_explorer")


def show_monitor_message(msg):
    capa.ghidra.helpers.get_monitor().checkCanceled()
    capa.ghidra.helpers.get_monitor().setMessage(msg)


def show_error(msg):
    Msg.showError(None, None, "capa explorer", msg)


def show_warn(msg):
    Msg.showWarn(None, None, "capa explorer", msg)


def show_info(msg):
    Msg.showInfo(None, None, "capa explorer", msg)


def add_bookmark(addr, txt, category="CapaExplorer"):
    """create bookmark at addr"""
    capa.ghidra.helpers.get_current_program().getBookmarkManager().setBookmark(addr, "Info", category, txt)


def create_namespace(namespace_str):
    """create new Ghidra namespace for each capa namespace"""
    cmd = CreateNamespacesCmd(namespace_str, SourceType.USER_DEFINED)
    cmd.applyTo(capa.ghidra.helpers.get_current_program())
    return cmd.getNamespace()


def create_label(ghidra_addr, name, capa_namespace):
    """custom label cmd to overlay symbols under capa-generated namespaces"""

    # prevent duplicate labels under the same capa-generated namespace
    symbol_table = capa.ghidra.helpers.get_current_program().getSymbolTable()
    for sym in symbol_table.getSymbols(ghidra_addr):
        if sym.getName(True) == capa_namespace.getName(True) + Namespace.DELIMITER + name:
            return

    # create SymbolType.LABEL at addr
    # prioritize capa-generated namespace (duplicate match @ new addr), else put under global Ghidra one (new match)
    cmd = AddLabelCmd(ghidra_addr, name, True, SourceType.USER_DEFINED)
    cmd.applyTo(capa.ghidra.helpers.get_current_program())

    # assign new match overlay label to capa-generated namespace
    cmd.getSymbol().setNamespace(capa_namespace)
    return


class CapaMatchData:
    def __init__(
        self,
        namespace,
        scope,
        capability,
        matches,
        attack: list[dict[Any, Any]],
        mbc: list[dict[Any, Any]],
    ):
        self.namespace = namespace
        self.scope = scope
        self.capability = capability
        self.matches = matches
        self.attack = attack
        self.mbc = mbc

    def bookmark_functions(self):
        """create bookmarks for MITRE ATT&CK & MBC mappings"""

        if self.attack == [] and self.mbc == []:
            return

        for key in self.matches.keys():
            addr = capa.ghidra.helpers.get_flat_api().toAddr(hex(key))
            func = capa.ghidra.helpers.get_flat_api().getFunctionContaining(addr)

            # bookmark & tag MITRE ATT&CK tactics & MBC @ function scope
            if func is not None:
                func_addr = func.getEntryPoint()

                if self.attack != []:
                    for item in self.attack:
                        attack_txt = ""
                        for part in item.get("parts", {}):
                            attack_txt = attack_txt + part + Namespace.DELIMITER
                        attack_txt = attack_txt + item.get("id", {})
                        add_bookmark(func_addr, attack_txt, "CapaExplorer::MITRE ATT&CK")

                if self.mbc != []:
                    for item in self.mbc:
                        mbc_txt = ""
                        for part in item.get("parts", {}):
                            mbc_txt = mbc_txt + part + Namespace.DELIMITER
                        mbc_txt = mbc_txt + item.get("id", {})
                        add_bookmark(func_addr, mbc_txt, "CapaExplorer::MBC")

    def set_plate_comment(self, ghidra_addr):
        """set plate comments at matched functions"""
        comment = capa.ghidra.helpers.get_flat_api().getPlateComment(ghidra_addr)
        rule_path = self.namespace.replace(Namespace.DELIMITER, "/")
        # 2 calls to avoid duplicate comments via subsequent script runs
        if comment is None:
            # first comment @ function
            comment = rule_path + "\n"
            capa.ghidra.helpers.get_flat_api().setPlateComment(ghidra_addr, comment)
        elif rule_path not in comment:
            comment = comment + rule_path + "\n"
            capa.ghidra.helpers.get_flat_api().setPlateComment(ghidra_addr, comment)
        else:
            return

    def set_pre_comment(self, ghidra_addr, sub_type, description):
        """set pre comments at subscoped matches of main rules"""
        comment = capa.ghidra.helpers.get_flat_api().getPreComment(ghidra_addr)
        if comment is None:
            comment = "capa: " + sub_type + "(" + description + ")" + ' matched in "' + self.capability + '"\n'
            capa.ghidra.helpers.get_flat_api().setPreComment(ghidra_addr, comment)
        elif self.capability not in comment:
            comment = (
                comment + "capa: " + sub_type + "(" + description + ")" + ' matched in "' + self.capability + '"\n'
            )
            capa.ghidra.helpers.get_flat_api().setPreComment(ghidra_addr, comment)
        else:
            return

    def label_matches(self, do_namespaces, do_comments):
        """label findings at function scopes and comment on subscope matches"""
        capa_namespace = None
        if do_namespaces:
            capa_namespace = create_namespace(self.namespace)

        symbol_table = capa.ghidra.helpers.get_current_program().getSymbolTable()

        # handle function main scope of matched rule
        # these will typically contain further matches within
        if self.scope == "function":
            for addr in self.matches.keys():
                ghidra_addr = capa.ghidra.helpers.get_flat_api().toAddr(hex(addr))

                # classify new function label under capa-generated namespace
                if do_namespaces:
                    sym = symbol_table.getPrimarySymbol(ghidra_addr)
                    if sym is not None:
                        if sym.getSymbolType() == SymbolType.FUNCTION:
                            create_label(ghidra_addr, sym.getName(), capa_namespace)

                if do_comments:
                    self.set_plate_comment(ghidra_addr)

                # parse the corresponding nodes, and pre-comment subscope matched features
                # under the encompassing function(s)
                for sub_match in self.matches.get(addr):
                    for loc, node in sub_match.items():
                        sub_ghidra_addr = capa.ghidra.helpers.get_flat_api().toAddr(hex(loc))
                        if sub_ghidra_addr == ghidra_addr:
                            # skip duplicates
                            continue

                        # precomment subscope matches under the function
                        if node != {} and do_comments:
                            for sub_type, description in parse_node(node):
                                self.set_pre_comment(sub_ghidra_addr, sub_type, description)
        else:
            # resolve the encompassing function for the capa namespace
            # of non-function scoped main matches
            for addr in self.matches.keys():
                ghidra_addr = capa.ghidra.helpers.get_flat_api().toAddr(hex(addr))

                # basic block / insn scoped main matches
                # Ex. See "Create Process on Windows" Rule
                func = capa.ghidra.helpers.get_flat_api().getFunctionContaining(ghidra_addr)
                if func is not None:
                    func_addr = func.getEntryPoint()
                    if do_namespaces:
                        create_label(func_addr, func.getName(), capa_namespace)
                    if do_comments:
                        self.set_plate_comment(func_addr)

                # create subscope match precomments
                for sub_match in self.matches.get(addr):
                    for loc, node in sub_match.items():
                        sub_ghidra_addr = capa.ghidra.helpers.get_flat_api().toAddr(hex(loc))

                        if node != {}:
                            if func is not None:
                                # basic block/ insn scope under resolved function
                                if do_comments:
                                    for sub_type, description in parse_node(node):
                                        self.set_pre_comment(sub_ghidra_addr, sub_type, description)
                            else:
                                # this would be a global/file scoped main match
                                # try to resolve the encompassing function via the subscope match, instead
                                # Ex. "run as service" rule
                                sub_func = capa.ghidra.helpers.get_flat_api().getFunctionContaining(sub_ghidra_addr)
                                if sub_func is not None:
                                    sub_func_addr = sub_func.getEntryPoint()
                                    # place function in capa namespace & create the subscope match label in Ghidra's global namespace
                                    if do_namespaces:
                                        create_label(sub_func_addr, sub_func.getName(), capa_namespace)
                                    if do_comments:
                                        self.set_plate_comment(sub_func_addr)

                                    if do_comments:
                                        for sub_type, description in parse_node(node):
                                            self.set_pre_comment(sub_ghidra_addr, sub_type, description)
                                else:
                                    # addr is in some other file section like .data
                                    # represent this location with a label symbol under the capa namespace
                                    # Ex. See "Reference Base64 String" rule
                                    if do_namespaces:
                                        for _sub_type, _description in parse_node(node):
                                            # in many cases, these will be ghidra-labeled data, so just add the existing
                                            # label symbol to the capa namespace
                                            for sym in symbol_table.getSymbols(sub_ghidra_addr):
                                                if sym.getSymbolType() == SymbolType.LABEL:
                                                    sym.setNamespace(capa_namespace)
                                    if do_comments:
                                        for sub_type, description in parse_node(node):
                                            self.set_pre_comment(sub_ghidra_addr, sub_type, description)


def get_capabilities():
    rules_dir = ""

    show_monitor_message(f"requesting capa {capa.version.__version__} rules directory")
    selected_dir = askDirectory(f"choose capa {capa.version.__version__} rules directory", "Ok")  # type: ignore [name-defined] # noqa: F821

    if selected_dir:
        rules_dir = selected_dir.getPath()

    if not rules_dir:
        raise CancelledException

    rules_path: pathlib.Path = pathlib.Path(rules_dir)

    show_monitor_message(f"loading rules from {rules_path}")
    rules = capa.rules.get_rules([rules_path])

    show_monitor_message("collecting binary metadata")
    meta = capa.ghidra.helpers.collect_metadata([rules_path])

    show_monitor_message("running capa analysis")
    extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
    capabilities = capa.capabilities.common.find_capabilities(rules, extractor, True)

    show_monitor_message("checking for static limitations")
    if capa.capabilities.common.has_static_limitation(rules, capabilities, is_standalone=False):
        show_warn(
            "capa explorer encountered warnings during analysis. Please check the console output for more information.",
        )

    show_monitor_message("rendering results")
    return capa.render.json.render(meta, rules, capabilities.matches)


def get_locations(match_dict):
    """recursively collect match addresses and associated nodes"""

    for loc in match_dict.get("locations", {}):
        # either an rva (absolute)
        # or an offset into a file (file)
        if loc.get("type", "") in ("absolute", "file"):
            yield loc.get("value"), match_dict.get("node")

    for child in match_dict.get("children", {}):
        yield from get_locations(child)


def parse_node(node_data):
    """pull match descriptions and sub features by parsing node dicts"""

    node = node_data.get(node_data.get("type"))

    if "description" in node:
        yield "description", node.get("description")

    data = node.get(node.get("type"))
    if isinstance(data, (str, int)):
        feat_type = node.get("type")
        if isinstance(data, int):
            data = hex(data)
        yield feat_type, data


def parse_json(capa_data):
    """Parse json produced by capa"""

    for rule, capability in capa_data.get("rules", {}).items():
        # structure to contain rule match address & supporting feature data
        # {rule match addr:[{feature addr:{node_data}}]}
        rule_matches: dict[Any, list[Any]] = {}
        for i in range(len(capability.get("matches"))):
            # grab rule match location
            match_loc = capability.get("matches")[i][0].get("value")
            if match_loc is None:
                # Ex. See "Reference Base64 string"
                # {'type':'no address'}
                match_loc = i
            rule_matches[match_loc] = []

            # grab extracted feature locations & corresponding node data
            # feature[0]: location
            # feature[1]: node
            features = capability.get("matches")[i][1]
            feat_dict = {}
            for feature in get_locations(features):
                feat_dict[feature[0]] = feature[1]
            rule_matches[match_loc].append(feat_dict)

        # dict data of currently matched rule
        meta = capability["meta"]

        # get MITRE ATT&CK and MBC
        attack = meta.get("attack")
        if attack is None:
            attack = []
        mbc = meta.get("mbc")
        if mbc is None:
            mbc = []

        # scope match for the rule
        scope = meta["scopes"].get("static")

        fmt_rule = Namespace.DELIMITER + rule.replace(" ", "-")
        if "namespace" in meta:
            # split into list to help define child namespaces
            # this requires the correct delimiter used by Ghidra
            # Ex. 'communication/named-pipe/create/create pipe' -> capa::communication::named-pipe::create::create-pipe
            namespace_str = Namespace.DELIMITER.join(meta["namespace"].split("/"))
            namespace = "capa_explorer" + Namespace.DELIMITER + namespace_str + fmt_rule
        else:
            # lib rules via the official rules repo will not contain data
            # for the "namespaces" key, so format using rule itself
            # Ex. 'contain loop' -> capa::lib::contain-loop
            namespace = "capa_explorer" + Namespace.DELIMITER + "lib" + fmt_rule

        yield CapaMatchData(namespace, scope, rule, rule_matches, attack, mbc)


def main():
    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)

    choices = ["namespaces", "bookmarks", "comments"]
    # use ArrayList to resolve ambiguous askChoices overloads (List vs List, List) in PyGhidra
    choices_java = ArrayList()
    for c in choices:
        choices_java.add(c)

    choice_labels = [
        'add "capa_explorer" namespace for matched functions',
        "add bookmarks for matched functions",
        "add comments to matched functions",
    ]
    # use ArrayList to resolve ambiguous askChoices overloads (List vs List, List) in PyGhidra
    choice_labels_java = ArrayList()
    for c in choice_labels:
        choice_labels_java.add(c)

    selected = list(askChoices("capa explorer", "select actions:", choices_java, choice_labels_java))  # type: ignore [name-defined] # noqa: F821

    do_namespaces = "namespaces" in selected
    do_comments = "comments" in selected
    do_bookmarks = "bookmarks" in selected

    if not any((do_namespaces, do_comments, do_bookmarks)):
        raise CancelledException("no actions selected")

    # initialize the context for the extractor/helpers
    capa.features.extractors.ghidra.context.set_context(
        currentProgram,  # type: ignore [name-defined] # noqa: F821
        FlatProgramAPI(currentProgram),  # type: ignore [name-defined] # noqa: F821
        monitor,  # type: ignore [name-defined] # noqa: F821
    )

    show_monitor_message("checking supported Ghidra version")
    if not capa.ghidra.helpers.is_supported_ghidra_version():
        show_error("unsupported Ghidra version")
        return capa.main.E_UNSUPPORTED_GHIDRA_VERSION

    show_monitor_message("checking supported file type")
    if not capa.ghidra.helpers.is_supported_file_type():
        show_error("unsupported file type")
        return capa.main.E_INVALID_FILE_TYPE

    show_monitor_message("checking supported file architecture")
    if not capa.ghidra.helpers.is_supported_arch_type():
        show_error("unsupported file architecture")
        return capa.main.E_INVALID_FILE_ARCH

    # capa_data will always contain {'meta':..., 'rules':...}
    # if the 'rules' key contains no values, then there were no matches
    capa_data = json.loads(get_capabilities())
    if capa_data.get("rules") is None:
        show_info("capa explorer found no matches.")
        return capa.main.E_EMPTY_REPORT

    show_monitor_message("processing matches")
    for item in parse_json(capa_data):
        if do_bookmarks:
            show_monitor_message("adding bookmarks")
            item.bookmark_functions()
        if do_namespaces or do_comments:
            show_monitor_message("adding labels")
            item.label_matches(do_namespaces, do_comments)

    show_info("capa explorer analysis complete.")

    return 0


if __name__ == "__main__":
    try:
        if main() != 0:
            show_error(
                "capa explorer encountered errors during analysis. Please check the console output for more information.",
            )
    except CancelledException:
        show_info("capa explorer analysis cancelled.")
@@ -96,7 +96,11 @@ def is_runtime_ida():


def is_runtime_ghidra():
    return importlib.util.find_spec("ghidra") is not None
    try:
        currentProgram  # type: ignore [name-defined] # noqa: F821
    except NameError:
        return False
    return True


def assert_never(value) -> NoReturn:
@@ -12,6 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import io
|
||||
import os
|
||||
import logging
|
||||
import datetime
|
||||
@@ -22,13 +23,24 @@ from pathlib import Path
|
||||
from rich.console import Console
|
||||
from typing_extensions import assert_never
|
||||
|
||||
import capa.perf
|
||||
import capa.rules
|
||||
import capa.engine
|
||||
import capa.helpers
|
||||
import capa.version
|
||||
import capa.render.json
|
||||
import capa.rules.cache
|
||||
import capa.render.default
|
||||
import capa.render.verbose
|
||||
import capa.features.common
|
||||
import capa.features.freeze as frz
|
||||
import capa.render.vverbose
|
||||
import capa.features.extractors
|
||||
import capa.render.result_document
|
||||
import capa.render.result_document as rdoc
|
||||
import capa.features.extractors.common
|
||||
import capa.features.extractors.base_extractor
|
||||
import capa.features.extractors.cape.extractor
|
||||
from capa.rules import RuleSet
|
||||
from capa.engine import MatchResults
|
||||
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError
|
||||
@@ -167,15 +179,8 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]):
    except Exception as e:
        # vivisect raises raw Exception instances, and we don't want
        # to do a subclass check via isinstance.
        if type(e) is Exception and e.args:
            error_msg = str(e.args[0])

            if "Couldn't convert rva" in error_msg:
                raise CorruptFile(error_msg) from e
            elif "Unsupported Architecture" in error_msg:
                # Extract architecture number if available
                arch_info = e.args[1] if len(e.args) > 1 else "unknown"
                raise CorruptFile(f"Unsupported architecture: {arch_info}") from e
        if type(e) is Exception and "Couldn't convert rva" in e.args[0]:
            raise CorruptFile(e.args[0]) from e
        raise

    viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths])
@@ -334,24 +339,12 @@ def get_extractor(
        import capa.features.extractors.ida.extractor

        logger.debug("idalib: opening database...")
        idapro.enable_console_messages(False)
        with console.status("analyzing program...", spinner="dots"):
            # we set the primary and secondary Lumina servers to 0.0.0.0 to disable Lumina,
            # which sometimes provides bad names, including overwriting names from debug info.
            #
            # use -R to load resources, which can help us find embedded PE files.
            #
            # return values from open_database:
            #  0 - Success
            #  2 - User cancelled or 32-64 bit conversion failed
            #  4 - Database initialization failed
            # -1 - Generic errors (database already open, auto-analysis failed, etc.)
            # -2 - User cancelled operation
            ret = idapro.open_database(
                str(input_path), run_auto_analysis=True, args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R"
            )
            if ret != 0:
                raise RuntimeError("failed to analyze input file")
        # idalib writes to stdout (ugh), so we have to capture that
        # so as not to screw up structured output.
        with capa.helpers.stdout_redirector(io.BytesIO()):
            with console.status("analyzing program...", spinner="dots"):
                if idapro.open_database(str(input_path), run_auto_analysis=True):
                    raise RuntimeError("failed to analyze input file")

        logger.debug("idalib: waiting for analysis...")
        ida_auto.auto_wait()
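
Taken together with the test fixtures further below, the idalib lifecycle reduces to the following sketch (an illustration assembled from the calls shown in this diff, not verbatim capa code; the sample path is a placeholder):

import idapro
import ida_auto

idapro.enable_console_messages(False)  # keep idalib from chattering on stdout

# open_database returns 0 on success; the other codes are listed in the comment above
ret = idapro.open_database("path/to/sample.exe_", run_auto_analysis=True)
if ret != 0:
    raise RuntimeError("failed to analyze input file")

ida_auto.auto_wait()  # block until IDA's auto-analysis settles

try:
    ...  # run feature extraction against the (global) open database
finally:
    idapro.close_database(save=False)  # the database is global state: always close it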
12 capa/main.py
@@ -1107,26 +1107,14 @@ def ida_main():


def ghidra_main():
    from ghidra.program.flatapi import FlatProgramAPI

    import capa.rules
    import capa.ghidra.helpers
    import capa.render.default
    import capa.features.extractors.ghidra.context
    import capa.features.extractors.ghidra.extractor

    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)

    # these are provided by the Ghidra scripting environment,
    # but are not available when running standard Python,
    # so we have to ignore the linting errors
    program = currentProgram  # type: ignore [name-defined] # noqa: F821
    monitor_ = monitor  # type: ignore [name-defined] # noqa: F821
    flat_api = FlatProgramAPI(program)

    capa.features.extractors.ghidra.context.set_context(program, flat_api, monitor_)

    logger.debug("-" * 80)
    logger.debug(" Using default embedded rules.")
    logger.debug(" ")
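
For orientation, the same entry point can also be reached headlessly; a minimal sketch, assuming pyghidra's documented start()/open_program() API and a GHIDRA_INSTALL_DIR environment variable pointing at a Ghidra install (the sample path is a placeholder):

import pyghidra

pyghidra.start()  # locates Ghidra via GHIDRA_INSTALL_DIR and starts the JVM

with pyghidra.open_program("path/to/sample.exe_") as flat_api:
    program = flat_api.getCurrentProgram()
    # ghidra_main() expects `currentProgram` and `monitor` to be injected by the
    # scripting environment; a headless driver would instead pass `program`,
    # `flat_api`, and a monitor to capa.features.extractors.ghidra.context.set_context(...)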
@@ -167,9 +167,7 @@ class CompoundStatementType:
    AND = "and"
    OR = "or"
    NOT = "not"
    NOT = "not"
    OPTIONAL = "optional"
    SEQUENCE = "sequence"


class StatementModel(FrozenModel): ...
@@ -215,7 +213,7 @@ class StatementNode(FrozenModel):


def statement_from_capa(node: capa.engine.Statement) -> Statement:
    if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not, capa.engine.Sequence)):
    if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not)):
        return CompoundStatement(type=node.__class__.__name__.lower(), description=node.description)

    elif isinstance(node, capa.engine.Some):
@@ -282,9 +280,6 @@ def node_to_capa(
    elif node.statement.type == CompoundStatementType.OPTIONAL:
        return capa.engine.Some(description=node.statement.description, count=0, children=children)

    elif node.statement.type == CompoundStatementType.SEQUENCE:
        return capa.engine.Sequence(description=node.statement.description, children=children)

    else:
        assert_never(node.statement.type)
@@ -635,8 +635,6 @@ def build_statements(d, scopes: Scopes):
        return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
    elif key == "or":
        return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
    elif key == "sequence":
        return ceng.Sequence(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
    elif key == "not":
        if len(d[key]) != 1:
            raise InvalidRule("not statement must have exactly one child statement")
@@ -1700,7 +1698,7 @@ class RuleSet:
            # feature is found N times
            return rec(rule_name, node.child)

        elif isinstance(node, (ceng.And, ceng.Sequence)):
        elif isinstance(node, ceng.And):
            # When evaluating an AND block, all of the children need to match.
            #
            # So when we index rules, we want to pick the most uncommon feature(s)
BIN doc/img/ghidra_headless_analyzer.png (new file; binary not shown, 210 KiB)
BIN doc/img/ghidra_script_mngr_output.png (new executable file; binary not shown, 108 KiB)
BIN doc/img/ghidra_script_mngr_rules.png (new executable file; binary not shown, 110 KiB)
BIN doc/img/ghidra_script_mngr_verbosity.png (new executable file; binary not shown, 79 KiB)
@@ -79,6 +79,7 @@ dependencies = [
    "ruamel.yaml>=0.18",
    "pefile>=2023.2.7",
    "pyelftools>=0.31",
    "pyghidra>=3.0.0",
    "pydantic>=2",
    "rich>=13",
    "humanize>=4",
@@ -109,13 +110,6 @@ dependencies = [
]
dynamic = ["version"]

[tool.pytest.ini_options]
filterwarnings = [
    "ignore:builtin type SwigPyPacked has no __module__ attribute:DeprecationWarning",
    "ignore:builtin type SwigPyObject has no __module__ attribute:DeprecationWarning",
    "ignore:builtin type swigvarlink has no __module__ attribute:DeprecationWarning",
]

[tool.setuptools.dynamic]
version = {attr = "capa.version.__version__"}
@@ -130,57 +124,54 @@ dev = [
    # These dependencies are not used in production environments
    # and should not conflict with other libraries/tooling.
    "pre-commit==4.5.0",
    "pytest==9.0.2",
    "pytest==8.0.0",
    "pytest-sugar==1.1.1",
    "pytest-instafail==0.5.0",
    "flake8==7.3.0",
    "flake8-bugbear==25.11.29",
    "flake8-bugbear==25.10.21",
    "flake8-encodings==0.5.1",
    "flake8-comprehensions==3.17.0",
    "flake8-logging-format==0.9.0",
    "flake8-no-implicit-concat==0.3.5",
    "flake8-print==5.0.0",
    "flake8-todos==0.3.1",
    "flake8-simplify==0.30.0",
    "flake8-simplify==0.22.0",
    "flake8-use-pathlib==0.3.0",
    "flake8-copyright==0.2.4",
    "ruff==0.14.7",
    "black==25.12.0",
    "isort==7.0.0",
    "mypy==1.19.1",
    "mypy-protobuf==4.0.0",
    "PyGithub==2.8.1",
    "black==25.11.0",
    "isort==6.0.0",
    "mypy==1.17.1",
    "mypy-protobuf==3.6.0",
    "PyGithub==2.6.0",
    "bump-my-version==1.2.4",
    # type stubs for mypy
    "types-backports==0.1.3",
    "types-colorama==0.4.15.11",
    "types-PyYAML==6.0.8",
    "types-psutil==7.2.0.20251228",
    "types-psutil==7.0.0.20250218",
    "types_requests==2.32.0.20240712",
    "types-protobuf==6.32.1.20250918",
    "deptry==0.24.0"
    "deptry==0.23.0"
]
build = [
    # Dev and build dependencies are not relaxed because
    # we want all developer environments to be consistent.
    # These dependencies are not used in production environments
    # and should not conflict with other libraries/tooling.
    "pyinstaller==6.17.0",
    "pyinstaller==6.16.0",
    "setuptools==80.9.0",
    "build==1.4.0"
    "build==1.3.0"
]
scripts = [
    # can (optionally) be more lenient on dependencies here
    # see comment on dependencies for more context
    "jschema_to_python==1.2.3",
    "psutil==7.2.1",
    "psutil==7.1.2",
    "stix2==3.0.1",
    "sarif_om==1.0.4",
    "requests>=2.32.4",
]
ghidra = [
    "pyghidra>=3.0.0",
]

[tool.deptry]
extend_exclude = [
@@ -12,10 +12,10 @@ cxxfilt==0.3.0
dncil==1.0.2
dnfile==0.17.0
funcy==2.0
humanize==4.15.0
humanize==4.14.0
ida-netnode==3.0
ida-settings==3.2.2
intervaltree==3.2.1
intervaltree==3.1.0
markdown-it-py==4.0.0
mdurl==0.1.2
msgpack==1.0.8
@@ -38,12 +38,12 @@ pyghidra==3.0.0
python-flirt==0.9.2
pyyaml==6.0.2
rich==14.2.0
ruamel-yaml==0.19.1
ruamel-yaml==0.18.6
ruamel-yaml-clib==0.2.14
setuptools==80.9.0
six==1.17.0
sortedcontainers==2.4.0
viv-utils==0.8.0
vivisect==1.2.1
msgspec==0.20.0
msgspec==0.19.0
bump-my-version==1.2.4
2 rules
Submodule rules updated: 6a0d506713...6120dfb6e0
Submodule tests/data updated: 689960a966...cfca4022ee
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import contextlib
import collections
from pathlib import Path
@@ -20,7 +20,7 @@ from functools import lru_cache

import pytest

import capa.loader
import capa.main
import capa.features.file
import capa.features.insn
import capa.features.common
@@ -53,7 +53,6 @@ from capa.features.extractors.base_extractor import (
)
from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor

logger = logging.getLogger(__name__)
CD = Path(__file__).resolve().parent
DOTNET_DIR = CD / "data" / "dotnet"
DNFILE_TESTFILES = DOTNET_DIR / "dnfile-testfiles"
@@ -201,73 +200,6 @@ def get_binja_extractor(path: Path):
    return extractor
# we can't easily cache this because the extractor relies on global state (the opened database)
# which also has to be closed elsewhere. so, the idalib tests will just take a little bit to run.
def get_idalib_extractor(path: Path):
    import capa.features.extractors.ida.idalib as idalib

    if not idalib.has_idalib():
        raise RuntimeError("cannot find IDA idalib module.")

    if not idalib.load_idalib():
        raise RuntimeError("failed to load IDA idalib module.")

    import idapro
    import ida_auto

    import capa.features.extractors.ida.extractor

    logger.debug("idalib: opening database...")

    idapro.enable_console_messages(False)

    # we set the primary and secondary Lumina servers to 0.0.0.0 to disable Lumina,
    # which sometimes provides bad names, including overwriting names from debug info.
    #
    # use -R to load resources, which can help us find embedded PE files.
    #
    # return values from open_database:
    #  0 - Success
    #  2 - User cancelled or 32-64 bit conversion failed
    #  4 - Database initialization failed
    # -1 - Generic errors (database already open, auto-analysis failed, etc.)
    # -2 - User cancelled operation
    ret = idapro.open_database(
        str(path), run_auto_analysis=True, args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R"
    )
    if ret != 0:
        raise RuntimeError("failed to analyze input file")

    logger.debug("idalib: waiting for analysis...")
    ida_auto.auto_wait()
    logger.debug("idalib: opened database.")

    extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
    fixup_idalib(path, extractor)
    return extractor


def fixup_idalib(path: Path, extractor):
    """
    IDA fixups to overcome differences between backends
    """
    import idaapi
    import ida_funcs

    def remove_library_id_flag(fva):
        f = idaapi.get_func(fva)
        f.flags &= ~ida_funcs.FUNC_LIB
        ida_funcs.update_func(f)

    if "kernel32-64" in path.name:
        # remove the (correct) library function id, so we can test the x64 thunk
        remove_library_id_flag(0x1800202B0)

    if "al-khaser_x64" in path.name:
        # remove the (correct) library function id, so we can test the x64 nested thunk
        remove_library_id_flag(0x14004B4F0)


@lru_cache(maxsize=1)
def get_cape_extractor(path):
    from capa.helpers import load_json_from_path
@@ -982,8 +914,20 @@ FEATURE_PRESENCE_TESTS = sorted(
    ("mimikatz", "function=0x4556E5", capa.features.insn.API("advapi32.LsaQueryInformationPolicy"), False),
    ("mimikatz", "function=0x4556E5", capa.features.insn.API("LsaQueryInformationPolicy"), True),
    # insn/api: x64
    (
        "kernel32-64",
        "function=0x180001010",
        capa.features.insn.API("RtlVirtualUnwind"),
        True,
    ),
    ("kernel32-64", "function=0x180001010", capa.features.insn.API("RtlVirtualUnwind"), True),
    # insn/api: x64 thunk
    (
        "kernel32-64",
        "function=0x1800202B0",
        capa.features.insn.API("RtlCaptureContext"),
        True,
    ),
    ("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True),
    # insn/api: x64 nested thunk
    ("al-khaser x64", "function=0x14004B4F0", capa.features.insn.API("__vcrt_GetModuleHandle"), True),
@@ -1071,20 +1015,20 @@ FEATURE_PRESENCE_TESTS = sorted(
    ("pma16-01", "file", OS(OS_WINDOWS), True),
    ("pma16-01", "file", OS(OS_LINUX), False),
    ("mimikatz", "file", OS(OS_WINDOWS), True),
    ("pma16-01", "function=0x401100", OS(OS_WINDOWS), True),
    ("pma16-01", "function=0x401100,bb=0x401130", OS(OS_WINDOWS), True),
    ("pma16-01", "function=0x404356", OS(OS_WINDOWS), True),
    ("pma16-01", "function=0x404356,bb=0x4043B9", OS(OS_WINDOWS), True),
    ("mimikatz", "function=0x40105D", OS(OS_WINDOWS), True),
    ("pma16-01", "file", Arch(ARCH_I386), True),
    ("pma16-01", "file", Arch(ARCH_AMD64), False),
    ("mimikatz", "file", Arch(ARCH_I386), True),
    ("pma16-01", "function=0x401100", Arch(ARCH_I386), True),
    ("pma16-01", "function=0x401100,bb=0x401130", Arch(ARCH_I386), True),
    ("pma16-01", "function=0x404356", Arch(ARCH_I386), True),
    ("pma16-01", "function=0x404356,bb=0x4043B9", Arch(ARCH_I386), True),
    ("mimikatz", "function=0x40105D", Arch(ARCH_I386), True),
    ("pma16-01", "file", Format(FORMAT_PE), True),
    ("pma16-01", "file", Format(FORMAT_ELF), False),
    ("mimikatz", "file", Format(FORMAT_PE), True),
    # format is also a global feature
    ("pma16-01", "function=0x401100", Format(FORMAT_PE), True),
    ("pma16-01", "function=0x404356", Format(FORMAT_PE), True),
    ("mimikatz", "function=0x456BB9", Format(FORMAT_PE), True),
    # elf support
    ("7351f.elf", "file", OS(OS_LINUX), True),
@@ -13,7 +13,7 @@
# limitations under the License.

import capa.features.address
from capa.engine import Or, And, Not, Some, Range, Sequence
from capa.engine import Or, And, Not, Some, Range
from capa.features.insn import Number

ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)
@@ -155,145 +155,3 @@ def test_eval_order():

    assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement == Number(2)
    assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement != Number(1)


def test_sequence():
    # 1 before 2
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True
    # 2 before 1 (fail)
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False
    # 1 at the same address as 2 (fail)
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR1}})) is False

    # 1 before 2 before 3
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR2}, Number(3): {ADDR3}}
            )
        )
        is True
    )

    # 1 before 2 before 3 (fail, 3 is early)
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}
            )
        )
        is False
    )

    # 1 before 2 before 3 (fail, 2 is late)
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}
            )
        )
        is False
    )

    # multiple locations for matches:
    # 1 at 1 (and also at 3), 2 at 2 (match, since some ordering works)
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1, ADDR3}, Number(2): {ADDR2}})) is True

    # greedy matching: for each child, the evaluator picks the *smallest*
    # matching location that satisfies the previous child's constraint.

    # CASE:
    # 1 matches at 10.
    # 2 matches at 5 and 15.
    # if 2 picks 5, 5 > 10 is False.
    # if 2 picks 15, 15 > 10 is True. Match.
    assert (
        bool(
            Sequence([Number(1), Number(2)]).evaluate(
                {
                    Number(1): {capa.features.address.AbsoluteVirtualAddress(10)},
                    Number(2): {
                        capa.features.address.AbsoluteVirtualAddress(5),
                        capa.features.address.AbsoluteVirtualAddress(15),
                    },
                }
            )
        )
        is True
    )

    # CASE:
    # 1 matches at 10 and 20.
    # 2 matches at 15.
    # 1 should pick 10. 10 < 15. Match.
    assert (
        bool(
            Sequence([Number(1), Number(2)]).evaluate(
                {
                    Number(1): {
                        capa.features.address.AbsoluteVirtualAddress(10),
                        capa.features.address.AbsoluteVirtualAddress(20),
                    },
                    Number(2): {capa.features.address.AbsoluteVirtualAddress(15)},
                }
            )
        )
        is True
    )

    # CASE:
    # 1 matched at 10.
    # 2 matched at 15.
    # 3 matched at 12.
    # 1 -> 10.
    # 2 -> 15 (> 10).
    # 3 -> 12 (not > 15).
    # Fail.
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {
                    Number(1): {capa.features.address.AbsoluteVirtualAddress(10)},
                    Number(2): {capa.features.address.AbsoluteVirtualAddress(15)},
                    Number(3): {capa.features.address.AbsoluteVirtualAddress(12)},
                }
            )
        )
        is False
    )
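
The heuristic these cases exercise can be modeled as a small greedy scan. The following is a hypothetical simplification for illustration, not the engine's actual implementation; it assumes only that locations are totally ordered:

# hypothetical sketch of the greedy ordering check described above:
# for each child, choose the smallest matched location that is strictly
# greater than the location chosen for the previous child.
def sequence_matches(children_locations: list[set[int]]) -> bool:
    previous = None  # location chosen for the previous child
    for locations in children_locations:
        candidates = locations if previous is None else {loc for loc in locations if loc > previous}
        if not candidates:
            return False  # no location for this child can follow the previous one
        previous = min(candidates)  # greedy: take the earliest viable location
    return True

assert sequence_matches([{10}, {5, 15}]) is True    # child 2 must pick 15
assert sequence_matches([{10, 20}, {15}]) is True   # child 1 should pick 10
assert sequence_matches([{10}, {15}, {12}]) is False

Picking the smallest viable location is also optimal here: it leaves the largest set of options for the remaining children.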


def test_location_propagation():
    # regression tests for an issue where Or/And/Some statements
    # failed to propagate match locations to their results,
    # causing Sequence evaluation to fail.

    # Or
    assert Or([Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
    assert Or([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
        ADDR1
    }  # short_circuit=True returns the first match
    assert Or([Number(1), Number(2)]).evaluate(
        {Number(1): {ADDR1}, Number(2): {ADDR2}}, short_circuit=False
    ).locations == {ADDR1, ADDR2}

    # And
    assert And([Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
    assert And([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {ADDR1, ADDR2}

    # Some
    assert Some(1, [Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
    assert Some(1, [Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
        ADDR1
    }  # short_circuit=True returns the first sufficient set
    assert Some(2, [Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
        ADDR1,
        ADDR2,
    }
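
A hypothetical illustration (not among the original asserts) of why this propagation matters: Sequence orders its children by the locations carried on their results, so an Or that dropped its locations would starve the ordering check. Assuming Sequence accepts nested statements, as the regression comment implies:

# if Or did not propagate its match locations, Sequence would have no
# location for its first child and the ordering check would fail.
assert bool(Sequence([Or([Number(1), Number(9)]), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True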
@@ -26,17 +26,10 @@ ghidra_present = importlib.util.find_spec("pyghidra") is not None and "GHIDRA_IN
@fixtures.parametrize(
    "sample,scope,feature,expected",
    [
        (
            pytest.param(
                *t,
                marks=pytest.mark.xfail(
                    reason="specific to Vivisect and basic blocks do not align with Ghidra's analysis"
                ),
            )
            if t[0] == "294b8d..." and t[2] == capa.features.common.String("\r\n\x00:ht")
            else t
        )
        t
        for t in fixtures.FEATURE_PRESENCE_TESTS
        # this test case is specific to Vivisect and its basic blocks do not align with Ghidra's analysis
        if t[0] != "294b8d..." or t[2] != capa.features.common.String("\r\n\x00:ht")
    ],
    indirect=["sample", "scope"],
)
@@ -1,86 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pathlib import Path

import pytest
import fixtures

import capa.features.extractors.ida.idalib
from capa.features.file import FunctionName
from capa.features.insn import API
from capa.features.common import Characteristic

logger = logging.getLogger(__name__)

idalib_present = capa.features.extractors.ida.idalib.has_idalib()
if idalib_present:
    try:
        import idapro  # noqa: F401 [imported but unused]
        import ida_kernwin

        kernel_version: str = ida_kernwin.get_kernel_version()
    except ImportError:
        idalib_present = False
        kernel_version = "0.0"


@pytest.mark.skipif(idalib_present is False, reason="Skip idalib tests if the idalib Python API is not installed")
@fixtures.parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_SYMTAB_FUNC_TESTS,
    indirect=["sample", "scope"],
)
def test_idalib_features(sample: Path, scope, feature, expected):
    if kernel_version in {"9.0", "9.1"} and sample.name.startswith("2bf18d"):
        if isinstance(feature, (API, FunctionName)) and feature.value == "__libc_connect":
            # see the discussion here: https://github.com/mandiant/capa/pull/2742#issuecomment-3674146335
            #
            # > i confirmed that there were changes in 9.2 related to the ELF loader handling names,
            # > so I think its reasonable to conclude that 9.1 and older had a bug that
            # > prevented this name from surfacing.
            pytest.xfail(f"IDA {kernel_version} does not extract all ELF symbols")

    if kernel_version in {"9.0"} and sample.name.startswith("Practical Malware Analysis Lab 12-04.exe_"):
        if isinstance(feature, Characteristic) and feature.value == "embedded pe":
            # see the discussion here: https://github.com/mandiant/capa/pull/2742#issuecomment-3667086165
            #
            # idalib for IDA 9.0 doesn't support argv arguments, so we can't ask that resources be loaded
            pytest.xfail("idalib 9.0 does not support loading resource segments")

    try:
        fixtures.do_test_feature_presence(fixtures.get_idalib_extractor, sample, scope, feature, expected)
    finally:
        logger.debug("closing database...")
        import idapro

        idapro.close_database(save=False)
        logger.debug("closed database.")


@pytest.mark.skipif(idalib_present is False, reason="Skip idalib tests if the idalib Python API is not installed")
@fixtures.parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_COUNT_TESTS,
    indirect=["sample", "scope"],
)
def test_idalib_feature_counts(sample, scope, feature, expected):
    try:
        fixtures.do_test_feature_count(fixtures.get_idalib_extractor, sample, scope, feature, expected)
    finally:
        logger.debug("closing database...")
        import idapro

        idapro.close_database(save=False)
        logger.debug("closed database.")
@@ -80,28 +80,6 @@ def test_rule_yaml():
    assert bool(r.evaluate({Number(0): {ADDR1}, Number(1): {ADDR1}, Number(2): {ADDR1}, Number(3): {ADDR1}})) is True


def test_rule_yaml_sequence():
    rule = textwrap.dedent(
        """
        rule:
          meta:
            name: test rule
            scopes:
              static: function
              dynamic: process
          features:
            - sequence:
              - number: 1
              - number: 2
        """
    )
    r = capa.rules.Rule.from_yaml(rule)
    # 1 before 2 -> Match
    assert bool(r.evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True
    # 2 before 1 -> No match
    assert bool(r.evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False


def test_rule_yaml_complex():
    rule = textwrap.dedent(
        """
@@ -1675,70 +1653,3 @@ def test_circular_dependency():
    ]
    with pytest.raises(capa.rules.InvalidRule):
        list(capa.rules.get_rules_and_dependencies(rules, rules[0].name))


def test_rule_yaml_sequence_with_subscope():
    # this test mimics the dynamic analysis flow to verify Sequence with subscopes.
    rule_yaml = textwrap.dedent(
        """
        rule:
          meta:
            name: test sequence subscope
            scopes:
              static: function
              dynamic: span of calls
          features:
            - sequence:
              - call:
                - number: 1
              - number: 2
        """
    )
    # 1. load the rules (this triggers subscope extraction)
    rules = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_yaml)])

    # 2. identify the extracted subscope rule (call scope) and the main rule (span of calls)
    call_rules = rules.rules_by_scope[capa.rules.Scope.CALL]
    span_rules = rules.rules_by_scope[capa.rules.Scope.SPAN_OF_CALLS]
    assert len(call_rules) == 1
    assert len(span_rules) == 1

    main_rule = span_rules[0]
    subscope_rule = call_rules[0]

    # 3. simulate features:
    #    call 1: Number(1) -> matches the subscope rule
    #    call 2: Number(2) -> matches the second part of the sequence

    # address setup
    thread = capa.features.address.ThreadAddress(capa.features.address.ProcessAddress(1), 1)
    call1_addr = capa.features.address.DynamicCallAddress(thread, 1)
    call2_addr = capa.features.address.DynamicCallAddress(thread, 2)

    features: capa.engine.FeatureSet = {Number(1): {call1_addr}, Number(2): {call2_addr}}

    # 4. match the call-scope rules (simulating find_call_capabilities).
    #    RuleSet.match takes the full feature set, so we match at call 1
    #    and expect the subscope rule to hit there.
    _, matches1 = rules.match(capa.rules.Scope.CALL, features, call1_addr)
    assert subscope_rule.name in matches1

    # index the match
    capa.engine.index_rule_matches(features, subscope_rule, [call1_addr])

    # 5. match the span-scope rules (simulating find_span_capabilities).
    #    now features contains MatchedRule(subscope_rule), so the sequence sees:
    #    - call: matches subscope_rule at call1_addr
    #    - number: 2 at call2_addr
    #    call1_addr (id=1) < call2_addr (id=2), so the sequence matches.
    _, matches_span = rules.match(
        capa.rules.Scope.SPAN_OF_CALLS, features, call1_addr
    )  # the addr doesn't matter much for the span match logic itself, but it is passed through to the result

    assert main_rule.name in matches_span