Compare commits

..

7 Commits

Author SHA1 Message Date
Mike Hunhoff
0ba5f9664a remove deprecated APIs 2025-12-10 21:20:56 +00:00
Mike Hunhoff
98873c8570 support Ghidra v12 2025-12-10 19:58:21 +00:00
Mike Hunhoff
3687bb95e9 fix black errors 2025-12-09 00:43:44 +00:00
Mike Hunhoff
7175714f9e Update capa/features/extractors/ghidra/helpers.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-12-08 17:37:04 -07:00
Mike Hunhoff
32c201d9b1 update CHANGELOG and PyGhidra version requirements 2025-12-09 00:36:10 +00:00
Mike Hunhoff
784e0346d9 merge upstream 2025-12-09 00:24:41 +00:00
Mike Hunhoff
be1ccb0776 ghidra: init commit switch to PyGhidra 2025-12-09 00:24:03 +00:00
39 changed files with 148 additions and 1336 deletions

View File

@@ -74,7 +74,6 @@ a = Analysis(
# only be installed locally.
"binaryninja",
"ida",
"ghidra",
# remove once https://github.com/mandiant/capa/issues/2681 has
# been addressed by PyInstaller
"pkg_resources",

View File

@@ -28,11 +28,6 @@ jobs:
artifact_name: capa
asset_name: linux
python_version: '3.10'
# for Ghidra
java-version: '21'
ghidra-version: '12.0'
public-version: 'PUBLIC_20251205'
ghidra-sha256: 'af43e8cfb2fa4490cf6020c3a2bde25c159d83f45236a0542688a024e8fc1941'
- os: ubuntu-22.04-arm
artifact_name: capa
asset_name: linux-arm64
@@ -111,24 +106,6 @@ jobs:
run: |
7z e "tests/data/dynamic/cape/v2.2/d46900384c78863420fb3e297d0a2f743cd2b6b3f7f82bf64059a168e07aceb7.json.gz"
dist/capa -d "d46900384c78863420fb3e297d0a2f743cd2b6b3f7f82bf64059a168e07aceb7.json"
- name: Set up Java ${{ matrix.java-version }}
if: matrix.os == 'ubuntu-22.04' && matrix.python_version == '3.10'
uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0
with:
distribution: 'temurin'
java-version: ${{ matrix.java-version }}
- name: Install Ghidra ${{ matrix.ghidra-version }}
if: matrix.os == 'ubuntu-22.04' && matrix.python_version == '3.10'
run: |
mkdir ./.github/ghidra
wget "https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${{ matrix.ghidra-version }}_build/ghidra_${{ matrix.ghidra-version }}_${{ matrix.public-version }}.zip" -O ./.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip
echo "${{ matrix.ghidra-sha256 }} ./.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip" | sha256sum -c -
unzip .github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip -d .github/ghidra/
- name: Does it run (Ghidra)?
if: matrix.os == 'ubuntu-22.04' && matrix.python_version == '3.10'
env:
GHIDRA_INSTALL_DIR: ${{ github.workspace }}/.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC
run: dist/capa -b ghidra -d "tests/data/Practical Malware Analysis Lab 01-01.dll_"
- uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1
with:
name: ${{ matrix.asset_name }}

View File

@@ -174,8 +174,7 @@ jobs:
python-version: ["3.10", "3.13"]
java-version: ["21"]
ghidra-version: ["12.0"]
public-version: ["PUBLIC_20251205"] # for ghidra releases
ghidra-sha256: ['af43e8cfb2fa4490cf6020c3a2bde25c159d83f45236a0542688a024e8fc1941']
public-version: ["PUBLIC_20251205"] # for ghidra releases
steps:
- name: Checkout capa with submodules
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
@@ -194,66 +193,14 @@ jobs:
run: |
mkdir ./.github/ghidra
wget "https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${{ matrix.ghidra-version }}_build/ghidra_${{ matrix.ghidra-version }}_${{ matrix.public-version }}.zip" -O ./.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip
echo "${{ matrix.ghidra-sha256 }} ./.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip" | sha256sum -c -
unzip .github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC.zip -d .github/ghidra/
- name: Install pyyaml
run: sudo apt-get install -y libyaml-dev
- name: Install capa with Ghidra extra
- name: Install capa
run: |
pip install -e .[dev,ghidra]
pip install -e .[dev]
- name: Run tests
env:
GHIDRA_INSTALL_DIR: ${{ github.workspace }}/.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC
run: pytest -v tests/test_ghidra_features.py
idalib-tests:
  # Runs tests/test_idalib_features.py against each listed IDA release and Python version.
  # Every step is guarded on IDA_LICENSE_ID so the job is a no-op when the secret is unavailable.
  name: IDA ${{ matrix.ida.version }} tests for ${{ matrix.python-version }}
  runs-on: ubuntu-22.04
  needs: [tests]
  env:
    IDA_LICENSE_ID: ${{ secrets.IDA_LICENSE_ID }}
  strategy:
    fail-fast: false
    matrix:
      python-version: ["3.10", "3.13"]
      ida:
        # versions are quoted so YAML keeps them as strings rather than resolving them as floats
        - version: "9.0"
          slug: "release/9.0/ida-essential/ida-essential_90_x64linux.run"
        - version: "9.1"
          slug: "release/9.1/ida-essential/ida-essential_91_x64linux.run"
        - version: "9.2"
          slug: "release/9.2/ida-essential/ida-essential_92_x64linux.run"
  steps:
    - name: Checkout capa with submodules
      # do only run if IDA_LICENSE_ID is available, have to do this in every step, see https://github.com/orgs/community/discussions/26726#discussioncomment-3253118
      if: ${{ env.IDA_LICENSE_ID != 0 }}
      uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
      with:
        submodules: recursive
    - name: Set up Python ${{ matrix.python-version }}
      if: ${{ env.IDA_LICENSE_ID != 0 }}
      uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0
      with:
        python-version: ${{ matrix.python-version }}
    - name: Setup uv
      if: ${{ env.IDA_LICENSE_ID != 0 }}
      uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7.2.0
    - name: Install dependencies
      if: ${{ env.IDA_LICENSE_ID != 0 }}
      run: sudo apt-get install -y libyaml-dev
    - name: Install capa
      if: ${{ env.IDA_LICENSE_ID != 0 }}
      run: |
        pip install -r requirements.txt
        pip install -e .[dev,scripts]
        pip install idapro
    - name: Install IDA ${{ matrix.ida.version }}
      if: ${{ env.IDA_LICENSE_ID != 0 }}
      # the license id is read from the step environment instead of being templated into the script text
      run: |
        uv run hcli --disable-updates ida install --download-id ${{ matrix.ida.slug }} --license-id "${IDA_LICENSE_ID}" --set-default --yes
      env:
        HCLI_API_KEY: ${{ secrets.HCLI_API_KEY }}
        IDA_LICENSE_ID: ${{ secrets.IDA_LICENSE_ID }}
    - name: Run tests
      if: ${{ env.IDA_LICENSE_ID != 0 }}
      run: pytest -v tests/test_idalib_features.py # explicitly refer to the idalib tests for performance. other tests run above.

View File

@@ -138,7 +138,6 @@ repos:
- "--ignore=tests/test_ghidra_features.py"
- "--ignore=tests/test_ida_features.py"
- "--ignore=tests/test_viv_features.py"
- "--ignore=tests/test_idalib_features.py"
- "--ignore=tests/test_main.py"
- "--ignore=tests/test_scripts.py"
always_run: true

View File

@@ -8,18 +8,16 @@
### Breaking Changes
### New Rules (5)
### New Rules (4)
- nursery/run-as-nodejs-native-module mehunhoff@google.com
- nursery/inject-shellcode-using-thread-pool-work-insertion-with-tp_io still@teamt5.org
- nursery/inject-shellcode-using-thread-pool-work-insertion-with-tp_timer still@teamt5.org
- nursery/inject-shellcode-using-thread-pool-work-insertion-with-tp_work still@teamt5.org
- data-manipulation/encryption/hc-256/encrypt-data-using-hc-256 wballenthin@hex-rays.com
-
### Bug Fixes
- Fixed insecure deserialization vulnerability in YAML loading @0x1622 (#2770)
- loader: gracefully handle ELF files with unsupported architectures kamranulhaq2002@gmail.com #2800
### capa Explorer Web
@@ -58,7 +56,6 @@ Additionally a Binary Ninja bug has been fixed. Released binaries now include AR
### New Features
- ci: add support for arm64 binary releases
- tests: run tests against IDA via idalib @williballenthin #2742
### Breaking Changes

View File

@@ -291,17 +291,11 @@ It also uses your local changes to the .idb to extract better features, such as
![capa + IDA Pro integration](https://github.com/mandiant/capa/blob/master/doc/img/explorer_expanded.png)
# Ghidra integration
capa supports using Ghidra (via [PyGhidra](https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra)) as a feature extraction backend. This allows you to run capa against binaries using Ghidra's analysis engine.
You can run and view capa results in the Ghidra UI using [capa explorer for Ghidra](https://github.com/mandiant/capa/tree/master/capa/ghidra/plugin).
If you use Ghidra, then you can use the [capa + Ghidra integration](/capa/ghidra/) to run capa's analysis directly on your Ghidra database and render the results in Ghidra's user interface.
<img src="https://github.com/mandiant/capa/assets/66766340/eeae33f4-99d4-42dc-a5e8-4c1b8c661492" width=300>
You can also run capa from the command line using the [Ghidra backend](https://github.com/mandiant/capa/tree/master/capa/ghidra).
# blog posts
- [Riding Dragons: capa Harnesses Ghidra](https://www.mandiant.com/resources/blog/capa-harnesses-ghidra)
- [Dynamic capa: Exploring Executable Run-Time Behavior with the CAPE Sandbox](https://www.mandiant.com/resources/blog/dynamic-capa-executable-behavior-cape-sandbox)
- [capa v4: casting a wider .NET](https://www.mandiant.com/resources/blog/capa-v4-casting-wider-net) (.NET support)
- [ELFant in the Room capa v3](https://www.mandiant.com/resources/elfant-in-the-room-capa-v3) (ELF support)

View File

@@ -122,18 +122,11 @@ class And(Statement):
# short circuit
return Result(False, self, results)
locations = set()
for res in results:
locations.update(res.locations)
return Result(True, self, results, locations=locations)
return Result(True, self, results)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = all(results)
locations = set()
if success:
for res in results:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
return Result(success, self, results)
class Or(Statement):
@@ -160,17 +153,13 @@ class Or(Statement):
results.append(result)
if result:
# short circuit as soon as we hit one match
return Result(True, self, results, locations=result.locations)
return Result(True, self, results)
return Result(False, self, results)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = any(results)
locations = set()
for res in results:
if res.success:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
return Result(success, self, results)
class Not(Statement):
@@ -218,11 +207,7 @@ class Some(Statement):
if satisfied_children_count >= self.count:
# short circuit as soon as we hit the threshold
locations = set()
for res in results:
if res.success:
locations.update(res.locations)
return Result(True, self, results, locations=locations)
return Result(True, self, results)
return Result(False, self, results)
else:
@@ -232,12 +217,7 @@ class Some(Statement):
#
# we can't use `if child is True` because the instance is not True.
success = sum([1 for child in results if bool(child) is True]) >= self.count
locations = set()
if success:
for res in results:
if res.success:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
return Result(success, self, results)
class Range(Statement):
@@ -319,75 +299,6 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
features[capa.features.common.MatchedRule(namespace)].update(locations)
class Sequence(Statement):
    """
    match when every child matches and one location per child can be picked
    such that the picked locations are strictly increasing.

    children are evaluated in the order of `Sequence.children`
    (type: list[Statement|Feature]).
    """

    def __init__(self, children, description=None):
        super().__init__(description=description)
        self.children = children

    def evaluate(self, features: FeatureSet, short_circuit=True):
        capa.perf.counters["evaluate.feature"] += 1
        capa.perf.counters["evaluate.feature.sequence"] += 1

        results = []
        # location picked for the most recently accepted child; None until the first child is accepted.
        previous = None

        for child in self.children:
            outcome = child.evaluate(features, short_circuit=short_circuit)
            results.append(outcome)

            if not outcome:
                # every child has to match
                return Result(False, self, results)

            candidates = sorted(outcome.locations)
            if not candidates:
                # a match without any location cannot be ordered,
                # so it is treated as failing the sequence constraint.
                return Result(False, self, results)

            if previous is not None:
                # greedy choice: take the smallest location strictly after the previous pick,
                # which leaves the most room for the children that follow.
                for candidate in candidates:
                    if candidate > previous:
                        break
                else:
                    return Result(False, self, results)
            else:
                # first child: its smallest location anchors the sequence.
                candidate = candidates[0]

            previous = candidate
            # narrow the recorded child result down to the single picked location
            results[-1] = Result(True, child, outcome.children, locations={previous})

        picked = {next(iter(entry.locations)) for entry in results}
        return Result(True, self, results, locations=picked)
def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]:
"""
match the given rules against the given features,

View File

@@ -35,7 +35,7 @@ from capa.features.extractors.base_extractor import (
logger = logging.getLogger(__name__)
TESTED_VERSIONS = {"2.2-CAPE", "2.4-CAPE", "2.5-CAPE"}
TESTED_VERSIONS = {"2.2-CAPE", "2.4-CAPE"}
class CapeExtractor(DynamicFeatureExtractor):

View File

@@ -16,14 +16,6 @@ from typing import Optional
class GhidraContext:
"""
State holder for the Ghidra backend to avoid passing state to every function.
PyGhidra uses a context manager to set up the Ghidra environment (program, transaction, etc.).
We store the relevant objects here to allow easy access throughout the extractor
without needing to pass them as arguments to every feature extraction method.
"""
def __init__(self, program, flat_api, monitor):
self.program = program
self.flat_api = flat_api

View File

@@ -19,7 +19,6 @@ from typing import Iterator
import capa.features.extractors.ghidra.file
import capa.features.extractors.ghidra.insn
import capa.features.extractors.ghidra.global_
import capa.features.extractors.ghidra.helpers as ghidra_helpers
import capa.features.extractors.ghidra.function
import capa.features.extractors.ghidra.basicblock
from capa.features.common import Feature
@@ -37,6 +36,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
def __init__(self, ctx_manager=None, tmpdir=None):
self.ctx_manager = ctx_manager
self.tmpdir = tmpdir
import capa.features.extractors.ghidra.helpers as ghidra_helpers
super().__init__(
SampleHashes(
@@ -66,6 +66,8 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
weakref.finalize(self, cleanup, self.ctx_manager, self.tmpdir)
def get_base_address(self):
import capa.features.extractors.ghidra.helpers as ghidra_helpers
return AbsoluteVirtualAddress(ghidra_helpers.get_current_program().getImageBase().getOffset())
def extract_global_features(self):
@@ -75,6 +77,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
yield from capa.features.extractors.ghidra.file.extract_features()
def get_functions(self) -> Iterator[FunctionHandle]:
import capa.features.extractors.ghidra.helpers as ghidra_helpers
for fhandle in ghidra_helpers.get_function_symbols():
fh: FunctionHandle = FunctionHandle(
@@ -86,6 +89,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
@staticmethod
def get_function(addr: int) -> FunctionHandle:
import capa.features.extractors.ghidra.helpers as ghidra_helpers
func = ghidra_helpers.get_flat_api().getFunctionContaining(ghidra_helpers.get_flat_api().toAddr(addr))
return FunctionHandle(address=AbsoluteVirtualAddress(func.getEntryPoint().getOffset()), inner=func)
@@ -94,6 +98,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
yield from capa.features.extractors.ghidra.function.extract_features(fh)
def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
import capa.features.extractors.ghidra.helpers as ghidra_helpers
yield from ghidra_helpers.get_function_blocks(fh)
@@ -101,6 +106,7 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
yield from capa.features.extractors.ghidra.basicblock.extract_features(fh, bbh)
def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]:
import capa.features.extractors.ghidra.helpers as ghidra_helpers
yield from ghidra_helpers.get_insn_in_range(bbh)

View File

@@ -86,11 +86,7 @@ def extract_file_embedded_pe() -> Iterator[tuple[Feature, Address]]:
for off, _ in find_embedded_pe(capa.features.extractors.ghidra.helpers.get_block_bytes(block), mz_xor):
# add offset back to block start
ea_addr = block.getStart().add(off)
ea = ea_addr.getOffset()
f_offset = capa.features.extractors.ghidra.helpers.get_file_offset(ea_addr)
if f_offset != -1:
ea = f_offset
ea: int = block.getStart().add(off).getOffset()
yield Characteristic("embedded pe"), FileOffsetAddress(ea)
@@ -231,3 +227,14 @@ FILE_HANDLERS = (
extract_file_function_names,
extract_file_format,
)
def main():
    """collect everything yielded by `extract_features()` and pretty-print the list"""
    from pprint import pprint

    collected = list(extract_features())
    pprint(collected)  # noqa: T203


if __name__ == "__main__":
    main()

View File

@@ -44,7 +44,7 @@ def extract_function_loop(fh: FunctionHandle):
dests = block.getDestinations(capa.features.extractors.ghidra.helpers.get_monitor())
s_addrs = block.getStartAddresses()
while dests.hasNext():
while dests.hasNext(): # For loop throws Python TypeError
for addr in s_addrs:
edges.append((addr.getOffset(), dests.next().getDestinationAddress().getOffset()))
@@ -61,9 +61,25 @@ def extract_recursive_call(fh: FunctionHandle):
def extract_features(fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]:
    """run each handler in FUNCTION_HANDLERS on the function and yield its (feature, address) pairs"""
    for handler in FUNCTION_HANDLERS:
        yield from ((feature, addr) for feature, addr in handler(fh))
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call)
def main():
    """extract features for each handle from `get_function_symbols()` and pretty-print them"""
    from pprint import pprint

    collected = [
        item
        for function_handle in capa.features.extractors.ghidra.helpers.get_function_symbols()
        for item in extract_features(function_handle)
    ]
    pprint(collected)  # noqa: T203


if __name__ == "__main__":
    main()

View File

@@ -62,19 +62,6 @@ def find_byte_sequence(addr: "ghidra.program.model.address.Address", seq: bytes)
yield from eas
def get_file_offset(addr: "ghidra.program.model.address.Address") -> int:
    """
    get the file offset for an address.

    returns -1 when no memory block holds the address,
    or when none of that block's source infos contains it.
    """
    containing_block = get_current_program().getMemory().getBlock(addr)
    if containing_block:
        for source_info in containing_block.getSourceInfos():
            if source_info.contains(addr):
                return source_info.getFileBytesOffset(addr)

    return -1
def get_bytes(addr: "ghidra.program.model.address.Address", length: int) -> bytes:
"""yield length bytes at addr

View File

@@ -488,3 +488,22 @@ INSTRUCTION_HANDLERS = (
extract_function_calls_from,
extract_function_indirect_call_characteristic_features,
)
def main():
    """walk every function, basic block, and instruction; pretty-print all extracted features"""
    from pprint import pprint

    from capa.features.extractors.ghidra.extractor import GhidraFeatureExtractor

    helpers = capa.features.extractors.ghidra.helpers
    collected = [
        item
        for function_handle in GhidraFeatureExtractor().get_functions()
        for block in helpers.get_function_blocks(function_handle)
        for instruction in helpers.get_insn_in_range(block)
        for item in extract_features(function_handle, block, instruction)
    ]
    pprint(collected)  # noqa: T203


if __name__ == "__main__":
    main()

View File

@@ -18,7 +18,6 @@ import idaapi
import idautils
import capa.features.extractors.ida.helpers
from capa.features.file import FunctionName
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors import loops
@@ -51,39 +50,10 @@ def extract_recursive_call(fh: FunctionHandle):
yield Characteristic("recursive call"), fh.address
def extract_function_name(fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]:
    """yield the function's name as a FunctionName feature, skipping default `sub_` names"""
    name = idaapi.get_name(fh.inner.start_ea)

    # default names, like "sub_401000", produce no feature
    if not name.startswith("sub_"):
        yield FunctionName(name), fh.address

        if name.startswith("_"):
            # some linkers may prefix linked routines with a `_` to avoid name collisions.
            # emit both the prefixed and the un-prefixed spelling,
            # e.g. `_fwrite` -> `fwrite`
            # see: https://stackoverflow.com/a/2628384/87207
            yield FunctionName(name[1:]), fh.address
def extract_function_alternative_names(fh: FunctionHandle):
    """yield a FunctionName feature for each alternative name found at the function's start address"""
    alternative_names = capa.features.extractors.ida.helpers.get_function_alternative_names(fh.inner.start_ea)
    yield from ((FunctionName(alternative), fh.address) for alternative in alternative_names)
def extract_features(fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]:
    """run each handler in FUNCTION_HANDLERS on the function and yield its (feature, address) pairs"""
    for handler in FUNCTION_HANDLERS:
        yield from ((feature, addr) for feature, addr in handler(fh))
FUNCTION_HANDLERS = (
extract_function_calls_to,
extract_function_loop,
extract_recursive_call,
extract_function_name,
extract_function_alternative_names,
)
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call)

View File

@@ -20,7 +20,6 @@ import idaapi
import ida_nalt
import idautils
import ida_bytes
import ida_funcs
import ida_segment
from capa.features.address import AbsoluteVirtualAddress
@@ -437,16 +436,3 @@ def is_basic_block_return(bb: idaapi.BasicBlock) -> bool:
def has_sib(oper: idaapi.op_t) -> bool:
# via: https://reverseengineering.stackexchange.com/a/14300
return oper.specflag1 == 1
def find_alternative_names(cmt: str):
    """
    Yield the names embedded in `Alternative name is '<name>'` comment lines.

    Args:
      cmt: comment text; it is split on newline characters and each line is checked on its own.

    Yields:
      str: each non-empty name found between the single quotes.
    """
    prefix = "Alternative name is '"
    for line in cmt.split("\n"):
        if not (line.startswith(prefix) and line.endswith("'")):
            continue

        name = line[len(prefix) : -1]  # Extract name between quotes
        # A line that is only the prefix passes both checks above, because the
        # prefix's own quote also counts as the closing quote; that case, like
        # an empty pair of quotes, carries no name and is skipped.
        if name:
            yield name
def get_function_alternative_names(fva: int):
    """Yield alternative names parsed from the comment at `fva`, then from the comment of the function at `fva`."""
    address_comment = ida_bytes.get_cmt(fva, False) or ""
    yield from find_alternative_names(address_comment)

    function_comment = ida_funcs.get_func_cmt(idaapi.get_func(fva), False) or ""
    yield from find_alternative_names(function_comment)

View File

@@ -22,7 +22,6 @@ import idautils
import capa.features.extractors.helpers
import capa.features.extractors.ida.helpers
from capa.features.file import FunctionName
from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
@@ -130,8 +129,8 @@ def extract_insn_api_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle)
# not a function (start)
return
name = idaapi.get_name(target_func.start_ea)
if target_func.flags & idaapi.FUNC_LIB or not name.startswith("sub_"):
if target_func.flags & idaapi.FUNC_LIB:
name = idaapi.get_name(target_func.start_ea)
yield API(name), ih.address
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
@@ -140,10 +139,6 @@ def extract_insn_api_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle)
# see: https://stackoverflow.com/a/2628384/87207
yield API(name[1:]), ih.address
for altname in capa.features.extractors.ida.helpers.get_function_alternative_names(target_func.start_ea):
yield FunctionName(altname), ih.address
yield API(altname), ih.address
def extract_insn_number_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle

View File

@@ -1,75 +1,17 @@
# capa analysis using Ghidra
<div align="center">
<img src="../../doc/img/ghidra_backend_logo.png" width=240 height=125>
</div>
capa supports using Ghidra (via [PyGhidra](https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra)) as a feature extraction backend. This enables you to run capa against binaries using Ghidra's analysis engine.
# capa + Ghidra
```bash
$ capa -b ghidra Practical\ Malware\ Analysis\ Lab\ 01-01.exe_
┌──────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ md5 │ bb7425b82141a1c0f7d60e5106676bb1 │
│ sha1 │ │
│ sha256 │ 58898bd42c5bd3bf9b1389f0eee5b39cd59180e8370eb9ea838a0b327bd6fe47 │
│ analysis │ static │
│ os │ windows │
│ format │ pe │
│ arch │ i386 │
│ path │ ~/Documents/capa/tests/data/Practical Malware Analysis Lab 01-01.exe_ │
└──────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ATT&CK Tactic ┃ ATT&CK Technique ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ DISCOVERY │ File and Directory Discovery [T1083] │
└────────────────────────────────────┴─────────────────────────────────────────────────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ MBC Objective ┃ MBC Behavior ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ DISCOVERY │ File and Directory Discovery [E1083] │
│ FILE SYSTEM │ Copy File [C0045] │
│ │ Read File [C0051] │
│ PROCESS │ Terminate Process [C0018] │
└────────────────────────────────────┴─────────────────────────────────────────────────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Capability ┃ Namespace ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ copy file │ host-interaction/file-system/copy │
│ enumerate files recursively │ host-interaction/file-system/files/list │
│ read file via mapping (2 matches) │ host-interaction/file-system/read │
│ terminate process (2 matches) │ host-interaction/process/terminate │
│ resolve function by parsing PE exports │ load-code/pe │
└────────────────────────────────────────────────┴─────────────────────────────────────────────────┘
```
[capa](https://github.com/mandiant/capa) is the FLARE team's open-source tool that detects capabilities in executable files. [Ghidra](https://github.com/NationalSecurityAgency/ghidra) is an open-source software reverse engineering framework. capa + Ghidra brings capa's detection capabilities to Ghidra using [PyGhidra](https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra).
## getting started
## Prerequisites
### requirements
- Ghidra >= 12.0 must be installed and available to PyGhidra (e.g. set `GHIDRA_INSTALL_DIR` environment variable)
- [Ghidra](https://github.com/NationalSecurityAgency/ghidra) >= 12.0 must be installed and available via the `GHIDRA_INSTALL_DIR` environment variable.
#### standalone binary (recommended)
The capa [standalone binary](https://github.com/mandiant/capa/releases) is the preferred way to run capa with the Ghidra backend.
Although the binary does not bundle the Java environment or Ghidra itself, it will dynamically load them at runtime.
#### python package
You can also use the Ghidra backend with the capa Python package by installing `flare-capa` with the `ghidra` extra.
```bash
$ pip install "flare-capa[ghidra]"
```
### usage
To use the Ghidra backend, specify it with the `-b` or `--backend` flag:
## Usage
```bash
$ capa -b ghidra /path/to/sample
```
capa will:
1. Initialize a headless Ghidra instance.
2. Create a temporary project.
3. Import and analyze the sample.
4. Extract features and match rules.
5. Clean up the temporary project.
**Note:** The first time you run this, it may take a few moments to initialize the Ghidra environment.

View File

@@ -40,10 +40,6 @@ def get_flat_api():
return ghidra_context.get_context().flat_api
def get_monitor():
    """return the `monitor` attribute of the current Ghidra context"""
    context = ghidra_context.get_context()
    return context.monitor
class GHIDRAIO:
"""
An object that acts as a file-like object,

View File

@@ -1,54 +0,0 @@
<div align="center">
<img src="https://github.com/mandiant/capa/blob/master/doc/img/ghidra_backend_logo.png" width=240 height=125>
</div>
# capa explorer for Ghidra
capa explorer for Ghidra brings capa's detection capabilities directly to Ghidra's user interface, helping speed up your reverse engineering tasks by identifying what parts of a program suggest interesting behavior, such as setting a registry value. You can execute (via [PyGhidra](https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra)) the script [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/plugin/capa_explorer.py) using Ghidra's Script Manager window to run capa's analysis and view the results in Ghidra.
## ui integration
[capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) renders capa results in Ghidra's UI to help you quickly navigate them. This includes adding matched functions to Ghidra's Symbol Tree and Bookmarks windows and adding comments to functions that indicate matched capabilities and features. You can execute this script using Ghidra's Script Manager window.
### symbol tree window
Matched functions are added to Ghidra's Symbol Tree window under a custom namespace that maps to the capabilities' [capa namespace](https://github.com/mandiant/capa-rules/blob/master/doc/format.md#rule-namespace).
<div align="center">
<img src="https://github.com/mandiant/capa/assets/66766340/eeae33f4-99d4-42dc-a5e8-4c1b8c661492" width=300>
</div>
### comments
Comments are added at the beginning of matched functions indicating matched capabilities, and inline comments are added to functions indicating matched features. You can view these comments in Ghidra's Disassembly Listing and Decompile windows.
<div align="center">
<img src="https://github.com/mandiant/capa/assets/66766340/bb2b4170-7fd4-45fc-8c7b-ff8f2e2f101b" width=1000>
</div>
### bookmarks
Bookmarks are added to functions that matched a capability that is mapped to a MITRE ATT&CK and/or Malware Behavior Catalog (MBC) technique. You can view these bookmarks in Ghidra's Bookmarks window.
<div align="center">
<img src="https://github.com/mandiant/capa/assets/66766340/7f9a66a9-7be7-4223-91c6-4b8fc4651336" width=825>
</div>
# getting started
## requirements
- [Ghidra](https://github.com/NationalSecurityAgency/ghidra) >= 12.0 must be installed.
- [flare-capa](https://pypi.org/project/flare-capa/) >= 10.0 must be installed (virtual environment recommended) with the `ghidra` extra (e.g., `pip install "flare-capa[ghidra]"`).
- [capa rules](https://github.com/mandiant/capa-rules) must be downloaded for the version of capa you are using.
## execution
### 1. run Ghidra with PyGhidra
You must start Ghidra using the `pyghidraRun` script provided in the support directory of your Ghidra installation to ensure the Python environment is correctly loaded. You should execute `pyghidraRun` from within the Python environment that you used to install capa.
```bash
<ghidra_install>/support/pyghidraRun
```
### 2. run capa_explorer.py
1. Open your Ghidra project and CodeBrowser.
2. Open the Script Manager.
3. Add [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/plugin/capa_explorer.py) to the script directories.
4. Filter for capa and run the script.
5. When prompted, select the directory containing the downloaded capa rules.

View File

@@ -1,463 +0,0 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Run capa against loaded Ghidra database and render results in Ghidra UI
# @author Colton Gabertan (gabertan.colton@gmail.com)
# @category capa
# @runtime PyGhidra
import json
import logging
import pathlib
from typing import Any
from java.util import ArrayList
from ghidra.util import Msg
from ghidra.app.cmd.label import AddLabelCmd, CreateNamespacesCmd
from ghidra.util.exception import CancelledException
from ghidra.program.flatapi import FlatProgramAPI
from ghidra.program.model.symbol import Namespace, SourceType, SymbolType
import capa
import capa.main
import capa.rules
import capa.version
import capa.render.json
import capa.ghidra.helpers
import capa.capabilities.common
import capa.features.extractors.ghidra.context
import capa.features.extractors.ghidra.extractor
logger = logging.getLogger("capa_explorer")
def show_monitor_message(msg):
    """check the task monitor for cancellation, then set msg as its status message"""
    task_monitor = capa.ghidra.helpers.get_monitor()
    # checkCanceled comes first so a pending cancel is handled before the message is updated
    task_monitor.checkCanceled()
    task_monitor.setMessage(msg)
def show_error(msg):
    """report msg through Ghidra's Msg.showError under the "capa explorer" title"""
    dialog_title = "capa explorer"
    Msg.showError(None, None, dialog_title, msg)
def show_warn(msg):
    """report msg through Ghidra's Msg.showWarn under the "capa explorer" title"""
    dialog_title = "capa explorer"
    Msg.showWarn(None, None, dialog_title, msg)
def show_info(msg):
    """report msg through Ghidra's Msg.showInfo under the "capa explorer" title"""
    dialog_title = "capa explorer"
    Msg.showInfo(None, None, dialog_title, msg)
def add_bookmark(addr, txt, category="CapaExplorer"):
    """set an "Info" bookmark at addr with comment txt, filed under category"""
    program = capa.ghidra.helpers.get_current_program()
    bookmark_manager = program.getBookmarkManager()
    bookmark_manager.setBookmark(addr, "Info", category, txt)
def create_namespace(namespace_str):
    """create the (possibly nested) Ghidra namespace named by namespace_str and return it

    namespace_str is the full path of one capa namespace, with components
    separated by Ghidra's namespace delimiter.
    """
    namespaces_cmd = CreateNamespacesCmd(namespace_str, SourceType.USER_DEFINED)
    program = capa.ghidra.helpers.get_current_program()
    namespaces_cmd.applyTo(program)
    return namespaces_cmd.getNamespace()
def create_label(ghidra_addr, name, capa_namespace):
    """overlay a label called name at ghidra_addr and file it under capa_namespace

    does nothing when a symbol with the same fully-qualified name already
    exists at ghidra_addr, so repeated script runs do not pile up duplicates.

    args:
      ghidra_addr: Ghidra address to label.
      name: unqualified label name.
      capa_namespace: capa-generated Ghidra namespace that should own the label.
    """
    program = capa.ghidra.helpers.get_current_program()

    # prevent duplicate labels under the same capa-generated namespace
    qualified_name = capa_namespace.getName(True) + Namespace.DELIMITER + name
    for sym in program.getSymbolTable().getSymbols(ghidra_addr):
        if sym.getName(True) == qualified_name:
            return

    # create SymbolType.LABEL at addr
    # prioritize capa-generated namespace (duplicate match @ new addr), else put under global Ghidra one (new match)
    cmd = AddLabelCmd(ghidra_addr, name, True, SourceType.USER_DEFINED)
    if not cmd.applyTo(program):
        # a failed command leaves no symbol behind, so there is nothing to re-parent
        logger.warning("failed to create label %s at %s: %s", name, ghidra_addr, cmd.getStatusMsg())
        return

    # assign new match overlay label to capa-generated namespace
    cmd.getSymbol().setNamespace(capa_namespace)
class CapaMatchData:
    """one matched capa rule and the operations that annotate it in Ghidra

    attributes:
      namespace: Ghidra namespace path generated for the rule.
      scope: static scope of the rule; only "function" is special-cased.
      capability: rule name.
      matches: {rule match location: [{feature location: node data}]}
      attack: MITRE ATT&CK entries of the rule (dicts with "parts" and "id").
      mbc: MBC entries of the rule (dicts with "parts" and "id").
    """

    def __init__(
        self,
        namespace,
        scope,
        capability,
        matches,
        attack: list[dict[Any, Any]],
        mbc: list[dict[Any, Any]],
    ):
        self.namespace = namespace
        self.scope = scope
        self.capability = capability
        self.matches = matches
        self.attack = attack
        self.mbc = mbc

    @staticmethod
    def _format_mapping(item, delimiter):
        """render one ATT&CK/MBC entry: every part followed by delimiter, then the id

        a missing "parts" or "id" key contributes nothing instead of raising.
        """
        prefix = "".join(part + delimiter for part in item.get("parts", []))
        return prefix + item.get("id", "")

    def _comment_features(self, ghidra_addr, node):
        """add one pre comment at ghidra_addr per (type, description) pair parsed from node"""
        for sub_type, description in parse_node(node):
            self.set_pre_comment(ghidra_addr, sub_type, description)

    def bookmark_functions(self):
        """create bookmarks for MITRE ATT&CK & MBC mappings"""
        if self.attack == [] and self.mbc == []:
            return

        flat_api = capa.ghidra.helpers.get_flat_api()
        for key in self.matches.keys():
            func = flat_api.getFunctionContaining(flat_api.toAddr(hex(key)))
            if func is None:
                continue

            # bookmark & tag MITRE ATT&CK tactics & MBC @ function scope
            func_addr = func.getEntryPoint()
            for item in self.attack:
                attack_txt = self._format_mapping(item, Namespace.DELIMITER)
                add_bookmark(func_addr, attack_txt, "CapaExplorer::MITRE ATT&CK")
            for item in self.mbc:
                mbc_txt = self._format_mapping(item, Namespace.DELIMITER)
                add_bookmark(func_addr, mbc_txt, "CapaExplorer::MBC")

    def set_plate_comment(self, ghidra_addr):
        """set plate comments at matched functions"""
        flat_api = capa.ghidra.helpers.get_flat_api()
        comment = flat_api.getPlateComment(ghidra_addr)
        rule_path = self.namespace.replace(Namespace.DELIMITER, "/")

        # the two write paths avoid duplicate comments via subsequent script runs
        if comment is None:
            # first comment @ function
            flat_api.setPlateComment(ghidra_addr, rule_path + "\n")
        elif rule_path not in comment:
            flat_api.setPlateComment(ghidra_addr, comment + rule_path + "\n")

    def set_pre_comment(self, ghidra_addr, sub_type, description):
        """set pre comments at subscoped matches of main rules"""
        flat_api = capa.ghidra.helpers.get_flat_api()
        comment = flat_api.getPreComment(ghidra_addr)
        entry = "capa: " + sub_type + "(" + description + ")" + ' matched in "' + self.capability + '"\n'

        if comment is None:
            flat_api.setPreComment(ghidra_addr, entry)
        elif self.capability not in comment:
            # an existing comment that already names this capability is left untouched
            flat_api.setPreComment(ghidra_addr, comment + entry)

    def label_matches(self, do_namespaces, do_comments):
        """label findings at function scopes and comment on subscope matches

        args:
          do_namespaces: when true, create the capa namespace and labels under it.
          do_comments: when true, add plate comments and pre comments.
        """
        capa_namespace = None
        if do_namespaces:
            capa_namespace = create_namespace(self.namespace)

        symbol_table = capa.ghidra.helpers.get_current_program().getSymbolTable()
        flat_api = capa.ghidra.helpers.get_flat_api()

        # handle function main scope of matched rule
        # these will typically contain further matches within
        if self.scope == "function":
            for addr in self.matches.keys():
                ghidra_addr = flat_api.toAddr(hex(addr))

                # classify new function label under capa-generated namespace
                if do_namespaces:
                    sym = symbol_table.getPrimarySymbol(ghidra_addr)
                    if sym is not None and sym.getSymbolType() == SymbolType.FUNCTION:
                        create_label(ghidra_addr, sym.getName(), capa_namespace)

                if do_comments:
                    self.set_plate_comment(ghidra_addr)

                # parse the corresponding nodes, and pre-comment subscope matched features
                # under the encompassing function(s)
                for sub_match in self.matches.get(addr):
                    for loc, node in sub_match.items():
                        sub_ghidra_addr = flat_api.toAddr(hex(loc))
                        if sub_ghidra_addr == ghidra_addr:
                            # skip duplicates
                            continue

                        # precomment subscope matches under the function
                        if node != {} and do_comments:
                            self._comment_features(sub_ghidra_addr, node)
            return

        # resolve the encompassing function for the capa namespace
        # of non-function scoped main matches
        for addr in self.matches.keys():
            ghidra_addr = flat_api.toAddr(hex(addr))

            # basic block / insn scoped main matches
            # Ex. See "Create Process on Windows" Rule
            func = flat_api.getFunctionContaining(ghidra_addr)
            if func is not None:
                func_addr = func.getEntryPoint()
                if do_namespaces:
                    create_label(func_addr, func.getName(), capa_namespace)
                if do_comments:
                    self.set_plate_comment(func_addr)

            # create subscope match precomments
            for sub_match in self.matches.get(addr):
                for loc, node in sub_match.items():
                    sub_ghidra_addr = flat_api.toAddr(hex(loc))
                    if node == {}:
                        continue

                    if func is not None:
                        # basic block/ insn scope under resolved function
                        if do_comments:
                            self._comment_features(sub_ghidra_addr, node)
                        continue

                    # this would be a global/file scoped main match
                    # try to resolve the encompassing function via the subscope match, instead
                    # Ex. "run as service" rule
                    sub_func = flat_api.getFunctionContaining(sub_ghidra_addr)
                    if sub_func is not None:
                        sub_func_addr = sub_func.getEntryPoint()
                        # place function in capa namespace & create the subscope match label in Ghidra's global namespace
                        if do_namespaces:
                            create_label(sub_func_addr, sub_func.getName(), capa_namespace)
                        if do_comments:
                            self.set_plate_comment(sub_func_addr)
                            self._comment_features(sub_ghidra_addr, node)
                    else:
                        # addr is in some other file section like .data
                        # represent this location with a label symbol under the capa namespace
                        # Ex. See "Reference Base64 String" rule
                        if do_namespaces:
                            for _sub_type, _description in parse_node(node):
                                # in many cases, these will be ghidra-labeled data, so just add the existing
                                # label symbol to the capa namespace
                                for sym in symbol_table.getSymbols(sub_ghidra_addr):
                                    if sym.getSymbolType() == SymbolType.LABEL:
                                        sym.setNamespace(capa_namespace)
                        if do_comments:
                            self._comment_features(sub_ghidra_addr, node)
def get_capabilities():
    """ask for a rules directory, run capa on the current program, and return the rendered JSON

    raises:
      CancelledException: when no rules directory is chosen.
    """
    version = capa.version.__version__

    show_monitor_message(f"requesting capa {version} rules directory")
    selected_dir = askDirectory(f"choose capa {version} rules directory", "Ok")  # type: ignore [name-defined] # noqa: F821

    # both "nothing selected" and "selection has an empty path" count as a cancel
    rules_dir = selected_dir.getPath() if selected_dir else ""
    if not rules_dir:
        raise CancelledException

    rules_path: pathlib.Path = pathlib.Path(rules_dir)

    show_monitor_message(f"loading rules from {rules_path}")
    rules = capa.rules.get_rules([rules_path])

    show_monitor_message("collecting binary metadata")
    meta = capa.ghidra.helpers.collect_metadata([rules_path])

    show_monitor_message("running capa analysis")
    extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
    capabilities = capa.capabilities.common.find_capabilities(rules, extractor, True)

    show_monitor_message("checking for static limitations")
    limited = capa.capabilities.common.has_static_limitation(rules, capabilities, is_standalone=False)
    if limited:
        show_warn(
            "capa explorer encountered warnings during analysis. Please check the console output for more information.",
        )

    show_monitor_message("rendering results")
    return capa.render.json.render(meta, rules, capabilities.matches)
def get_locations(match_dict):
    """walk a match tree depth-first, yielding (location value, node) pairs

    only locations typed "absolute" (an rva) or "file" (an offset into the
    file) are reported; the node yielded is the one owning the location.
    """
    addressable_types = ("absolute", "file")

    for location in match_dict.get("locations", {}):
        if location.get("type", "") not in addressable_types:
            continue
        yield location.get("value"), match_dict.get("node")

    for child in match_dict.get("children", {}):
        for found in get_locations(child):
            yield found
def parse_node(node_data):
    """pull match descriptions and sub features by parsing node dicts

    node_data is expected to look like {"type": T, T: {"type": F, F: value, ...}}.
    yields ("description", text) when the inner dict carries a description, then
    (F, value) when value is a str or int (ints are rendered as hex strings).

    yields nothing, instead of raising, when node_data is not a dict or does
    not carry the inner dict named by its "type".
    """
    if not isinstance(node_data, dict):
        return

    node = node_data.get(node_data.get("type"))
    if not isinstance(node, dict):
        return

    if "description" in node:
        yield "description", node.get("description")

    feat_type = node.get("type")
    data = node.get(feat_type)
    if isinstance(data, (str, int)):
        if isinstance(data, int):
            data = hex(data)
        yield feat_type, data
def parse_json(capa_data):
    """turn capa's JSON result document into CapaMatchData objects, one per matched rule"""
    for rule_name, rule_doc in capa_data.get("rules", {}).items():
        # structure to contain rule match address & supporting feature data
        # {rule match addr:[{feature addr:{node_data}}]}
        rule_matches: dict[Any, list[Any]] = {}

        for index, entry in enumerate(rule_doc.get("matches")):
            # entry[0]: rule match location
            # entry[1]: match tree holding the extracted features
            match_loc = entry[0].get("value")
            if match_loc is None:
                # Ex. See "Reference Base64 string"
                # {'type':'no address'}
                # fall back to the match index as the key
                match_loc = index
            rule_matches[match_loc] = []

            # {feature location: node}; a later duplicate location replaces an earlier one
            feat_dict = dict(get_locations(entry[1]))
            rule_matches[match_loc].append(feat_dict)

        # dict data of currently matched rule
        meta = rule_doc["meta"]

        # get MITRE ATT&CK and MBC
        attack = meta.get("attack")
        attack = [] if attack is None else attack
        mbc = meta.get("mbc")
        mbc = [] if mbc is None else mbc

        # scope match for the rule
        scope = meta["scopes"].get("static")

        fmt_rule = Namespace.DELIMITER + rule_name.replace(" ", "-")
        if "namespace" in meta:
            # rebuild the rule namespace with the delimiter Ghidra expects
            # Ex. 'communication/named-pipe/create' + 'create pipe'
            #     -> capa_explorer::communication::named-pipe::create::create-pipe
            components = ["capa_explorer", *meta["namespace"].split("/")]
        else:
            # rules without a "namespace" entry (e.g. lib rules) are filed under "lib"
            # Ex. 'contain loop' -> capa_explorer::lib::contain-loop
            components = ["capa_explorer", "lib"]
        namespace = Namespace.DELIMITER.join(components) + fmt_rule

        yield CapaMatchData(namespace, scope, rule_name, rule_matches, attack, mbc)
def main():
    """run capa on the program open in Ghidra and annotate it with the results

    returns 0 on success, otherwise one of the capa.main.E_* status codes
    (E_EMPTY_REPORT when no rule matched).

    raises:
      CancelledException: when the user selects no action.
    """
    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)

    choices = ["namespaces", "bookmarks", "comments"]
    # askChoices is overloaded; java.util.ArrayList arguments keep PyGhidra from
    # picking an ambiguous overload
    choices_java = ArrayList()
    for c in choices:
        choices_java.add(c)

    choice_labels = [
        'add "capa_explorer" namespace for matched functions',
        "add bookmarks for matched functions",
        "add comments to matched functions",
    ]
    # same overload workaround as above
    choice_labels_java = ArrayList()
    for c in choice_labels:
        choice_labels_java.add(c)

    selected = list(askChoices("capa explorer", "select actions:", choices_java, choice_labels_java))  # type: ignore [name-defined] # noqa: F821

    do_namespaces = "namespaces" in selected
    do_comments = "comments" in selected
    do_bookmarks = "bookmarks" in selected

    if not any((do_namespaces, do_comments, do_bookmarks)):
        raise CancelledException("no actions selected")

    # initialize the context for the extractor/helpers
    capa.features.extractors.ghidra.context.set_context(
        currentProgram,  # type: ignore [name-defined] # noqa: F821
        FlatProgramAPI(currentProgram),  # type: ignore [name-defined] # noqa: F821
        monitor,  # type: ignore [name-defined] # noqa: F821
    )

    show_monitor_message("checking supported Ghidra version")
    if not capa.ghidra.helpers.is_supported_ghidra_version():
        show_error("unsupported Ghidra version")
        return capa.main.E_UNSUPPORTED_GHIDRA_VERSION

    show_monitor_message("checking supported file type")
    if not capa.ghidra.helpers.is_supported_file_type():
        show_error("unsupported file type")
        return capa.main.E_INVALID_FILE_TYPE

    show_monitor_message("checking supported file architecture")
    if not capa.ghidra.helpers.is_supported_arch_type():
        show_error("unsupported file architecture")
        return capa.main.E_INVALID_FILE_ARCH

    # capa_data will always contain {'meta':..., 'rules':...}
    # if the 'rules' key contains no values, then there were no matches,
    # so test for emptiness rather than for None
    capa_data = json.loads(get_capabilities())
    if not capa_data.get("rules"):
        show_info("capa explorer found no matches.")
        return capa.main.E_EMPTY_REPORT

    show_monitor_message("processing matches")
    for item in parse_json(capa_data):
        if do_bookmarks:
            show_monitor_message("adding bookmarks")
            item.bookmark_functions()
        if do_namespaces or do_comments:
            show_monitor_message("adding labels")
            item.label_matches(do_namespaces, do_comments)

    show_info("capa explorer analysis complete.")
    return 0


if __name__ == "__main__":
    try:
        status = main()
        # an empty report was already announced by main() and is not an analysis error
        if status not in (0, capa.main.E_EMPTY_REPORT):
            show_error(
                "capa explorer encountered errors during analysis. Please check the console output for more information.",
            )
    except CancelledException:
        show_info("capa explorer analysis cancelled.")

View File

@@ -96,7 +96,11 @@ def is_runtime_ida():
def is_runtime_ghidra():
return importlib.util.find_spec("ghidra") is not None
try:
currentProgram # type: ignore [name-defined] # noqa: F821
except NameError:
return False
return True
def assert_never(value) -> NoReturn:

View File

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import logging
import datetime
@@ -22,13 +23,24 @@ from pathlib import Path
from rich.console import Console
from typing_extensions import assert_never
import capa.perf
import capa.rules
import capa.engine
import capa.helpers
import capa.version
import capa.render.json
import capa.rules.cache
import capa.render.default
import capa.render.verbose
import capa.features.common
import capa.features.freeze as frz
import capa.render.vverbose
import capa.features.extractors
import capa.render.result_document
import capa.render.result_document as rdoc
import capa.features.extractors.common
import capa.features.extractors.base_extractor
import capa.features.extractors.cape.extractor
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError
@@ -167,15 +179,8 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]):
except Exception as e:
# vivisect raises raw Exception instances, and we don't want
# to do a subclass check via isinstance.
if type(e) is Exception and e.args:
error_msg = str(e.args[0])
if "Couldn't convert rva" in error_msg:
raise CorruptFile(error_msg) from e
elif "Unsupported Architecture" in error_msg:
# Extract architecture number if available
arch_info = e.args[1] if len(e.args) > 1 else "unknown"
raise CorruptFile(f"Unsupported architecture: {arch_info}") from e
if type(e) is Exception and "Couldn't convert rva" in e.args[0]:
raise CorruptFile(e.args[0]) from e
raise
viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths])
@@ -334,24 +339,12 @@ def get_extractor(
import capa.features.extractors.ida.extractor
logger.debug("idalib: opening database...")
idapro.enable_console_messages(False)
with console.status("analyzing program...", spinner="dots"):
# we set the primary and secondary Lumina servers to 0.0.0.0 to disable Lumina,
# which sometimes provides bad names, including overwriting names from debug info.
#
# use -R to load resources, which can help us embedded PE files.
#
# return values from open_database:
# 0 - Success
# 2 - User cancelled or 32-64 bit conversion failed
# 4 - Database initialization failed
# -1 - Generic errors (database already open, auto-analysis failed, etc.)
# -2 - User cancelled operation
ret = idapro.open_database(
str(input_path), run_auto_analysis=True, args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R"
)
if ret != 0:
raise RuntimeError("failed to analyze input file")
# idalib writes to stdout (ugh), so we have to capture that
# so as not to screw up structured output.
with capa.helpers.stdout_redirector(io.BytesIO()):
with console.status("analyzing program...", spinner="dots"):
if idapro.open_database(str(input_path), run_auto_analysis=True):
raise RuntimeError("failed to analyze input file")
logger.debug("idalib: waiting for analysis...")
ida_auto.auto_wait()

View File

@@ -1107,26 +1107,14 @@ def ida_main():
def ghidra_main():
from ghidra.program.flatapi import FlatProgramAPI
import capa.rules
import capa.ghidra.helpers
import capa.render.default
import capa.features.extractors.ghidra.context
import capa.features.extractors.ghidra.extractor
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
# These are provided by the Ghidra scripting environment
# but are not available when running standard python
# so we have to ignore the linting errors
program = currentProgram # type: ignore [name-defined] # noqa: F821
monitor_ = monitor # type: ignore [name-defined] # noqa: F821
flat_api = FlatProgramAPI(program)
capa.features.extractors.ghidra.context.set_context(program, flat_api, monitor_)
logger.debug("-" * 80)
logger.debug(" Using default embedded rules.")
logger.debug(" ")

View File

@@ -167,9 +167,7 @@ class CompoundStatementType:
AND = "and"
OR = "or"
NOT = "not"
NOT = "not"
OPTIONAL = "optional"
SEQUENCE = "sequence"
class StatementModel(FrozenModel): ...
@@ -215,7 +213,7 @@ class StatementNode(FrozenModel):
def statement_from_capa(node: capa.engine.Statement) -> Statement:
if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not, capa.engine.Sequence)):
if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not)):
return CompoundStatement(type=node.__class__.__name__.lower(), description=node.description)
elif isinstance(node, capa.engine.Some):
@@ -282,9 +280,6 @@ def node_to_capa(
elif node.statement.type == CompoundStatementType.OPTIONAL:
return capa.engine.Some(description=node.statement.description, count=0, children=children)
elif node.statement.type == CompoundStatementType.SEQUENCE:
return capa.engine.Sequence(description=node.statement.description, children=children)
else:
assert_never(node.statement.type)

View File

@@ -635,8 +635,6 @@ def build_statements(d, scopes: Scopes):
return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "or":
return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "sequence":
return ceng.Sequence(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "not":
if len(d[key]) != 1:
raise InvalidRule("not statement must have exactly one child statement")
@@ -1700,7 +1698,7 @@ class RuleSet:
# feature is found N times
return rec(rule_name, node.child)
elif isinstance(node, (ceng.And, ceng.Sequence)):
elif isinstance(node, ceng.And):
# When evaluating an AND block, all of the children need to match.
#
# So when we index rules, we want to pick the most uncommon feature(s)

Binary file not shown.

After

Width:  |  Height:  |  Size: 210 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 108 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

View File

@@ -79,6 +79,7 @@ dependencies = [
"ruamel.yaml>=0.18",
"pefile>=2023.2.7",
"pyelftools>=0.31",
"pyghidra>=3.0.0",
"pydantic>=2",
"rich>=13",
"humanize>=4",
@@ -109,13 +110,6 @@ dependencies = [
]
dynamic = ["version"]
[tool.pytest.ini_options]
filterwarnings = [
"ignore:builtin type SwigPyPacked has no __module__ attribute:DeprecationWarning",
"ignore:builtin type SwigPyObject has no __module__ attribute:DeprecationWarning",
"ignore:builtin type swigvarlink has no __module__ attribute:DeprecationWarning",
]
[tool.setuptools.dynamic]
version = {attr = "capa.version.__version__"}
@@ -130,57 +124,54 @@ dev = [
# These dependencies are not used in production environments
# and should not conflict with other libraries/tooling.
"pre-commit==4.5.0",
"pytest==9.0.2",
"pytest==8.0.0",
"pytest-sugar==1.1.1",
"pytest-instafail==0.5.0",
"flake8==7.3.0",
"flake8-bugbear==25.11.29",
"flake8-bugbear==25.10.21",
"flake8-encodings==0.5.1",
"flake8-comprehensions==3.17.0",
"flake8-logging-format==0.9.0",
"flake8-no-implicit-concat==0.3.5",
"flake8-print==5.0.0",
"flake8-todos==0.3.1",
"flake8-simplify==0.30.0",
"flake8-simplify==0.22.0",
"flake8-use-pathlib==0.3.0",
"flake8-copyright==0.2.4",
"ruff==0.14.7",
"black==25.12.0",
"isort==7.0.0",
"mypy==1.19.1",
"mypy-protobuf==4.0.0",
"PyGithub==2.8.1",
"black==25.11.0",
"isort==6.0.0",
"mypy==1.17.1",
"mypy-protobuf==3.6.0",
"PyGithub==2.6.0",
"bump-my-version==1.2.4",
# type stubs for mypy
"types-backports==0.1.3",
"types-colorama==0.4.15.11",
"types-PyYAML==6.0.8",
"types-psutil==7.2.0.20251228",
"types-psutil==7.0.0.20250218",
"types_requests==2.32.0.20240712",
"types-protobuf==6.32.1.20250918",
"deptry==0.24.0"
"deptry==0.23.0"
]
build = [
# Dev and build dependencies are not relaxed because
# we want all developer environments to be consistent.
# These dependencies are not used in production environments
# and should not conflict with other libraries/tooling.
"pyinstaller==6.17.0",
"pyinstaller==6.16.0",
"setuptools==80.9.0",
"build==1.4.0"
"build==1.3.0"
]
scripts = [
# can (optionally) be more lenient on dependencies here
# see comment on dependencies for more context
"jschema_to_python==1.2.3",
"psutil==7.2.1",
"psutil==7.1.2",
"stix2==3.0.1",
"sarif_om==1.0.4",
"requests>=2.32.4",
]
ghidra = [
"pyghidra>=3.0.0",
]
[tool.deptry]
extend_exclude = [

View File

@@ -12,10 +12,10 @@ cxxfilt==0.3.0
dncil==1.0.2
dnfile==0.17.0
funcy==2.0
humanize==4.15.0
humanize==4.14.0
ida-netnode==3.0
ida-settings==3.2.2
intervaltree==3.2.1
intervaltree==3.1.0
markdown-it-py==4.0.0
mdurl==0.1.2
msgpack==1.0.8
@@ -38,12 +38,12 @@ pyghidra==3.0.0
python-flirt==0.9.2
pyyaml==6.0.2
rich==14.2.0
ruamel-yaml==0.19.1
ruamel-yaml==0.18.6
ruamel-yaml-clib==0.2.14
setuptools==80.9.0
six==1.17.0
sortedcontainers==2.4.0
viv-utils==0.8.0
vivisect==1.2.1
msgspec==0.20.0
msgspec==0.19.0
bump-my-version==1.2.4

2
rules

Submodule rules updated: 6a0d506713...6120dfb6e0

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import contextlib
import collections
from pathlib import Path
@@ -20,7 +20,7 @@ from functools import lru_cache
import pytest
import capa.loader
import capa.main
import capa.features.file
import capa.features.insn
import capa.features.common
@@ -53,7 +53,6 @@ from capa.features.extractors.base_extractor import (
)
from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor
logger = logging.getLogger(__name__)
CD = Path(__file__).resolve().parent
DOTNET_DIR = CD / "data" / "dotnet"
DNFILE_TESTFILES = DOTNET_DIR / "dnfile-testfiles"
@@ -201,73 +200,6 @@ def get_binja_extractor(path: Path):
return extractor
# we can't easily cache this because the extractor relies on global state (the opened database)
# which also has to be closed elsewhere. so, the idalib tests will just take a little bit to run.
def get_idalib_extractor(path: Path):
    """open path with IDA's idalib, wait for auto-analysis, and return an IdaFeatureExtractor

    raises:
      RuntimeError: when idalib cannot be found or loaded, or when the
        database fails to open.
    """
    import capa.features.extractors.ida.idalib as idalib

    if not idalib.has_idalib():
        raise RuntimeError("cannot find IDA idalib module.")
    if not idalib.load_idalib():
        raise RuntimeError("failed to load IDA idalib module.")

    # these modules are imported only after idalib has been loaded successfully
    import idapro
    import ida_auto

    import capa.features.extractors.ida.extractor

    logger.debug("idalib: opening database...")
    idapro.enable_console_messages(False)

    # the primary and secondary Lumina servers are pointed at 0.0.0.0 to disable Lumina,
    # which sometimes provides bad names, including overwriting names from debug info.
    # -R loads resources, which can help with embedded PE files.
    open_args = "-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R"

    # return values from open_database:
    #    0 - Success
    #    2 - User cancelled or 32-64 bit conversion failed
    #    4 - Database initialization failed
    #   -1 - Generic errors (database already open, auto-analysis failed, etc.)
    #   -2 - User cancelled operation
    status = idapro.open_database(str(path), run_auto_analysis=True, args=open_args)
    if status != 0:
        raise RuntimeError("failed to analyze input file")

    logger.debug("idalib: waiting for analysis...")
    ida_auto.auto_wait()
    logger.debug("idalib: opened database.")

    ida_extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
    fixup_idalib(path, ida_extractor)
    return ida_extractor
def fixup_idalib(path: Path, extractor):
    """
    IDA fixups to overcome differences between backends

    clears the library-function flag on selected functions of known samples,
    chosen by a fragment of the file name. extractor is accepted but unused.
    """
    import idaapi
    import ida_funcs

    def remove_library_id_flag(fva):
        func = idaapi.get_func(fva)
        func.flags = func.flags & ~ida_funcs.FUNC_LIB
        ida_funcs.update_func(func)

    # (file name fragment, function address whose (correct) library id is removed)
    fixups = (
        # so we can test x64 thunk
        ("kernel32-64", 0x1800202B0),
        # so we can test x64 nested thunk
        ("al-khaser_x64", 0x14004B4F0),
    )
    for name_fragment, fva in fixups:
        if name_fragment in path.name:
            remove_library_id_flag(fva)
@lru_cache(maxsize=1)
def get_cape_extractor(path):
from capa.helpers import load_json_from_path
@@ -982,8 +914,20 @@ FEATURE_PRESENCE_TESTS = sorted(
("mimikatz", "function=0x4556E5", capa.features.insn.API("advapi32.LsaQueryInformationPolicy"), False),
("mimikatz", "function=0x4556E5", capa.features.insn.API("LsaQueryInformationPolicy"), True),
# insn/api: x64
(
"kernel32-64",
"function=0x180001010",
capa.features.insn.API("RtlVirtualUnwind"),
True,
),
("kernel32-64", "function=0x180001010", capa.features.insn.API("RtlVirtualUnwind"), True),
# insn/api: x64 thunk
(
"kernel32-64",
"function=0x1800202B0",
capa.features.insn.API("RtlCaptureContext"),
True,
),
("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True),
# insn/api: x64 nested thunk
("al-khaser x64", "function=0x14004B4F0", capa.features.insn.API("__vcrt_GetModuleHandle"), True),
@@ -1071,20 +1015,20 @@ FEATURE_PRESENCE_TESTS = sorted(
("pma16-01", "file", OS(OS_WINDOWS), True),
("pma16-01", "file", OS(OS_LINUX), False),
("mimikatz", "file", OS(OS_WINDOWS), True),
("pma16-01", "function=0x401100", OS(OS_WINDOWS), True),
("pma16-01", "function=0x401100,bb=0x401130", OS(OS_WINDOWS), True),
("pma16-01", "function=0x404356", OS(OS_WINDOWS), True),
("pma16-01", "function=0x404356,bb=0x4043B9", OS(OS_WINDOWS), True),
("mimikatz", "function=0x40105D", OS(OS_WINDOWS), True),
("pma16-01", "file", Arch(ARCH_I386), True),
("pma16-01", "file", Arch(ARCH_AMD64), False),
("mimikatz", "file", Arch(ARCH_I386), True),
("pma16-01", "function=0x401100", Arch(ARCH_I386), True),
("pma16-01", "function=0x401100,bb=0x401130", Arch(ARCH_I386), True),
("pma16-01", "function=0x404356", Arch(ARCH_I386), True),
("pma16-01", "function=0x404356,bb=0x4043B9", Arch(ARCH_I386), True),
("mimikatz", "function=0x40105D", Arch(ARCH_I386), True),
("pma16-01", "file", Format(FORMAT_PE), True),
("pma16-01", "file", Format(FORMAT_ELF), False),
("mimikatz", "file", Format(FORMAT_PE), True),
# format is also a global feature
("pma16-01", "function=0x401100", Format(FORMAT_PE), True),
("pma16-01", "function=0x404356", Format(FORMAT_PE), True),
("mimikatz", "function=0x456BB9", Format(FORMAT_PE), True),
# elf support
("7351f.elf", "file", OS(OS_LINUX), True),

View File

@@ -13,7 +13,7 @@
# limitations under the License.
import capa.features.address
from capa.engine import Or, And, Not, Some, Range, Sequence
from capa.engine import Or, And, Not, Some, Range
from capa.features.insn import Number
ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)
@@ -155,145 +155,3 @@ def test_eval_order():
assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement == Number(2)
assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement != Number(1)
def test_sequence():
    """Sequence matches only when its children can be placed at strictly increasing locations"""

    def va(value):
        return capa.features.address.AbsoluteVirtualAddress(value)

    def matches(numbers, features):
        return bool(Sequence([Number(n) for n in numbers]).evaluate(features))

    # 1 before 2
    assert matches([1, 2], {Number(1): {ADDR1}, Number(2): {ADDR2}}) is True

    # 2 before 1 (fail)
    assert matches([1, 2], {Number(1): {ADDR2}, Number(2): {ADDR1}}) is False

    # 1 same as 2 (fail)
    assert matches([1, 2], {Number(1): {ADDR1}, Number(2): {ADDR1}}) is False

    # 1 before 2 before 3
    assert matches([1, 2, 3], {Number(1): {ADDR1}, Number(2): {ADDR2}, Number(3): {ADDR3}}) is True

    # 1 before 2, but 3 sits before 2 (fail)
    assert matches([1, 2, 3], {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}) is False

    # multiple locations for one child:
    # 1 at ADDR1 and ADDR3, 2 at ADDR2; choosing ADDR1 for 1 gives a match
    assert matches([1, 2], {Number(1): {ADDR1, ADDR3}, Number(2): {ADDR2}}) is True

    # a child must skip its locations that are not after the previous child:
    # 1 at 10, 2 at 5 and 15; 5 is too early, 15 works
    assert matches([1, 2], {Number(1): {va(10)}, Number(2): {va(5), va(15)}}) is True

    # the earliest usable location is picked for each child:
    # 1 at 10 and 20, 2 at 15; 1 takes 10, which is before 15
    assert matches([1, 2], {Number(1): {va(10), va(20)}, Number(2): {va(15)}}) is True

    # 1 -> 10, 2 -> 15 (> 10), 3 -> 12 (not > 15): fail
    assert matches([1, 2, 3], {Number(1): {va(10)}, Number(2): {va(15)}, Number(3): {va(12)}}) is False
def test_location_propagation():
    """
    Regression test: Or/And/Some results must carry their children's match locations.

    These statements previously failed to propagate match locations to their
    results, which in turn caused Sequence evaluation to fail.
    """

    def just_one():
        # fresh feature set for each evaluation: Number(1) found at ADDR1
        return {Number(1): {ADDR1}}

    def one_and_two():
        # fresh feature set for each evaluation: Number(1) at ADDR1, Number(2) at ADDR2
        return {Number(1): {ADDR1}, Number(2): {ADDR2}}

    # Or
    single_or = Or([Number(1)]).evaluate(just_one())
    assert single_or.locations == {ADDR1}

    # short_circuit=True returns first match
    eager_or = Or([Number(1), Number(2)]).evaluate(one_and_two())
    assert eager_or.locations == {ADDR1}

    exhaustive_or = Or([Number(1), Number(2)]).evaluate(one_and_two(), short_circuit=False)
    assert exhaustive_or.locations == {ADDR1, ADDR2}

    # And
    single_and = And([Number(1)]).evaluate(just_one())
    assert single_and.locations == {ADDR1}

    pair_and = And([Number(1), Number(2)]).evaluate(one_and_two())
    assert pair_and.locations == {ADDR1, ADDR2}

    # Some
    single_some = Some(1, [Number(1)]).evaluate(just_one())
    assert single_some.locations == {ADDR1}

    # short_circuit=True returns first sufficient set
    eager_some = Some(1, [Number(1), Number(2)]).evaluate(one_and_two())
    assert eager_some.locations == {ADDR1}

    both_some = Some(2, [Number(1), Number(2)]).evaluate(one_and_two())
    assert both_some.locations == {ADDR1, ADDR2}

View File

@@ -26,17 +26,10 @@ ghidra_present = importlib.util.find_spec("pyghidra") is not None and "GHIDRA_IN
@fixtures.parametrize(
"sample,scope,feature,expected",
[
(
pytest.param(
*t,
marks=pytest.mark.xfail(
reason="specific to Vivisect and basic blocks do not align with Ghidra's analysis"
),
)
if t[0] == "294b8d..." and t[2] == capa.features.common.String("\r\n\x00:ht")
else t
)
t
for t in fixtures.FEATURE_PRESENCE_TESTS
# this test case is specific to Vivisect and its basic blocks do not align with Ghidra's analysis
if t[0] != "294b8d..." or t[2] != capa.features.common.String("\r\n\x00:ht")
],
indirect=["sample", "scope"],
)

View File

@@ -1,86 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pathlib import Path
import pytest
import fixtures
import capa.features.extractors.ida.idalib
from capa.features.file import FunctionName
from capa.features.insn import API
from capa.features.common import Characteristic
logger = logging.getLogger(__name__)

# Bind the fallback version up front so that `kernel_version` exists at module
# scope even when idalib is not installed at all (previously it was only bound
# inside the `if idalib_present:` branch).
kernel_version: str = "0.0"

# True only when idalib is reported as available *and* its Python modules import cleanly.
idalib_present = capa.features.extractors.ida.idalib.has_idalib()
if idalib_present:
    try:
        import idapro  # noqa: F401 [imported but unused]
        import ida_kernwin

        kernel_version = ida_kernwin.get_kernel_version()
    except ImportError:
        # idalib was detected but cannot be imported: treat it as absent
        # so the tests below are skipped, and keep the fallback version.
        idalib_present = False
        kernel_version = "0.0"
@pytest.mark.skipif(idalib_present is False, reason="Skip idalib tests if the idalib Python API is not installed")
@fixtures.parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_SYMTAB_FUNC_TESTS,
    indirect=["sample", "scope"],
)
def test_idalib_features(sample: Path, scope, feature, expected):
    """
    Check feature presence using the idalib extractor.

    A couple of cases are marked as expected failures on older IDA kernels;
    the database is always closed (without saving) once the check has run.
    """
    if (
        kernel_version in {"9.0", "9.1"}
        and sample.name.startswith("2bf18d")
        and isinstance(feature, (API, FunctionName))
        and feature.value == "__libc_connect"
    ):
        # see discussion here: https://github.com/mandiant/capa/pull/2742#issuecomment-3674146335
        #
        # > i confirmed that there were changes in 9.2 related to the ELF loader handling names,
        # > so I think its reasonable to conclude that 9.1 and older had a bug that
        # > prevented this name from surfacing.
        pytest.xfail(f"IDA {kernel_version} does not extract all ELF symbols")

    if (
        kernel_version == "9.0"
        and sample.name.startswith("Practical Malware Analysis Lab 12-04.exe_")
        and isinstance(feature, Characteristic)
        and feature.value == "embedded pe"
    ):
        # see discussion here: https://github.com/mandiant/capa/pull/2742#issuecomment-3667086165
        #
        # idalib for IDA 9.0 doesn't support argv arguments, so we can't ask that resources are loaded
        pytest.xfail("idalib 9.0 does not support loading resource segments")

    try:
        fixtures.do_test_feature_presence(fixtures.get_idalib_extractor, sample, scope, feature, expected)
    finally:
        # release the database whether or not the check passed
        logger.debug("closing database...")
        import idapro

        idapro.close_database(save=False)
        logger.debug("closed database.")
@pytest.mark.skipif(idalib_present is False, reason="Skip idalib tests if the idalib Python API is not installed")
@fixtures.parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_COUNT_TESTS,
    indirect=["sample", "scope"],
)
def test_idalib_feature_counts(sample, scope, feature, expected):
    """
    Check feature counts using the idalib extractor.

    The database is always closed (without saving) once the check has run.
    """

    def _discard_database():
        # idapro is imported lazily, only at teardown time
        logger.debug("closing database...")
        import idapro

        idapro.close_database(save=False)
        logger.debug("closed database.")

    try:
        fixtures.do_test_feature_count(fixtures.get_idalib_extractor, sample, scope, feature, expected)
    finally:
        _discard_database()

View File

@@ -80,28 +80,6 @@ def test_rule_yaml():
assert bool(r.evaluate({Number(0): {ADDR1}, Number(1): {ADDR1}, Number(2): {ADDR1}, Number(3): {ADDR1}})) is True
def test_rule_yaml_sequence():
    """A `sequence` statement loaded from YAML matches only when its children occur in order."""
    rule_text = textwrap.dedent(
        """
        rule:
            meta:
                name: test rule
                scopes:
                    static: function
                    dynamic: process
            features:
                - sequence:
                    - number: 1
                    - number: 2
        """
    )
    sequence_rule = capa.rules.Rule.from_yaml(rule_text)

    # 1 before 2 -> match
    in_order = {Number(1): {ADDR1}, Number(2): {ADDR2}}
    assert bool(sequence_rule.evaluate(in_order)) is True

    # 2 before 1 -> no match
    out_of_order = {Number(1): {ADDR2}, Number(2): {ADDR1}}
    assert bool(sequence_rule.evaluate(out_of_order)) is False
def test_rule_yaml_complex():
rule = textwrap.dedent(
"""
@@ -1675,70 +1653,3 @@ def test_circular_dependency():
]
with pytest.raises(capa.rules.InvalidRule):
list(capa.rules.get_rules_and_dependencies(rules, rules[0].name))
def test_rule_yaml_sequence_with_subscope():
    """
    Verify that `sequence` works when one of its children is a `call` subscope.

    The steps mimic the dynamic analysis flow: match the extracted call-scope
    rule first, index that match into the feature set, then match the
    span-of-calls rule that contains the sequence.
    """
    yaml_text = textwrap.dedent(
        """
        rule:
            meta:
                name: test sequence subscope
                scopes:
                    static: function
                    dynamic: span of calls
            features:
                - sequence:
                    - call:
                        - number: 1
                    - number: 2
        """
    )

    # loading the rule into a RuleSet triggers subscope extraction
    ruleset = capa.rules.RuleSet([capa.rules.Rule.from_yaml(yaml_text)])

    # expect exactly one extracted call-scope rule and one span-of-calls (main) rule
    call_rules = ruleset.rules_by_scope[capa.rules.Scope.CALL]
    span_rules = ruleset.rules_by_scope[capa.rules.Scope.SPAN_OF_CALLS]
    assert len(call_rules) == 1
    assert len(span_rules) == 1
    main_rule = span_rules[0]
    subscope_rule = call_rules[0]

    # two calls on the same thread: Number(1) is seen at call 1, Number(2) at call 2
    process = capa.features.address.ProcessAddress(1)
    thread = capa.features.address.ThreadAddress(process, 1)
    call1_addr = capa.features.address.DynamicCallAddress(thread, 1)
    call2_addr = capa.features.address.DynamicCallAddress(thread, 2)
    features: capa.engine.FeatureSet = {Number(1): {call1_addr}, Number(2): {call2_addr}}

    # call scope (simulates find_call_capabilities): the subscope rule should match at call 1
    _, matches1 = ruleset.match(capa.rules.Scope.CALL, features, call1_addr)
    assert subscope_rule.name in matches1

    # record the subscope match in the feature set so the span-scope rule can see it
    capa.engine.index_rule_matches(features, subscope_rule, [call1_addr])

    # span scope (simulates find_span_capabilities): the sequence should see
    #   - call: the subscope rule match at call1_addr
    #   - number: 2 at call2_addr
    # call1_addr (id=1) < call2_addr (id=2), so the sequence matches.
    # the address passed here doesn't matter much for the span match logic itself,
    # but it is passed through to the result.
    _, matches_span = ruleset.match(capa.rules.Scope.SPAN_OF_CALLS, features, call1_addr)
    assert main_rule.name in matches_span