resolved merge conflicts

This commit is contained in:
Pratham Chauhan
2023-03-30 11:05:32 +05:30
46 changed files with 4928 additions and 82 deletions

View File

@@ -61,6 +61,7 @@ a = Analysis(
"qt5",
"pyqtwebengine",
"pyasn1",
"binaryninja",
],
)

View File

@@ -34,11 +34,11 @@ jobs:
- name: Install dependencies
run: pip install -e .[dev]
- name: Lint with isort
run: isort --profile black --length-sort --line-width 120 -c .
run: isort --profile black --length-sort --line-width 120 --skip-glob "*_pb2.py" -c .
- name: Lint with black
run: black -l 120 --check .
run: black -l 120 --extend-exclude ".*_pb2.py" --check .
- name: Lint with pycodestyle
run: pycodestyle --show-source capa/ scripts/ tests/
run: pycodestyle --exclude="*_pb2.py" --show-source capa/ scripts/ tests/
- name: Check types with mypy
run: mypy --config-file .github/mypy/mypy.ini --check-untyped-defs capa/ scripts/ tests/
@@ -90,3 +90,38 @@ jobs:
run: pip install -e .[dev]
- name: Run tests
run: pytest -v tests/
binja-tests:
name: Binary Ninja tests for ${{ matrix.python-version }} on ubuntu-20.04
runs-on: ubuntu-20.04
needs: [code_style, rule_linter]
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.11"]
steps:
- name: Checkout capa with submodules
uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3.3.0
with:
submodules: recursive
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@d27e3f3d7c64b4bbf8e4abfb9b63b83e846e0435 # v4.5.0
with:
python-version: ${{ matrix.python-version }}
- name: Install pyyaml
run: sudo apt-get install -y libyaml-dev
- name: Install capa
run: pip install -e .[dev]
- name: install Binary Ninja
env:
BN_SERIAL: ${{ secrets.BN_SERIAL }}
run: |
mkdir ./.github/binja
curl "https://raw.githubusercontent.com/Vector35/binaryninja-api/6812c97/scripts/download_headless.py" -o ./.github/binja/download_headless.py
python ./.github/binja/download_headless.py --serial $BN_SERIAL --output .github/binja/BinaryNinja-headless.zip
unzip .github/binja/BinaryNinja-headless.zip -d .github/binja/
python .github/binja/binaryninja/scripts/install_api.py --install-on-root --silent
- name: Run tests
env:
BN_LICENSE: ${{ secrets.BN_LICENSE }}
run: pytest -v tests/test_binja_features.py # explicitly refer to the binja tests for performance. other tests run above.

5
.gitignore vendored
View File

@@ -118,7 +118,12 @@ rule-linter-output.log
scripts/perf/*.txt
scripts/perf/*.svg
scripts/perf/*.zip
.direnv
.envrc
.DS_Store
*/.DS_Store
Pipfile
Pipfile.lock
/cache/
.github/binja/binaryninja

View File

@@ -3,11 +3,15 @@
## master (unreleased)
### New Features
- add protobuf format for result documents #1219 @williballenthin @mr-tz
- extractor: add Binary Ninja feature extractor @xusheng6
- new cli flag `--os` to override auto-detected operating system for a sample @captainGeech42
- change colour/highlight to "cyan" instead of "blue" for better visibility #1384 @ggold7046
- add new format to parse output json back to capa #1396 @ooprathamm
### Breaking Changes
### New Rules (20)
### New Rules (24)
- persistence/scheduled-tasks/schedule-task-via-at joren485
- data-manipulation/prng/generate-random-numbers-via-rtlgenrandom william.ballenthin@mandiant.com
@@ -29,6 +33,10 @@
- nursery/hash-data-using-ripemd256 raymond.leong@mandiant.com
- nursery/hash-data-using-ripemd320 raymond.leong@mandiant.com
- nursery/set-web-proxy-in-dotnet michael.hunhoff@mandiant.com
- nursery/check-for-windows-sandbox-via-subdirectory echernofsky@google.com
- nursery/enumerate-pe-sections-in-dotnet @mr-tz
- nursery/destroy-software-breakpoint-capability echernofsky@google.com
- nursery/send-data-to-internet michael.hunhoff@mandiant.com
-
### Bug Fixes

View File

@@ -2,7 +2,7 @@
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/flare-capa)](https://pypi.org/project/flare-capa)
[![Last release](https://img.shields.io/github/v/release/mandiant/capa)](https://github.com/mandiant/capa/releases)
[![Number of rules](https://img.shields.io/badge/rules-787-blue.svg)](https://github.com/mandiant/capa-rules)
[![Number of rules](https://img.shields.io/badge/rules-792-blue.svg)](https://github.com/mandiant/capa-rules)
[![CI status](https://github.com/mandiant/capa/workflows/CI/badge.svg)](https://github.com/mandiant/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
[![Downloads](https://img.shields.io/github/downloads/mandiant/capa/total)](https://github.com/mandiant/capa/releases)
[![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE.txt)

View File

@@ -417,6 +417,8 @@ OS_MACOS = "macos"
OS_ANY = "any"
VALID_OS = {os.value for os in capa.features.extractors.elf.OS}
VALID_OS.update({OS_WINDOWS, OS_LINUX, OS_MACOS, OS_ANY})
# internal only, not to be used in rules
OS_AUTO = "auto"
class OS(Feature):

View File

@@ -0,0 +1,146 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import string
import struct
from typing import Tuple, Iterator
from binaryninja import Function
from binaryninja import BasicBlock as BinjaBasicBlock
from binaryninja import (
BinaryView,
VariableSourceType,
MediumLevelILSetVar,
MediumLevelILOperation,
MediumLevelILBasicBlock,
MediumLevelILInstruction,
)
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle
def get_printable_len(il: MediumLevelILSetVar) -> int:
    """Return the number of printable characters written by a constant assignment.

    The constant is serialized little-endian at the width of the destination
    variable and interpreted first as ASCII, then as UTF-16 LE.

    Returns:
        the width in bytes when every byte is printable ASCII, half the width
        when the bytes form printable UTF-16 LE, and 0 otherwise (including
        when the width is not 1, 2, 4 or 8 bytes).
    """
    pack_formats = {1: "<B", 2: "<H", 4: "<I", 8: "<Q"}

    size = il.dest.type.width
    constant = il.src.value.value

    fmt = pack_formats.get(size)
    if fmt is None:
        return 0

    # mask to the operand width so that negative constants can be packed as unsigned
    raw = struct.pack(fmt, constant & ((1 << (8 * size)) - 1))

    def looks_like_ascii(data: bytes) -> bool:
        for byte in data:
            if byte >= 127 or chr(byte) not in string.printable:
                return False
        return True

    def looks_like_utf16le(data: bytes) -> bool:
        # every high byte must be zero and every low byte must be printable ASCII
        if any(data[1::2]):
            return False
        return looks_like_ascii(data[::2])

    if looks_like_ascii(raw):
        return size
    if looks_like_utf16le(raw):
        return size // 2
    return 0
def is_mov_imm_to_stack(il: MediumLevelILInstruction) -> bool:
    """Return True when the MLIL instruction assigns a constant to a stack variable."""
    # the checks short-circuit in order, so `src`/`dest` are only touched on SET_VAR instructions
    return (
        il.operation == MediumLevelILOperation.MLIL_SET_VAR
        and il.src.operation == MediumLevelILOperation.MLIL_CONST
        and il.dest.source_type == VariableSourceType.StackVariableSourceType
    )
def bb_contains_stackstring(f: Function, bb: MediumLevelILBasicBlock) -> bool:
    """Decide whether an MLIL basic block looks like it builds a stack string.

    Sums the printable length of every constant-to-stack move in the block and
    reports True as soon as the running total exceeds MIN_STACKSTRING_LEN.
    The `f` argument is accepted but not consulted.
    """
    printable_total = 0
    for insn in bb:
        if is_mov_imm_to_stack(insn):
            printable_total += get_printable_len(insn)
        if printable_total > MIN_STACKSTRING_LEN:
            return True
    return False
def extract_bb_stackstring(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
    """Yield a "stack string" characteristic when the block's MLIL shows stackstring indicators."""
    # the handle carries (disassembly block, MLIL block); the MLIL block may be None
    mlil_block = bbh.inner[1]
    if mlil_block is None:
        return
    if bb_contains_stackstring(fh.inner, mlil_block):
        yield Characteristic("stack string"), bbh.address
def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
    """Yield a "tight loop" characteristic for each outgoing edge that targets the block itself."""
    block = bbh.inner[0]
    for edge in block.outgoing_edges:
        if edge.target.start != block.start:
            continue
        yield Characteristic("tight loop"), bbh.address
def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
    """Yield every basic block feature, followed by the BasicBlock marker feature itself."""
    for handler in BASIC_BLOCK_HANDLERS:
        yield from handler(fh, bbh)
    yield BasicBlock(), bbh.address
# basic block feature extractors, invoked in this order by extract_features()
BASIC_BLOCK_HANDLERS = (
extract_bb_tight_loop,
extract_bb_stackstring,
)
def main():
    """Debug entry point: pretty-print the basic block features of the file given as argv[1]."""
    if len(sys.argv) < 2:
        return

    from binaryninja import BinaryViewType

    from capa.features.extractors.binja.extractor import BinjaFeatureExtractor

    bv: BinaryView = BinaryViewType.get_view_of_file(sys.argv[1])
    if bv is None:
        return

    extractor = BinjaFeatureExtractor(bv)
    features = []
    for fh in extractor.get_functions():
        for bbh in extractor.get_basic_blocks(fh):
            features.extend(extract_features(fh, bbh))

    import pprint

    pprint.pprint(features)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,77 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import List, Tuple, Iterator
import binaryninja as binja
import capa.features.extractors.elf
import capa.features.extractors.binja.file
import capa.features.extractors.binja.insn
import capa.features.extractors.binja.global_
import capa.features.extractors.binja.function
import capa.features.extractors.binja.basicblock
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
class BinjaFeatureExtractor(FeatureExtractor):
    """Feature extractor backed by a Binary Ninja ``BinaryView``."""

    def __init__(self, bv: binja.BinaryView):
        super().__init__()
        self.bv = bv

        # global features are computed once here and replayed by extract_global_features()
        self.global_features: List[Tuple[Feature, Address]] = [
            *capa.features.extractors.binja.file.extract_file_format(self.bv),
            *capa.features.extractors.binja.global_.extract_os(self.bv),
            *capa.features.extractors.binja.global_.extract_arch(self.bv),
        ]

    def get_base_address(self):
        """Return the start of the binary view as an absolute virtual address."""
        return AbsoluteVirtualAddress(self.bv.start)

    def extract_global_features(self):
        """Yield the global features collected at construction time."""
        yield from self.global_features

    def extract_file_features(self):
        """Yield file-scope features of the binary view."""
        yield from capa.features.extractors.binja.file.extract_features(self.bv)

    def get_functions(self) -> Iterator[FunctionHandle]:
        """Yield one handle per function in the view; `inner` is the Binary Ninja function."""
        for func in self.bv.functions:
            yield FunctionHandle(address=AbsoluteVirtualAddress(func.start), inner=func)

    def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
        """Yield function-scope features for the given function."""
        yield from capa.features.extractors.binja.function.extract_features(fh)

    def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
        """Yield one handle per disassembly basic block of the function.

        `inner` is a pair of (disassembly basic block, MLIL basic block); the second
        element is None when no MLIL block starts at the same source address.
        """
        func: binja.Function = fh.inner

        # index the MLIL basic blocks by the start of the disassembly block they originate from
        mlil_by_start = {mlil_bb.source_block.start: mlil_bb for mlil_bb in func.mlil.basic_blocks}

        for bb in func.basic_blocks:
            yield BBHandle(
                address=AbsoluteVirtualAddress(bb.start),
                inner=(bb, mlil_by_start.get(bb.start)),
            )

    def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
        """Yield basic-block-scope features for the given block."""
        yield from capa.features.extractors.binja.basicblock.extract_features(fh, bbh)

    def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]:
        """Yield one handle per instruction of the disassembly basic block."""
        import capa.features.extractors.binja.helpers as binja_helpers

        disasm_block = bbh.inner[0]

        # iterating the block yields (tokens, length) pairs, so the address is tracked manually
        cursor = disasm_block.start
        for tokens, size in disasm_block:
            yield InsnHandle(
                address=AbsoluteVirtualAddress(cursor),
                inner=binja_helpers.DisassemblyInstruction(cursor, size, tokens),
            )
            cursor += size

    def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle):
        """Yield instruction-scope features for the given instruction."""
        yield from capa.features.extractors.binja.insn.extract_features(fh, bbh, ih)

View File

@@ -0,0 +1,188 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import struct
from typing import Tuple, Iterator
from binaryninja import Symbol, Segment, BinaryView, SymbolType, SymbolBinding
import capa.features.extractors.common
import capa.features.extractors.helpers
import capa.features.extractors.strings
from capa.features.file import Export, Import, Section, FunctionName
from capa.features.common import FORMAT_PE, FORMAT_ELF, Format, String, Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, AbsoluteVirtualAddress
from capa.features.extractors.binja.helpers import unmangle_c_name
def check_segment_for_pe(bv: BinaryView, seg: Segment) -> Iterator[Tuple[int, int]]:
    """Search a segment for embedded PE files, including single-byte XOR encoded ones.

    adapted for binja from:
    https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19

    Yields:
        (address of the MZ header, XOR key) for each candidate whose e_lfanew
        field leads to a matching PE signature inside the segment.
    """
    xor_static = capa.features.extractors.helpers.xor_static

    # the MZ and PE magics as they appear under every possible single-byte XOR key
    keyed_magics = [(xor_static(b"MZ", key), xor_static(b"PE", key), key) for key in range(256)]

    # when scanning the first segment of a PE view, start one byte in; otherwise the
    # file's own header would always be reported as an embedded PE
    search_start = seg.start
    if bv.view_type == "PE" and search_start == bv.start:
        search_start += 1

    candidates = []
    for mz_magic, pe_magic, key in keyed_magics:
        for offset, _ in bv.find_all_data(search_start, seg.end, mz_magic):
            candidates.append((offset, pe_magic, key))

    # candidates are validated last-found-first
    for offset, pe_magic, key in reversed(candidates):
        # e_lfanew is the only MZ header field that gets checked; it lives at 0x3c
        lfanew_addr = offset + 0x3C
        if seg.end < (lfanew_addr + 4):
            continue

        lfanew = struct.unpack("<I", xor_static(bv.read(lfanew_addr, 4), key))[0]

        pe_addr = offset + lfanew
        if seg.end < (pe_addr + 2):
            continue

        if bv.read(pe_addr, 2) == pe_magic:
            yield offset, key
def extract_file_embedded_pe(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield an "embedded pe" characteristic for every PE candidate found in any segment."""
    for segment in bv.segments:
        for offset, _key in check_segment_for_pe(bv, segment):
            yield Characteristic("embedded pe"), FileOffsetAddress(offset)
def extract_file_export_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield an Export feature for each function symbol with global or weak binding.

    When the C-unmangled form of the name differs from the symbol's short name,
    a second Export feature is yielded for the unmangled form.
    """
    exported_bindings = (SymbolBinding.GlobalBinding, SymbolBinding.WeakBinding)
    for sym in bv.get_symbols_of_type(SymbolType.FunctionSymbol):
        if sym.binding not in exported_bindings:
            continue

        exported = sym.short_name
        yield Export(exported), AbsoluteVirtualAddress(sym.address)

        plain = unmangle_c_name(exported)
        if exported != plain:
            yield Export(plain), AbsoluteVirtualAddress(sym.address)
def extract_file_import_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield Import features for every import address symbol.

    1. imports by name, results in two features to support importname-only
       matching:
         - modulename.importname
         - importname
    2. imports by ordinal (only when the ordinal is non-zero and the module
       name is known):
         - modulename.#ordinal
    """
    generate_symbols = capa.features.extractors.helpers.generate_symbols

    for sym in bv.get_symbols_of_type(SymbolType.ImportAddressSymbol):
        module = str(sym.namespace)
        location = AbsoluteVirtualAddress(sym.address)

        for symbol_name in generate_symbols(module, sym.short_name):
            yield Import(symbol_name), location

        ordinal = sym.ordinal
        if ordinal == 0 or module == "":
            continue

        for symbol_name in generate_symbols(module, f"#{ordinal}"):
            yield Import(symbol_name), location
def extract_file_section_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield a Section feature, located at the section start, for every section in the view."""
    yield from (
        (Section(section_name), AbsoluteVirtualAddress(section.start))
        for section_name, section in bv.sections.items()
    )
def extract_file_strings(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature for every string Binary Ninja reports in the view (ASCII and UTF-16 LE)."""
    yield from ((String(found.value), FileOffsetAddress(found.start)) for found in bv.strings)
def extract_file_function_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """
    extract the names of statically-linked library functions.

    Yields a FunctionName feature for every library function symbol, plus a
    second one without the leading underscore when the name starts with `_`.
    The location is the symbol address wrapped as an AbsoluteVirtualAddress,
    matching the declared return type and the other file-scope extractors.
    """
    for sym_name in bv.symbols:
        for sym in bv.symbols[sym_name]:
            if sym.type != SymbolType.LibraryFunctionSymbol:
                continue

            name = sym.short_name
            # wrap the raw integer so consumers always receive an Address instance
            addr = AbsoluteVirtualAddress(sym.address)
            yield FunctionName(name), addr

            if name.startswith("_"):
                # some linkers may prefix linked routines with a `_` to avoid name collisions.
                # extract features for both the mangled and un-mangled representations.
                # e.g. `_fwrite` -> `fwrite`
                # see: https://stackoverflow.com/a/2628384/87207
                yield FunctionName(name[1:]), addr
def extract_file_format(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield the Format feature that corresponds to the Binary Ninja view type.

    Raises:
        NotImplementedError: when the view type is not PE, COFF, ELF or Raw.
    """
    fmt = bv.view_type

    if fmt == "Raw":
        # no file type to return when processing a binary file, but we want to continue processing
        return

    if fmt == "ELF":
        yield Format(FORMAT_ELF), NO_ADDRESS
    elif fmt in ("PE", "COFF"):
        yield Format(FORMAT_PE), NO_ADDRESS
    else:
        raise NotImplementedError(f"unexpected file format: {fmt}")
def extract_features(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield every file-scope feature by running each handler in FILE_HANDLERS in order."""
    for handler in FILE_HANDLERS:
        yield from handler(bv)
# file-scope feature extractors, invoked in this order by extract_features()
FILE_HANDLERS = (
extract_file_export_names,
extract_file_import_names,
extract_file_strings,
extract_file_section_names,
extract_file_embedded_pe,
extract_file_function_names,
extract_file_format,
)
def main():
    """Debug entry point: pretty-print the file features of the file given as argv[1]."""
    if len(sys.argv) < 2:
        return

    from binaryninja import BinaryViewType

    bv: BinaryView = BinaryViewType.get_view_of_file(sys.argv[1])
    if bv is None:
        return

    import pprint

    features = list(extract_features(bv))
    pprint.pprint(features)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,34 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import subprocess
# When the script gets executed as a standalone executable (via PyInstaller), `import binaryninja` does not work because
# we have excluded the binaryninja module in `pyinstaller.spec`. The trick here is to call the system Python and try
# to find out the path of the binaryninja module that has been installed.
# Note, including the binaryninja module in the `pyinstaller.spec` would not work, since the binaryninja module tries to
# find the binaryninja core e.g., `libbinaryninjacore.dylib`, using a relative path. And this does not work when the
# binaryninja module is extracted by the PyInstaller.
# Script executed by the system Python to locate the installed binaryninja package.
# It prints the hex-encoded parent directory of the package, or nothing when the
# package cannot be found.
code = r"""
from pathlib import Path
# `import importlib` alone does not guarantee that the `util` submodule is loaded
import importlib.util
spec = importlib.util.find_spec('binaryninja')
# submodule_search_locations is None for non-package modules, so test truthiness rather than len()
if spec is not None and spec.submodule_search_locations:
    path = Path(spec.submodule_search_locations[0])
    # encode the path with utf8 then convert to hex, make sure it can be read and restored properly
    print(str(path.parent).encode('utf8').hex())
"""


def find_binja_path() -> str:
    """Return the directory that contains the installed `binaryninja` Python package.

    Runs the `python` found on PATH (not the current interpreter, which may be a
    PyInstaller bundle) and decodes the hex-encoded path it prints.

    Returns:
        the directory path, or an empty string when the probe prints nothing.

    Raises:
        FileNotFoundError: when no `python` executable is available.
        subprocess.CalledProcessError: when the probe script exits with an error.
    """
    raw_output = subprocess.check_output(["python", "-c", code]).decode("ascii").strip()
    return bytes.fromhex(raw_output).decode("utf8")
if __name__ == "__main__":
print(find_binja_path())

View File

@@ -0,0 +1,97 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
from typing import Tuple, Iterator
from binaryninja import Function, BinaryView, LowLevelILOperation
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors import loops
from capa.features.extractors.base_extractor import FunctionHandle
def extract_function_calls_to(fh: FunctionHandle):
    """Yield a "calls to" characteristic for every call-like reference to the function.

    The feature address is the address of the referencing instruction.
    """
    func: Function = fh.inner

    call_like_operations = (
        LowLevelILOperation.LLIL_CALL,
        LowLevelILOperation.LLIL_CALL_STACK_ADJUST,
        LowLevelILOperation.LLIL_JUMP,
        LowLevelILOperation.LLIL_TAILCALL,
    )

    for caller in func.caller_sites:
        # Everything that is a code reference to the current function is considered a caller, which actually includes
        # many other references that are NOT a caller. For example, an instruction `push function_start` will also be
        # considered a caller to the function
        llil = caller.llil
        if llil is None:
            # the reference site has no low level IL, so it cannot be classified as a call
            continue

        if llil.operation in call_like_operations:
            yield Characteristic("calls to"), AbsoluteVirtualAddress(caller.address)
def extract_function_loop(fh: FunctionHandle):
    """Yield a "loop" characteristic when the function's control flow graph contains a loop."""
    func: Function = fh.inner

    # control flow graph as (source block start, target block start) pairs
    cfg_edges = [(bb.start, edge.target.start) for bb in func.basic_blocks for edge in bb.outgoing_edges]

    if loops.has_loop(cfg_edges):
        yield Characteristic("loop"), fh.address
def extract_recursive_call(fh: FunctionHandle):
    """Yield a "recursive call" characteristic for each code reference to the function from within itself."""
    func: Function = fh.inner
    view: BinaryView = func.view
    if view is None:
        return

    refs_to_entry = view.get_code_refs(func.start)
    for ref in refs_to_entry:
        if ref.function == func:
            yield Characteristic("recursive call"), fh.address
def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
    """Yield every function-scope feature by running each handler in FUNCTION_HANDLERS in order."""
    for handler in FUNCTION_HANDLERS:
        yield from handler(fh)
# function-scope feature extractors, invoked in this order by extract_features()
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call)
def main():
    """Debug entry point: pretty-print the function features of the file given as argv[1]."""
    if len(sys.argv) < 2:
        return

    from binaryninja import BinaryViewType

    from capa.features.extractors.binja.extractor import BinjaFeatureExtractor

    bv: BinaryView = BinaryViewType.get_view_of_file(sys.argv[1])
    if bv is None:
        return

    extractor = BinjaFeatureExtractor(bv)
    features = []
    for fh in extractor.get_functions():
        features.extend(extract_features(fh))

    import pprint

    pprint.pprint(features)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,55 @@
import logging
import contextlib
from typing import Tuple, Iterator
from binaryninja import BinaryView
import capa.features.extractors.elf
from capa.features.common import OS, OS_MACOS, ARCH_I386, ARCH_AMD64, OS_WINDOWS, Arch, Feature
from capa.features.address import NO_ADDRESS, Address
# module-level logger, used to report unsupported platforms and architectures
logger = logging.getLogger(__name__)
def extract_os(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield the OS feature derived from the view's platform name.

    The platform name is reduced to the part before the first "-". Nothing is
    yielded when the view has no platform or the platform is not recognized.
    """
    platform = bv.platform
    if platform is None:
        # NOTE(review): Binary Ninja is assumed to return None here for views without
        # a platform (e.g. raw shellcode) - confirm against the API docs. Treat it
        # like any other unsupported platform instead of failing on `.name`.
        logger.debug("binary view has no platform, will not guess OS")
        return

    name = platform.name.split("-")[0]

    if name == "windows":
        yield OS(OS_WINDOWS), NO_ADDRESS
    elif name == "macos":
        yield OS(OS_MACOS), NO_ADDRESS
    elif name in ["linux", "freebsd", "decree"]:
        yield OS(name), NO_ADDRESS
    else:
        # we likely end up here:
        #  1. handling shellcode, or
        #  2. handling a new file format (e.g. macho)
        #
        # for (1) we can't do much - its shellcode and all bets are off.
        # we could maybe accept a further CLI argument to specify the OS,
        # but i think this would be rarely used.
        # rules that rely on OS conditions will fail to match on shellcode.
        #
        # for (2), this logic will need to be updated as the format is implemented.
        logger.debug("unsupported file format: %s, will not guess OS", name)
        return
def extract_arch(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
    """Yield the Arch feature for x86 and x86_64 views.

    Nothing is yielded when the view has no architecture or the architecture is
    not one of those two.
    """
    architecture = bv.arch
    if architecture is None:
        # NOTE(review): Binary Ninja is assumed to return None here when the view has
        # no associated architecture - confirm against the API docs. There is
        # nothing to report, so do not fail on `.name`.
        logger.debug("binary view has no architecture")
        return

    arch = architecture.name
    if arch == "x86_64":
        yield Arch(ARCH_AMD64), NO_ADDRESS
    elif arch == "x86":
        yield Arch(ARCH_I386), NO_ADDRESS
    else:
        # we likely end up here:
        #  1. handling a new architecture (e.g. aarch64)
        #
        # for (1), this logic will need to be updated as the format is implemented.
        logger.debug("unsupported architecture: %s", arch)
        return

View File

@@ -0,0 +1,50 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import re
from typing import List, Callable
from dataclasses import dataclass
from binaryninja import LowLevelILInstruction
from binaryninja.architecture import InstructionTextToken
@dataclass
class DisassemblyInstruction:
    """A single disassembled instruction: where it is, how long it is, and its text tokens."""

    # address of the instruction
    address: int
    # length of the instruction in bytes
    length: int
    # disassembly text tokens of the instruction
    text: List[InstructionTextToken]
# callback signature for visit_llil_exprs: (expression, parent expression, operand index) -> bool;
# the return value decides whether the expression's own operands are visited as well
LLIL_VISITOR = Callable[[LowLevelILInstruction, LowLevelILInstruction, int], bool]
def visit_llil_exprs(il: LowLevelILInstruction, func: LLIL_VISITOR):
    """Walk the LLIL expression tree below `il`, calling `func` on every sub-expression.

    `func` receives (sub-expression, its parent, operand index within the parent).
    The walk descends into a sub-expression only when `func` returns a truthy value.
    """
    # BN does not really support operand index at the disassembly level, so use the LLIL operand index as a substitute.
    # Note, this is NOT always guaranteed to be the same as disassembly operand.
    for position, operand in enumerate(il.operands):
        if not isinstance(operand, LowLevelILInstruction):
            continue
        if func(operand, il, position):
            visit_llil_exprs(operand, func)
def unmangle_c_name(name: str) -> str:
    """Strip C name decorations from a symbol name.

    Removes one leading `@` or `_`, an optional trailing `Stub`, and an optional
    trailing `@<digits>` byte count. Names that do not start with `@` or `_`
    (including the empty string) are returned unchanged.
    """
    # https://learn.microsoft.com/en-us/cpp/build/reference/decorated-names?view=msvc-170#FormatC
    # Possible variations for BaseThreadInitThunk:
    # @BaseThreadInitThunk@12
    # _BaseThreadInitThunk
    # _BaseThreadInitThunk@12
    # It is also possible for a function to have a `Stub` appended to its name:
    # _lstrlenWStub@4

    # A small optimization to avoid running the regex too many times; `startswith` is used
    # instead of `name[0]` so that an empty name does not raise IndexError.
    # TODO: this still increases the unit test execution time from 170s to 200s, should be able to accelerate it
    if name.startswith(("@", "_")):
        match = re.match(r"^[@_](.*?)(Stub)?(@\d+)?$", name)
        if match:
            return match.group(1)

    return name

View File

@@ -0,0 +1,630 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
from typing import Any, Dict, List, Tuple, Iterator, Optional
from binaryninja import Function
from binaryninja import BasicBlock as BinjaBasicBlock
from binaryninja import (
BinaryView,
ILRegister,
SymbolType,
BinaryReader,
RegisterValueType,
LowLevelILOperation,
LowLevelILInstruction,
InstructionTextTokenType,
)
import capa.features.extractors.helpers
from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.binja.helpers import DisassemblyInstruction, visit_llil_exprs
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
# security cookie checks may perform non-zeroing XORs. these are expected within a certain
# byte range of the first and the returning basic blocks; this constant is that range, and
# it helps to reduce false positive features.
SECURITY_COOKIE_BYTES_DELTA = 0x40
def is_stub_function(bv: BinaryView, addr: int) -> Optional[int]:
    """Check whether the function at `addr` is a stub that only forwards to another address.

    The criteria are:
      1. the function must only have one basic block
      2. the function must only make one call/jump to another address

    Returns:
        the target address when a function at `addr` is a stub, otherwise None.
    """
    transfer_operations = (
        LowLevelILOperation.LLIL_CALL,
        LowLevelILOperation.LLIL_CALL_STACK_ADJUST,
        LowLevelILOperation.LLIL_JUMP,
        LowLevelILOperation.LLIL_TAILCALL,
    )
    resolvable_value_types = (
        RegisterValueType.ImportedAddressValue,
        RegisterValueType.ConstantValue,
        RegisterValueType.ConstantPointerValue,
    )

    for func in bv.get_functions_at(addr):
        if len(func.basic_blocks) != 1:
            continue

        transfers = 0
        target = None
        for il in func.llil.instructions:
            if il.operation not in transfer_operations:
                continue

            transfers += 1
            dest_value = il.dest.value
            if dest_value.type in resolvable_value_types:
                target = dest_value.value

        if transfers == 1 and target is not None:
            return target

    return None
def extract_insn_api_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
    """
    parse instruction API features

    Resolves the destination of call/jump instructions, following one level of
    stub function, and yields API features for destinations that are imports.

    example:
        call dword [0x00473038]
    """
    func: Function = fh.inner
    bv: BinaryView = func.view
    generate_symbols = capa.features.extractors.helpers.generate_symbols

    transfer_operations = (
        LowLevelILOperation.LLIL_CALL,
        LowLevelILOperation.LLIL_CALL_STACK_ADJUST,
        LowLevelILOperation.LLIL_JUMP,
        LowLevelILOperation.LLIL_TAILCALL,
    )
    resolvable_value_types = (
        RegisterValueType.ImportedAddressValue,
        RegisterValueType.ConstantValue,
        RegisterValueType.ConstantPointerValue,
    )
    import_symbol_types = (SymbolType.ImportAddressSymbol, SymbolType.ImportedFunctionSymbol)

    for llil in func.get_llils_at(ih.address):
        if llil.operation not in transfer_operations:
            continue

        dest_value = llil.dest.value
        if dest_value.type not in resolvable_value_types:
            continue

        # consider the direct target and, when that target is a stub, the address the stub forwards to
        target = dest_value.value
        candidates = [target]
        forwarded = is_stub_function(bv, target)
        if forwarded is not None:
            candidates.append(forwarded)

        for candidate in candidates:
            sym = func.view.get_symbol_at(candidate)
            if sym is None or sym.type not in import_symbol_types:
                continue

            sym_name = sym.short_name

            lib_name = ""
            import_lib = bv.lookup_imported_object_library(sym.address)
            if import_lib is not None:
                lib_name = import_lib[0].name
                # drop the library file extension
                for suffix in (".dll", ".so"):
                    if lib_name.endswith(suffix):
                        lib_name = lib_name[: -len(suffix)]
                        break

            for api_name in generate_symbols(lib_name, sym_name):
                yield API(api_name), ih.address

            if sym_name.startswith("_"):
                # also emit the name without the leading underscore
                for api_name in generate_symbols(lib_name, sym_name[1:]):
                    yield API(api_name), ih.address
def extract_insn_number_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """
    parse instruction number features

    Yields a Number and an OperandNumber feature for every constant in the
    instruction's LLIL, skipping constants under a memory load and constants
    whose parent expression also involves a stack or frame pointer register.
    Constants under a subtraction are negated.

    example:
        push    3136B0h         ; dwControlCode
    """
    func: Function = fh.inner

    results: List[Tuple[Feature, Address]] = []
    stack_registers = ("esp", "ebp", "rsp", "rbp", "sp")

    def llil_checker(il: LowLevelILInstruction, parent: LowLevelILInstruction, index: int) -> bool:
        # do not descend into memory loads
        if il.operation == LowLevelILOperation.LLIL_LOAD:
            return False

        # keep descending until a constant is reached
        if il.operation not in [LowLevelILOperation.LLIL_CONST, LowLevelILOperation.LLIL_CONST_PTR]:
            return True

        # a constant combined with a stack/frame register is skipped
        for op in parent.operands:
            if isinstance(op, ILRegister) and op.name in stack_registers:
                return False
            elif isinstance(op, LowLevelILInstruction) and op.operation == LowLevelILOperation.LLIL_REG:
                if op.src.name in stack_registers:
                    return False

        raw_value = il.value.value
        if parent.operation == LowLevelILOperation.LLIL_SUB:
            raw_value = -raw_value

        results.append((Number(raw_value), ih.address))
        results.append((OperandNumber(index, raw_value), ih.address))

        return False

    for llil in func.get_llils_at(ih.address):
        visit_llil_exprs(llil, llil_checker)

    yield from results
def extract_insn_bytes_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
    """
    parse referenced byte sequences

    candidate addresses come from the instruction's outgoing code references and from every
    positive constant in its LLIL; for each one, up to MAX_BYTES_FEATURE_SIZE bytes are read
    and reported unless they are empty, all zeros, or Binary Ninja sees a string there.

    example:
        push    offset iid_004118d4_IShellLinkA ; riid
    """
    func: Function = fh.inner
    bv: BinaryView = func.view

    candidate_addrs = set()

    # nothing to do when there is no LLIL here, or when the instruction is a call
    first_llil = func.get_llil_at(ih.address)
    if first_llil is None or first_llil.operation in [
        LowLevelILOperation.LLIL_CALL,
        LowLevelILOperation.LLIL_CALL_STACK_ADJUST,
    ]:
        return

    # collect candidate address from code refs directly
    for ref in bv.get_code_refs_from(ih.address):
        if ref == ih.address:
            continue

        # references that land inside a function are code, not data
        if bv.get_functions_containing(ref):
            continue

        candidate_addrs.add(ref)

    # collect candidate address by enumerating all integers, https://github.com/Vector35/binaryninja-api/issues/3966
    def llil_checker(il: LowLevelILInstruction, parent: LowLevelILInstruction, index: int) -> bool:
        if il.operation in [LowLevelILOperation.LLIL_CONST, LowLevelILOperation.LLIL_CONST_PTR]:
            value = il.value.value
            if value > 0:
                candidate_addrs.add(value)
            return False

        return True

    for llil in func.get_llils_at(ih.address):
        visit_llil_exprs(llil, llil_checker)

    for addr in candidate_addrs:
        extracted_bytes = bv.read(addr, MAX_BYTES_FEATURE_SIZE)
        if extracted_bytes and not capa.features.extractors.helpers.all_zeros(extracted_bytes):
            if bv.get_string_at(addr) is None:
                # don't extract byte features for obvious strings
                yield Bytes(extracted_bytes), ih.address
def extract_insn_string_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """
    parse instruction string features

    candidate addresses come from the instruction's outgoing code references and from every
    positive constant in its LLIL. a String is reported when Binary Ninja finds a string at
    the candidate address, and also when the pointer-sized value stored there leads to one.

    example:
        push offset aAcr     ; "ACR  > "
    """
    func: Function = fh.inner
    bv: BinaryView = func.view

    candidate_addrs = set()

    # collect candidate address from code refs directly
    for ref in bv.get_code_refs_from(ih.address):
        if ref == ih.address:
            continue

        # references that land inside a function are code, not data
        if bv.get_functions_containing(ref):
            continue

        candidate_addrs.add(ref)

    # collect candidate address by enumerating all integers, https://github.com/Vector35/binaryninja-api/issues/3966
    def llil_checker(il: LowLevelILInstruction, parent: LowLevelILInstruction, index: int) -> bool:
        if il.operation in [LowLevelILOperation.LLIL_CONST, LowLevelILOperation.LLIL_CONST_PTR]:
            value = il.value.value
            if value > 0:
                candidate_addrs.add(value)
            return False

        return True

    for llil in func.get_llils_at(ih.address):
        visit_llil_exprs(llil, llil_checker)

    # Now we have all the candidate addresses, check them for string or pointer to string
    br = BinaryReader(bv)
    for addr in candidate_addrs:
        found = bv.get_string_at(addr)
        if found:
            yield String(found.value), ih.address

        # treat the data at the candidate as a pointer; only 4- and 8-byte address sizes are handled
        br.seek(addr)
        pointer = None
        if bv.arch.address_size == 4:
            pointer = br.read32()
        elif bv.arch.address_size == 8:
            pointer = br.read64()

        if pointer is not None:
            found = bv.get_string_at(pointer)
            if found:
                yield String(found.value), ih.address
def extract_insn_offset_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """
    parse instruction structure offset features

    each offset is reported twice, once as an Offset and once as an OperandOffset,
    both located at the instruction address.

    example:
        .text:0040112F cmp [esi+4], ebx
    """
    func: Function = fh.inner

    # Offset and OperandOffset both derive from Feature
    results: List[Tuple[Feature, Address]] = []
    # width of an address in bits
    address_size = func.view.arch.address_size * 8

    def llil_checker(il: LowLevelILInstruction, parent: LowLevelILInstruction, index: int) -> bool:
        # NOTE(review): the return value presumably tells visit_llil_exprs whether to descend
        # into `il`'s own operands - confirm against its definition.

        # The most common case, read/write dereference to something like `dword [eax+0x28]`
        if il.operation in [LowLevelILOperation.LLIL_ADD, LowLevelILOperation.LLIL_SUB]:
            left = il.left
            right = il.right
            # Exclude offsets based on stack/frame pointers
            if left.operation == LowLevelILOperation.LLIL_REG and left.src.name in ["esp", "ebp", "rsp", "rbp", "sp"]:
                return True

            if right.operation != LowLevelILOperation.LLIL_CONST:
                return True

            # NOTE(review): for LLIL_SUB the constant is recorded without negation, unlike
            # extract_insn_number_features - confirm this is intended.
            raw_value = right.value.value
            # If this is not a dereference, then this must be an add and the offset must lie strictly
            # between 0 and MAX_STRUCTURE_SIZE. For example,
            #     add eax, 0x10,
            #     lea ebx, [eax + 1]
            if parent.operation not in [LowLevelILOperation.LLIL_LOAD, LowLevelILOperation.LLIL_STORE]:
                if il.operation != LowLevelILOperation.LLIL_ADD or (not 0 < raw_value < MAX_STRUCTURE_SIZE):
                    return False

            if address_size > 0:
                # BN also encodes the constant value as two's complement, we need to restore its original value
                value = capa.features.extractors.helpers.twos_complement(raw_value, address_size)
            else:
                value = raw_value

            results.append((Offset(value), ih.address))
            results.append((OperandOffset(index, value), ih.address))
            return False

        # An edge case: for code like `push dword [esi]`, we need to generate a feature for offset 0x0
        elif il.operation in [LowLevelILOperation.LLIL_LOAD, LowLevelILOperation.LLIL_STORE]:
            if il.operands[0].operation == LowLevelILOperation.LLIL_REG:
                results.append((Offset(0), ih.address))
                results.append((OperandOffset(index, 0), ih.address))
                return False

        return True

    for llil in func.get_llils_at(ih.address):
        visit_llil_exprs(llil, llil_checker)

    yield from results
def is_nzxor_stack_cookie(f: Function, bb: BinjaBasicBlock, llil: LowLevelILInstruction) -> bool:
    """check if nzxor exists within stack cookie delta

    args:
        f: the enclosing function (not consulted by the current heuristic).
        bb: the basic block containing the xor.
        llil: the xor expression to classify.

    returns:
        True when one xor operand is a stack/frame pointer register and the xor sits within
        SECURITY_COOKIE_BYTES_DELTA of the start of a block with no incoming edges, or of the
        end of a block with no outgoing edges.
    """
    # TODO: we can do a much more accurate analysis using LLIL SSA
    operand_registers = [
        operand.src.name
        for operand in (llil.left, llil.right)
        if operand.operation == LowLevelILOperation.LLIL_REG
    ]

    # stack cookie reg should be stack/frame pointer
    touches_stack_pointer = any(name in ("ebp", "esp", "rbp", "rsp", "sp") for name in operand_registers)
    if not touches_stack_pointer:
        return False

    # expect security cookie init in first basic block within first bytes (instructions)
    if len(bb.incoming_edges) == 0 and llil.address < (bb.start + SECURITY_COOKIE_BYTES_DELTA):
        return True

    # ... or within last bytes (instructions) before a return
    return len(bb.outgoing_edges) == 0 and llil.address > (bb.end - SECURITY_COOKIE_BYTES_DELTA)
def extract_insn_nzxor_characteristic_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """
    parse instruction non-zeroing XOR instruction
    ignore expected non-zeroing XORs, e.g. security cookies
    """
    func: Function = fh.inner

    results = []

    def llil_checker(il: LowLevelILInstruction, parent: LowLevelILInstruction, index: int) -> bool:
        # If the two operands of the xor instruction are the same, the LLIL will be translated to other instructions,
        # e.g., <llil: eax = 0>, (LLIL_SET_REG). So we do not need to check whether the two operands are the same.
        if il.operation != LowLevelILOperation.LLIL_XOR:
            return True

        # Exclude cases related to the stack cookie.
        # NOTE(review): bbh.inner[0] is presumably the Binary Ninja basic block - confirm against BBHandle's producer.
        if is_nzxor_stack_cookie(func, bbh.inner[0], il):
            return False

        results.append((Characteristic("nzxor"), ih.address))
        return False

    for llil in func.get_llils_at(ih.address):
        visit_llil_exprs(llil, llil_checker)

    yield from results
def extract_insn_mnemonic_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """parse instruction mnemonic features

    the mnemonic is taken from the text of the instruction's first disassembly token.
    """
    first_token = ih.inner.text[0]
    yield Mnemonic(first_token.text), ih.address
def extract_insn_obfs_call_plus_5_characteristic_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """
    parse call $+5 instruction from the given instruction.

    matches on the disassembly text: the first token must be `call`, the third token `$+5`,
    and the instruction must be exactly 5 bytes long.
    """
    insn: DisassemblyInstruction = ih.inner

    # checks run in this order so the third token is only read for call instructions
    if insn.text[0].text != "call":
        return
    if insn.text[2].text != "$+5":
        return
    if insn.length != 5:
        return

    yield Characteristic("call $+5"), ih.address
def extract_insn_peb_access_characteristic_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """parse instruction peb access

    fs:[0x30] on x86, gs:[0x60] on x64

    a match is an LLIL load whose address expression is `fsbase + 0x30` or `gsbase + 0x60`.
    """
    func: Function = fh.inner

    results = []

    def llil_checker(il: LowLevelILInstruction, parent: LowLevelILInstruction, index: int) -> bool:
        # NOTE(review): returning True presumably keeps visit_llil_exprs walking into `il` - confirm there.
        if il.operation != LowLevelILOperation.LLIL_LOAD:
            return True

        src = il.src
        if src.operation != LowLevelILOperation.LLIL_ADD:
            return True

        left = src.left
        right = src.right

        if left.operation != LowLevelILOperation.LLIL_REG:
            return True

        reg = left.src.name

        if right.operation != LowLevelILOperation.LLIL_CONST:
            return True

        value = right.value.value
        if (reg, value) not in (("fsbase", 0x30), ("gsbase", 0x60)):
            return True

        results.append((Characteristic("peb access"), ih.address))
        return False

    for llil in func.get_llils_at(ih.address):
        visit_llil_exprs(llil, llil_checker)

    yield from results
def extract_insn_segment_access_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """parse instruction fs or gs access

    a register read of `fsbase` or `gsbase` anywhere in the instruction's LLIL is reported
    as "fs access" or "gs access" respectively.
    """
    func: Function = fh.inner

    results = []

    # LLIL register name -> characteristic reported for it
    segment_characteristics = {
        "fsbase": "fs access",
        "gsbase": "gs access",
    }

    def llil_checker(il: LowLevelILInstruction, parent: LowLevelILInstruction, index: int) -> bool:
        if il.operation != LowLevelILOperation.LLIL_REG:
            return True

        characteristic = segment_characteristics.get(il.src.name)
        if characteristic is not None:
            results.append((Characteristic(characteristic), ih.address))

        # a register read is handled here whether or not it matched
        return False

    for llil in func.get_llils_at(ih.address):
        visit_llil_exprs(llil, llil_checker)

    yield from results
def extract_insn_cross_section_cflow(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """inspect the instruction for a CALL or JMP that crosses section boundaries

    every outgoing code reference that targets the start of a function is compared against
    the instruction's own segment and sections; one feature is yielded per differing target.
    """
    func: Function = fh.inner
    bv: BinaryView = func.view

    if bv is None:
        return

    seg1 = bv.get_segment_at(ih.address)
    sections1 = bv.get_sections_at(ih.address)
    for ref in bv.get_code_refs_from(ih.address):
        # only references to function starts are considered
        if not bv.get_functions_at(ref):
            continue

        seg2 = bv.get_segment_at(ref)
        sections2 = bv.get_sections_at(ref)
        if seg1 != seg2 or sections1 != sections2:
            yield Characteristic("cross section flow"), ih.address
def extract_function_calls_from(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
    """extract functions calls from features

    most relevant at the function scope, however, it's most efficient to extract at the instruction scope

    unlike the other handlers, the address yielded with each feature is the call target,
    not the address of the calling instruction.
    """
    func: Function = fh.inner

    if func.view is None:
        return

    call_operations = (
        LowLevelILOperation.LLIL_CALL,
        LowLevelILOperation.LLIL_CALL_STACK_ADJUST,
        LowLevelILOperation.LLIL_TAILCALL,
    )
    constant_operations = (
        LowLevelILOperation.LLIL_CONST_PTR,
        LowLevelILOperation.LLIL_CONST,
    )

    for il in func.get_llils_at(ih.address):
        if il.operation not in call_operations:
            continue

        dest = il.dest
        if dest.operation in constant_operations:
            # direct call: the integer target is read from the expression's value, the same way
            # for both constant kinds
            yield Characteristic("calls from"), AbsoluteVirtualAddress(dest.value.value)
        elif dest.operation == LowLevelILOperation.LLIL_LOAD:
            # call through a fixed memory slot: report the slot address
            indirect_src = dest.src
            if indirect_src.operation in constant_operations:
                yield Characteristic("calls from"), AbsoluteVirtualAddress(indirect_src.value.value)
        elif dest.operation == LowLevelILOperation.LLIL_REG:
            # call through a register: only usable when Binary Ninja resolved it to a known value
            if dest.value.type in [
                RegisterValueType.ImportedAddressValue,
                RegisterValueType.ConstantValue,
                RegisterValueType.ConstantPointerValue,
            ]:
                yield Characteristic("calls from"), AbsoluteVirtualAddress(dest.value.value)
def extract_function_indirect_call_characteristic_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
    """extract indirect function calls (e.g., call eax or call dword ptr [edx+4])

    does not include calls like => call ds:dword_ABD4974

    most relevant at the function or basic block scope;
    however, it's most efficient to extract at the instruction scope
    """
    func: Function = fh.inner

    llil = func.get_llil_at(ih.address)
    if llil is None or llil.operation not in [
        LowLevelILOperation.LLIL_CALL,
        LowLevelILOperation.LLIL_CALL_STACK_ADJUST,
        LowLevelILOperation.LLIL_TAILCALL,
    ]:
        return

    constant_operations = [LowLevelILOperation.LLIL_CONST, LowLevelILOperation.LLIL_CONST_PTR]

    # a constant destination is a direct call
    if llil.dest.operation in constant_operations:
        return

    # a load from a constant address (e.g. an import slot) is not counted as indirect either
    if llil.dest.operation == LowLevelILOperation.LLIL_LOAD:
        src = llil.dest.src
        if src.operation in constant_operations:
            return

    yield Characteristic("indirect call"), ih.address
def extract_features(f: FunctionHandle, bbh: BBHandle, insn: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
    """extract instruction features

    runs each handler in INSTRUCTION_HANDLERS, in order, on the given instruction and
    yields every (feature, address) pair they produce.
    """
    for handler in INSTRUCTION_HANDLERS:
        yield from handler(f, bbh, insn)
# instruction-scope feature handlers.
# extract_features() calls each of these, in this order, with
# (function handle, basic block handle, instruction handle) and yields what they produce.
INSTRUCTION_HANDLERS = (
    extract_insn_api_features,
    extract_insn_number_features,
    extract_insn_bytes_features,
    extract_insn_string_features,
    extract_insn_offset_features,
    extract_insn_nzxor_characteristic_features,
    extract_insn_mnemonic_features,
    extract_insn_obfs_call_plus_5_characteristic_features,
    extract_insn_peb_access_characteristic_features,
    extract_insn_cross_section_cflow,
    extract_insn_segment_access_features,
    extract_function_calls_from,
    extract_function_indirect_call_characteristic_features,
)
def main():
    """debugging entry point: dump every instruction feature of one file

    opens the path given as the first command line argument with Binary Ninja, runs
    extract_features() over each instruction of each basic block of each function,
    and pretty-prints the collected (feature, address) pairs.
    returns without output when no path is given or the file cannot be opened.
    """
    if len(sys.argv) < 2:
        return

    from binaryninja import BinaryViewType

    from capa.features.extractors.binja.extractor import BinjaFeatureExtractor

    bv: BinaryView = BinaryViewType.get_view_of_file(sys.argv[1])
    if bv is None:
        return

    extractor = BinjaFeatureExtractor(bv)
    features = [
        pair
        for fh in extractor.get_functions()
        for bbh in extractor.get_basic_blocks(fh)
        for insn in extractor.get_instructions(fh, bbh)
        for pair in extract_features(fh, bbh, insn)
    ]

    import pprint

    pprint.pprint(features)


if __name__ == "__main__":
    main()

View File

@@ -13,6 +13,7 @@ import capa.features.extractors.strings
from capa.features.common import (
OS,
OS_ANY,
OS_AUTO,
ARCH_ANY,
FORMAT_PE,
FORMAT_ELF,
@@ -96,7 +97,10 @@ def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]:
return
def extract_os(buf) -> Iterator[Tuple[Feature, Address]]:
def extract_os(buf, os=OS_AUTO) -> Iterator[Tuple[Feature, Address]]:
if os != OS_AUTO:
yield OS(os), NO_ADDRESS
if buf.startswith(MATCH_PE):
yield OS(OS_WINDOWS), NO_ADDRESS
elif buf.startswith(MATCH_RESULT):
@@ -117,8 +121,6 @@ def extract_os(buf) -> Iterator[Tuple[Feature, Address]]:
# 2. handling a new file format (e.g. macho)
#
# for (1) we can't do much - its shellcode and all bets are off.
# we could maybe accept a further CLI argument to specify the OS,
# but i think this would be rarely used.
# rules that rely on OS conditions will fail to match on shellcode.
#
# for (2), this logic will need to be updated as the format is implemented.

View File

@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
class VivisectFeatureExtractor(FeatureExtractor):
def __init__(self, vw, path):
def __init__(self, vw, path, os):
super().__init__()
self.vw = vw
self.path = path
@@ -35,7 +35,7 @@ class VivisectFeatureExtractor(FeatureExtractor):
# pre-compute these because we'll yield them at *every* scope.
self.global_features: List[Tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.viv.file.extract_file_format(self.buf))
self.global_features.extend(capa.features.extractors.common.extract_os(self.buf))
self.global_features.extend(capa.features.extractors.common.extract_os(self.buf, os))
self.global_features.extend(capa.features.extractors.viv.global_.extract_arch(self.vw))
def get_base_address(self):

View File

@@ -268,7 +268,8 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
basic_block=bbaddr,
address=Address.from_capa(addr),
feature=feature_from_capa(feature),
)
) # type: ignore
# Mypy is unable to recognise `basic_block` as a argument due to alias
for feature, addr in extractor.extract_basic_block_features(f, bb)
]
@@ -287,38 +288,41 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
instructions.append(
InstructionFeatures(
address=iaddr,
features=ifeatures,
features=tuple(ifeatures),
)
)
basic_blocks.append(
BasicBlockFeatures(
address=bbaddr,
features=bbfeatures,
instructions=instructions,
features=tuple(bbfeatures),
instructions=tuple(instructions),
)
)
function_features.append(
FunctionFeatures(
address=faddr,
features=ffeatures,
features=tuple(ffeatures),
basic_blocks=basic_blocks,
)
) # type: ignore
# Mypy is unable to recognise `basic_blocks` as a argument due to alias
)
features = Features(
global_=global_features,
file=file_features,
functions=function_features,
)
file=tuple(file_features),
functions=tuple(function_features),
) # type: ignore
# Mypy is unable to recognise `global_` as a argument due to alias
freeze = Freeze(
version=2,
base_address=Address.from_capa(extractor.get_base_address()),
extractor=Extractor(name=extractor.__class__.__name__),
features=features,
)
) # type: ignore
# Mypy is unable to recognise `base_address` as a argument due to alias
return freeze.json()
@@ -385,14 +389,14 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="save capa features to a file")
capa.main.install_common_args(parser, {"sample", "format", "backend", "signatures"})
capa.main.install_common_args(parser, {"sample", "format", "backend", "os", "signatures"})
parser.add_argument("output", type=str, help="Path to output file")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
sigpaths = capa.main.get_signatures(args.signatures)
extractor = capa.main.get_extractor(args.sample, args.format, args.backend, sigpaths, False)
extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False)
with open(args.output, "wb") as f:
f.write(dump(extractor))

View File

@@ -101,59 +101,79 @@ class FeatureModel(BaseModel):
def feature_from_capa(f: capa.features.common.Feature) -> "Feature":
if isinstance(f, capa.features.common.OS):
assert isinstance(f.value, str)
return OSFeature(os=f.value, description=f.description)
elif isinstance(f, capa.features.common.Arch):
assert isinstance(f.value, str)
return ArchFeature(arch=f.value, description=f.description)
elif isinstance(f, capa.features.common.Format):
assert isinstance(f.value, str)
return FormatFeature(format=f.value, description=f.description)
elif isinstance(f, capa.features.common.MatchedRule):
assert isinstance(f.value, str)
return MatchFeature(match=f.value, description=f.description)
elif isinstance(f, capa.features.common.Characteristic):
assert isinstance(f.value, str)
return CharacteristicFeature(characteristic=f.value, description=f.description)
elif isinstance(f, capa.features.file.Export):
assert isinstance(f.value, str)
return ExportFeature(export=f.value, description=f.description)
elif isinstance(f, capa.features.file.Import):
return ImportFeature(import_=f.value, description=f.description)
assert isinstance(f.value, str)
return ImportFeature(import_=f.value, description=f.description) # type: ignore
# Mypy is unable to recognise `import_` as a argument due to alias
elif isinstance(f, capa.features.file.Section):
assert isinstance(f.value, str)
return SectionFeature(section=f.value, description=f.description)
elif isinstance(f, capa.features.file.FunctionName):
return FunctionNameFeature(function_name=f.value, description=f.description)
assert isinstance(f.value, str)
return FunctionNameFeature(function_name=f.value, description=f.description) # type: ignore
# Mypy is unable to recognise `function_name` as a argument due to alias
# must come before check for String due to inheritance
elif isinstance(f, capa.features.common.Substring):
assert isinstance(f.value, str)
return SubstringFeature(substring=f.value, description=f.description)
# must come before check for String due to inheritance
elif isinstance(f, capa.features.common.Regex):
assert isinstance(f.value, str)
return RegexFeature(regex=f.value, description=f.description)
elif isinstance(f, capa.features.common.String):
assert isinstance(f.value, str)
return StringFeature(string=f.value, description=f.description)
elif isinstance(f, capa.features.common.Class):
return ClassFeature(class_=f.value, description=f.description)
assert isinstance(f.value, str)
return ClassFeature(class_=f.value, description=f.description) # type: ignore
# Mypy is unable to recognise `class_` as a argument due to alias
elif isinstance(f, capa.features.common.Namespace):
assert isinstance(f.value, str)
return NamespaceFeature(namespace=f.value, description=f.description)
elif isinstance(f, capa.features.basicblock.BasicBlock):
return BasicBlockFeature(description=f.description)
elif isinstance(f, capa.features.insn.API):
assert isinstance(f.value, str)
return APIFeature(api=f.value, description=f.description)
elif isinstance(f, capa.features.insn.Property):
assert isinstance(f.value, str)
return PropertyFeature(property=f.value, access=f.access, description=f.description)
elif isinstance(f, capa.features.insn.Number):
assert isinstance(f.value, (int, float))
return NumberFeature(number=f.value, description=f.description)
elif isinstance(f, capa.features.common.Bytes):
@@ -162,16 +182,22 @@ def feature_from_capa(f: capa.features.common.Feature) -> "Feature":
return BytesFeature(bytes=binascii.hexlify(buf).decode("ascii"), description=f.description)
elif isinstance(f, capa.features.insn.Offset):
assert isinstance(f.value, int)
return OffsetFeature(offset=f.value, description=f.description)
elif isinstance(f, capa.features.insn.Mnemonic):
assert isinstance(f.value, str)
return MnemonicFeature(mnemonic=f.value, description=f.description)
elif isinstance(f, capa.features.insn.OperandNumber):
return OperandNumberFeature(index=f.index, operand_number=f.value, description=f.description)
assert isinstance(f.value, int)
return OperandNumberFeature(index=f.index, operand_number=f.value, description=f.description) # type: ignore
# Mypy is unable to recognise `operand_number` as a argument due to alias
elif isinstance(f, capa.features.insn.OperandOffset):
return OperandOffsetFeature(index=f.index, operand_offset=f.value, description=f.description)
assert isinstance(f.value, int)
return OperandOffsetFeature(index=f.index, operand_offset=f.value, description=f.description) # type: ignore
# Mypy is unable to recognise `operand_offset` as a argument due to alias
else:
raise NotImplementedError(f"feature_from_capa({type(f)}) not implemented")

View File

@@ -53,6 +53,15 @@ class Property(_AccessFeature):
class Number(Feature):
def __init__(self, value: Union[int, float], description=None):
"""
args:
value (int or float): positive or negative integer, or floating point number.
the range of the value is:
- if positive, the range of u64
- if negative, the range of i64
- if floating, the range and precision of double
"""
super().__init__(value, description=description)
def get_value_str(self):
@@ -61,7 +70,7 @@ class Number(Feature):
elif isinstance(self.value, float):
return str(self.value)
else:
raise ValueError("invalid value type")
raise ValueError(f"invalid value type {type(self.value)}")
# max recognized structure size (and therefore, offset size)
@@ -70,6 +79,14 @@ MAX_STRUCTURE_SIZE = 0x10000
class Offset(Feature):
def __init__(self, value: int, description=None):
"""
args:
value (int): the offset, which can be positive or negative.
the range of the value is:
- if positive, the range of u64
- if negative, the range of i64
"""
super().__init__(value, description=description)
def get_value_str(self):
@@ -92,7 +109,7 @@ MAX_OPERAND_INDEX = MAX_OPERAND_COUNT - 1
class _Operand(Feature, abc.ABC):
# superclass: don't use directly
# subclasses should set self.name and provide the value string formatter
def __init__(self, index: int, value: int, description=None):
def __init__(self, index: int, value: Union[int, float], description=None):
super().__init__(value, description=description)
self.index = index
@@ -108,13 +125,26 @@ class OperandNumber(_Operand):
NAMES = [f"operand[{i}].number" for i in range(MAX_OPERAND_COUNT)]
# operand[i].number: 0x12
def __init__(self, index: int, value: int, description=None):
def __init__(self, index: int, value: Union[int, float], description=None):
"""
args:
value (int or float): positive or negative integer, or floating point number.
the range of the value is:
- if positive, the range of u64
- if negative, the range of i64
- if floating, the range and precision of double
"""
super().__init__(index, value, description=description)
self.name = self.NAMES[index]
def get_value_str(self) -> str:
assert isinstance(self.value, int)
return hex(self.value)
if isinstance(self.value, int):
return capa.helpers.hex(self.value)
elif isinstance(self.value, float):
return str(self.value)
else:
raise ValueError("invalid value type")
class OperandOffset(_Operand):
@@ -123,6 +153,14 @@ class OperandOffset(_Operand):
# operand[i].offset: 0x12
def __init__(self, index: int, value: int, description=None):
"""
args:
value (int): the offset, which can be positive or negative.
the range of the value is:
- if positive, the range of u64
- if negative, the range of i64
"""
super().__init__(index, value, description=description)
self.name = self.NAMES[index]

View File

@@ -44,7 +44,7 @@ def is_runtime_ida():
return True
def assert_never(value: NoReturn) -> NoReturn:
def assert_never(value) -> NoReturn:
assert False, f"Unhandled value: {value} ({type(value).__name__})"

View File

@@ -67,7 +67,16 @@ class CapaExplorerPlugin(idaapi.plugin_t):
arg (int): bitflag. Setting LSB enables automatic analysis upon
loading. The other bits are currently undefined. See `form.Options`.
"""
self.form = CapaExplorerForm(self.PLUGIN_NAME, arg)
if not self.form:
self.form = CapaExplorerForm(self.PLUGIN_NAME, arg)
else:
widget = idaapi.find_widget(self.form.form_title)
if widget:
idaapi.activate_widget(widget, True)
else:
self.form.Show()
self.form.load_capa_results(False, True)
return True

View File

@@ -58,8 +58,12 @@ from capa.helpers import (
)
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError, UnsupportedRuntimeError
from capa.features.common import (
OS_AUTO,
OS_LINUX,
OS_MACOS,
FORMAT_PE,
FORMAT_ELF,
OS_WINDOWS,
FORMAT_AUTO,
FORMAT_SC32,
FORMAT_SC64,
@@ -74,6 +78,7 @@ RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
BACKEND_VIV = "vivisect"
BACKEND_DOTNET = "dotnet"
BACKEND_BINJA = "binja"
E_MISSING_RULES = 10
E_MISSING_FILE = 11
@@ -491,7 +496,13 @@ def get_workspace(path, format_, sigpaths):
# TODO get_extractors -> List[FeatureExtractor]?
def get_extractor(
path: str, format_: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False
path: str,
format_: str,
os_: str,
backend: str,
sigpaths: List[str],
should_save_workspace=False,
disable_progress=False,
) -> FeatureExtractor:
"""
raises:
@@ -506,7 +517,7 @@ def get_extractor(
if not is_supported_arch(path):
raise UnsupportedArchError()
if not is_supported_os(path):
if os_ == OS_AUTO and not is_supported_os(path):
raise UnsupportedOSError()
if format_ == FORMAT_DOTNET:
@@ -514,6 +525,33 @@ def get_extractor(
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)
elif backend == BACKEND_BINJA:
from capa.features.extractors.binja.find_binja_api import find_binja_path
# When we are running as a standalone executable, we cannot directly import binaryninja
# We need to first find the binja API installation path and add it into sys.path
if is_running_standalone():
bn_api = find_binja_path()
if os.path.exists(bn_api):
sys.path.append(bn_api)
try:
from binaryninja import BinaryView, BinaryViewType
except ImportError:
raise RuntimeError(
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
"https://docs.binary.ninja/dev/batch.html#install-the-api)."
)
import capa.features.extractors.binja.extractor
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
bv: BinaryView = BinaryViewType.get_view_of_file(path)
if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {path}")
return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)
# default to use vivisect backend
else:
import capa.features.extractors.viv.extractor
@@ -531,7 +569,7 @@ def get_extractor(
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_)
def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]:
@@ -690,6 +728,8 @@ def get_signatures(sigs_path):
def collect_metadata(
argv: List[str],
sample_path: str,
format_: str,
os_: str,
rules_path: List[str],
extractor: capa.features.extractors.base_extractor.FeatureExtractor,
):
@@ -707,9 +747,9 @@ def collect_metadata(
if rules_path != [RULES_PATH_DEFAULT_STRING]:
rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path]
format_ = get_format(sample_path)
format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_
arch = get_arch(sample_path)
os_ = get_os(sample_path)
os_ = get_os(sample_path) if os_ == OS_AUTO else os_
return {
"timestamp": datetime.datetime.now().isoformat(),
@@ -791,6 +831,7 @@ def install_common_args(parser, wanted=None):
wanted (Set[str]): collection of arguments to opt-into, including:
- "sample": required positional argument to input file.
- "format": flag to override file format.
- "os": flag to override file operating system.
- "backend": flag to override analysis backend.
- "rules": flag to override path to capa rules.
- "tag": flag to override/specify which rules to match.
@@ -824,6 +865,7 @@ def install_common_args(parser, wanted=None):
#
# - sample
# - format
# - os
# - rules
# - tag
#
@@ -860,10 +902,25 @@ def install_common_args(parser, wanted=None):
"--backend",
type=str,
help="select the backend to use",
choices=(BACKEND_VIV,),
choices=(BACKEND_VIV, BACKEND_BINJA),
default=BACKEND_VIV,
)
if "os" in wanted:
oses = [
(OS_AUTO, "detect OS automatically - default"),
(OS_LINUX,),
(OS_MACOS,),
(OS_WINDOWS,),
]
os_help = ", ".join([f"{o[0]} ({o[1]})" if len(o) == 2 else o[0] for o in oses])
parser.add_argument(
"--os",
choices=[o[0] for o in oses],
default=OS_AUTO,
help=f"select sample OS: {os_help}",
)
if "rules" in wanted:
parser.add_argument(
"-r",
@@ -1027,7 +1084,7 @@ def main(argv=None):
parser = argparse.ArgumentParser(
description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
)
install_common_args(parser, {"sample", "format", "backend", "signatures", "rules", "tag"})
install_common_args(parser, {"sample", "format", "backend", "os", "signatures", "rules", "tag"})
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
args = parser.parse_args(args=argv)
ret = handle_common_args(args)
@@ -1145,7 +1202,13 @@ def main(argv=None):
try:
extractor = get_extractor(
args.sample, format_, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet
args.sample,
format_,
args.os,
args.backend,
sig_paths,
should_save_workspace,
disable_progress=args.quiet,
)
except UnsupportedFormatError:
log_unsupported_format_error()
@@ -1158,7 +1221,7 @@ def main(argv=None):
return E_INVALID_FILE_OS
if format_ != FORMAT_RESULT:
meta = collect_metadata(argv, args.sample, args.rules, extractor)
meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
meta["analysis"].update(counts)

View File

@@ -0,0 +1,727 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
Convert capa results to protobuf format.
The functionality here is similar to the various *from_capa functions, e.g. ResultDocument.from_capa() or
feature_from_capa.
For a few classes we can rely on the proto json parser (e.g. RuleMetadata).
For most classes (e.g. RuleMatches) conversion is tricky, because we use natively unsupported types (e.g. tuples),
several classes with unions, and more complex layouts. So, it's more straightforward to convert explicitly vs.
massaging the data so the protobuf json parser works.
Of note, the 3 in `syntax = "proto3"` has nothing to do with the 2 in capa_pb2.py;
see details in https://github.com/grpc/grpc/issues/15444#issuecomment-396442980.
First compile the protobuf to generate an API file and a mypy stub file
$ protoc.exe --python_out=. --mypy_out=. <path_to_proto> (e.g. capa/render/proto/capa.proto)
Alternatively, --pyi_out=. can be used to generate a Python Interface file that supports development
"""
import sys
import json
import argparse
import datetime
from typing import Any, Dict, Union
import google.protobuf.json_format
from google.protobuf.json_format import MessageToJson
import capa.rules
import capa.features.freeze as frz
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd
import capa.features.freeze.features as frzf
from capa.helpers import assert_never
from capa.features.freeze import AddressType
def dict_tuple_to_list_values(d: Dict) -> Dict:
    """
    Return a shallow copy of `d` in which every tuple value is replaced by a list.

    Only the top-level values are inspected; keys and non-tuple values are carried over unchanged.
    """
    return {key: (list(value) if isinstance(value, tuple) else value) for key, value in d.items()}
def int_to_pb2(v: int) -> capa_pb2.Integer:
    """
    Wrap a Python int in a capa_pb2.Integer.

    Negative values are stored in the signed `i` member, all other values in the unsigned `u` member.

    raises:
      ValueError: if `v` is below the signed 64-bit minimum or above the unsigned 64-bit maximum.
    """
    # `Integer.i` is declared as sint64 in capa.proto, so the lower bound is the i64 minimum,
    # not the i32 minimum: 64-bit negative values are valid and must not be rejected.
    if v < -0x8000000000000000:
        raise ValueError(f"value underflow: {v}")
    if v > 0xFFFFFFFFFFFFFFFF:
        raise ValueError(f"value overflow: {v}")

    if v < 0:
        return capa_pb2.Integer(i=v)
    else:
        return capa_pb2.Integer(u=v)
def number_to_pb2(v: Union[int, float]) -> capa_pb2.Number:
    """
    Wrap a Python number in a capa_pb2.Number.

    Floats go to `f`; ints are range-checked via int_to_pb2 and then stored in `i` (negative) or `u` (otherwise).
    """
    if isinstance(v, float):
        return capa_pb2.Number(f=v)

    if isinstance(v, int):
        checked = int_to_pb2(v)
        return capa_pb2.Number(i=checked.i) if v < 0 else capa_pb2.Number(u=checked.u)

    assert_never(v)
def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address:
    """
    Convert a frozen address into its protobuf counterpart.

    Absolute, relative, file and .NET token addresses carry a single integer in `v`;
    a .NET token+offset address is stored in `token_offset`; NO_ADDRESS carries only the type.
    """
    # address types whose value is a plain int, paired with the protobuf enum member to emit.
    single_int_kinds = (
        (AddressType.ABSOLUTE, capa_pb2.AddressType.ADDRESSTYPE_ABSOLUTE),
        (AddressType.RELATIVE, capa_pb2.AddressType.ADDRESSTYPE_RELATIVE),
        (AddressType.FILE, capa_pb2.AddressType.ADDRESSTYPE_FILE),
        (AddressType.DN_TOKEN, capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN),
    )
    for kind, pb_kind in single_int_kinds:
        if addr.type is kind:
            assert isinstance(addr.value, int)
            return capa_pb2.Address(type=pb_kind, v=int_to_pb2(addr.value))

    if addr.type is AddressType.DN_TOKEN_OFFSET:
        assert isinstance(addr.value, tuple)
        token, offset = addr.value
        assert isinstance(token, int)
        assert isinstance(offset, int)
        pair = capa_pb2.Token_Offset(token=int_to_pb2(token), offset=offset)
        return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN_OFFSET, token_offset=pair)

    if addr.type is AddressType.NO_ADDRESS:
        # the value is None for this type, so only the type is set
        return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS)

    assert_never(addr)
def scope_to_pb2(scope: capa.rules.Scope) -> capa_pb2.Scope.ValueType:
    """Translate a rule scope into the matching capa_pb2.Scope enum value."""
    known_scopes = (
        (capa.rules.Scope.FILE, capa_pb2.Scope.SCOPE_FILE),
        (capa.rules.Scope.FUNCTION, capa_pb2.Scope.SCOPE_FUNCTION),
        (capa.rules.Scope.BASIC_BLOCK, capa_pb2.Scope.SCOPE_BASIC_BLOCK),
        (capa.rules.Scope.INSTRUCTION, capa_pb2.Scope.SCOPE_INSTRUCTION),
    )
    for candidate, pb_scope in known_scopes:
        if scope == candidate:
            return pb_scope

    assert_never(scope)
def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
    """
    Convert result-document metadata into a capa_pb2.Metadata message.

    The timestamp is stored as `str(meta.timestamp)`; the sample is converted through the protobuf
    json parser; every address in the analysis section goes through addr_to_pb2.
    """
    analysis = meta.analysis

    pb_sample = google.protobuf.json_format.ParseDict(meta.sample.dict(), capa_pb2.Sample())
    pb_base_address = addr_to_pb2(analysis.base_address)

    pb_function_layouts = []
    for func in analysis.layout.functions:
        func_address = addr_to_pb2(func.address)
        blocks = [capa_pb2.BasicBlockLayout(address=addr_to_pb2(bb.address)) for bb in func.matched_basic_blocks]
        pb_function_layouts.append(capa_pb2.FunctionLayout(address=func_address, matched_basic_blocks=blocks))

    pb_function_counts = [
        capa_pb2.FunctionFeatureCount(address=addr_to_pb2(entry.address), count=entry.count)
        for entry in analysis.feature_counts.functions
    ]

    pb_library_functions = [
        capa_pb2.LibraryFunction(address=addr_to_pb2(lib.address), name=lib.name)
        for lib in analysis.library_functions
    ]

    pb_analysis = capa_pb2.Analysis(
        format=analysis.format,
        arch=analysis.arch,
        os=analysis.os,
        extractor=analysis.extractor,
        rules=analysis.rules,
        base_address=pb_base_address,
        layout=capa_pb2.Layout(functions=pb_function_layouts),
        feature_counts=capa_pb2.FeatureCounts(file=analysis.feature_counts.file, functions=pb_function_counts),
        library_functions=pb_library_functions,
    )

    return capa_pb2.Metadata(
        timestamp=str(meta.timestamp),
        version=meta.version,
        argv=meta.argv,
        sample=pb_sample,
        analysis=pb_analysis,
    )
def statement_to_pb2(statement: rd.Statement) -> capa_pb2.StatementNode:
    """
    Convert a result-document statement into a capa_pb2.StatementNode.

    The node's own `type` is always "statement"; the concrete statement is placed in the
    `range`, `some`, `subscope` or `compound` member.
    """
    if isinstance(statement, rd.RangeStatement):
        pb_range = capa_pb2.RangeStatement(
            type="range",
            description=statement.description,
            min=statement.min,
            max=statement.max,
            child=feature_to_pb2(statement.child),
        )
        return capa_pb2.StatementNode(range=pb_range, type="statement")

    if isinstance(statement, rd.SomeStatement):
        pb_some = capa_pb2.SomeStatement(
            type=statement.type,
            description=statement.description,
            count=statement.count,
        )
        return capa_pb2.StatementNode(some=pb_some, type="statement")

    if isinstance(statement, rd.SubscopeStatement):
        pb_subscope = capa_pb2.SubscopeStatement(
            type=statement.type,
            description=statement.description,
            scope=scope_to_pb2(statement.scope),
        )
        return capa_pb2.StatementNode(subscope=pb_subscope, type="statement")

    if isinstance(statement, rd.CompoundStatement):
        pb_compound = capa_pb2.CompoundStatement(type=statement.type, description=statement.description)
        return capa_pb2.StatementNode(compound=pb_compound, type="statement")

    assert_never(statement)
def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode:
    """
    Convert a frozen feature into a capa_pb2.FeatureNode.

    The node's own `type` is always "feature"; the concrete feature is placed in the oneof member
    named after the feature kind, and its own `type` field is copied from `f.type`.
    Integer-valued fields are wrapped with int_to_pb2 / number_to_pb2.

    The branches are tested in a fixed order with isinstance, so the order must be kept if any
    of the feature classes subclass one another.
    """
    if isinstance(f, frzf.OSFeature):
        return capa_pb2.FeatureNode(
            type="feature", os=capa_pb2.OSFeature(type=f.type, os=f.os, description=f.description)
        )

    elif isinstance(f, frzf.ArchFeature):
        return capa_pb2.FeatureNode(
            type="feature", arch=capa_pb2.ArchFeature(type=f.type, arch=f.arch, description=f.description)
        )

    elif isinstance(f, frzf.FormatFeature):
        return capa_pb2.FeatureNode(
            type="feature", format=capa_pb2.FormatFeature(type=f.type, format=f.format, description=f.description)
        )

    elif isinstance(f, frzf.MatchFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            match=capa_pb2.MatchFeature(
                type=f.type,
                match=f.match,
                description=f.description,
            ),
        )

    elif isinstance(f, frzf.CharacteristicFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            characteristic=capa_pb2.CharacteristicFeature(
                type=f.type, characteristic=f.characteristic, description=f.description
            ),
        )

    elif isinstance(f, frzf.ExportFeature):
        return capa_pb2.FeatureNode(
            type="feature", export=capa_pb2.ExportFeature(type=f.type, export=f.export, description=f.description)
        )

    elif isinstance(f, frzf.ImportFeature):
        return capa_pb2.FeatureNode(
            type="feature", import_=capa_pb2.ImportFeature(type=f.type, import_=f.import_, description=f.description)
        )

    elif isinstance(f, frzf.SectionFeature):
        return capa_pb2.FeatureNode(
            type="feature", section=capa_pb2.SectionFeature(type=f.type, section=f.section, description=f.description)
        )

    elif isinstance(f, frzf.FunctionNameFeature):
        # the node-level type is "feature" like every other branch;
        # the feature's own type string lives in FunctionNameFeature.type.
        return capa_pb2.FeatureNode(
            type="feature",
            function_name=capa_pb2.FunctionNameFeature(
                type=f.type, function_name=f.function_name, description=f.description
            ),
        )

    elif isinstance(f, frzf.SubstringFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            substring=capa_pb2.SubstringFeature(type=f.type, substring=f.substring, description=f.description),
        )

    elif isinstance(f, frzf.RegexFeature):
        return capa_pb2.FeatureNode(
            type="feature", regex=capa_pb2.RegexFeature(type=f.type, regex=f.regex, description=f.description)
        )

    elif isinstance(f, frzf.StringFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            string=capa_pb2.StringFeature(
                type=f.type,
                string=f.string,
                description=f.description,
            ),
        )

    elif isinstance(f, frzf.ClassFeature):
        return capa_pb2.FeatureNode(
            type="feature", class_=capa_pb2.ClassFeature(type=f.type, class_=f.class_, description=f.description)
        )

    elif isinstance(f, frzf.NamespaceFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            namespace=capa_pb2.NamespaceFeature(type=f.type, namespace=f.namespace, description=f.description),
        )

    elif isinstance(f, frzf.APIFeature):
        return capa_pb2.FeatureNode(
            type="feature", api=capa_pb2.APIFeature(type=f.type, api=f.api, description=f.description)
        )

    elif isinstance(f, frzf.PropertyFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            property_=capa_pb2.PropertyFeature(
                type=f.type, access=f.access, property_=f.property, description=f.description
            ),
        )

    elif isinstance(f, frzf.NumberFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            number=capa_pb2.NumberFeature(type=f.type, number=number_to_pb2(f.number), description=f.description),
        )

    elif isinstance(f, frzf.BytesFeature):
        return capa_pb2.FeatureNode(
            type="feature", bytes=capa_pb2.BytesFeature(type=f.type, bytes=f.bytes, description=f.description)
        )

    elif isinstance(f, frzf.OffsetFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            offset=capa_pb2.OffsetFeature(type=f.type, offset=int_to_pb2(f.offset), description=f.description),
        )

    elif isinstance(f, frzf.MnemonicFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            mnemonic=capa_pb2.MnemonicFeature(type=f.type, mnemonic=f.mnemonic, description=f.description),
        )

    elif isinstance(f, frzf.OperandNumberFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            operand_number=capa_pb2.OperandNumberFeature(
                type=f.type, index=f.index, operand_number=int_to_pb2(f.operand_number), description=f.description
            ),
        )

    elif isinstance(f, frzf.OperandOffsetFeature):
        return capa_pb2.FeatureNode(
            type="feature",
            operand_offset=capa_pb2.OperandOffsetFeature(
                type=f.type, index=f.index, operand_offset=int_to_pb2(f.operand_offset), description=f.description
            ),
        )

    elif isinstance(f, frzf.BasicBlockFeature):
        return capa_pb2.FeatureNode(
            type="feature", basic_block=capa_pb2.BasicBlockFeature(type=f.type, description=f.description)
        )

    else:
        assert_never(f)
def node_to_pb2(node: rd.Node) -> Union[capa_pb2.FeatureNode, capa_pb2.StatementNode]:
    """Convert a match node, dispatching on whether it wraps a statement or a feature."""
    if isinstance(node, rd.StatementNode):
        return statement_to_pb2(node.statement)

    if isinstance(node, rd.FeatureNode):
        return feature_to_pb2(node.feature)

    assert_never(node)
def match_to_pb2(match: rd.Match) -> capa_pb2.Match:
    """
    Recursively convert a match tree into capa_pb2.Match messages.

    Statement nodes are emitted with empty captures; feature nodes carry the captures,
    with each capture's locations wrapped in a capa_pb2.Addresses message.
    """
    pb_node = node_to_pb2(match.node)
    pb_children = [match_to_pb2(child) for child in match.children]
    pb_locations = [addr_to_pb2(location) for location in match.locations]

    if isinstance(pb_node, capa_pb2.StatementNode):
        return capa_pb2.Match(
            success=match.success,
            statement=pb_node,
            children=pb_children,
            locations=pb_locations,
            captures={},
        )

    if isinstance(pb_node, capa_pb2.FeatureNode):
        pb_captures = {}
        for capture, locs in match.captures.items():
            pb_captures[capture] = capa_pb2.Addresses(address=[addr_to_pb2(loc) for loc in locs])

        return capa_pb2.Match(
            success=match.success,
            feature=pb_node,
            children=pb_children,
            locations=pb_locations,
            captures=pb_captures,
        )

    assert_never(match)
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
    """
    Convert rule metadata via the protobuf json parser.

    The parser cannot handle tuples or the rule Scope enum, so those are converted by hand first:
    tuples become lists (top level and inside each attack/mbc entry) and the scope becomes the protobuf enum value.
    """
    meta = dict_tuple_to_list_values(rule_metadata.dict())
    meta["scope"] = scope_to_pb2(meta["scope"])

    for key in ("attack", "mbc"):
        meta[key] = [dict_tuple_to_list_values(spec) for spec in meta.get(key, [])]

    return google.protobuf.json_format.ParseDict(meta, capa_pb2.RuleMetadata())
def doc_to_pb2(doc: rd.ResultDocument) -> capa_pb2.ResultDocument:
    """Convert a complete result document (metadata plus per-rule matches) into its protobuf message."""
    rule_matches: Dict[str, capa_pb2.RuleMatches] = {
        rule_name: capa_pb2.RuleMatches(
            meta=rule_metadata_to_pb2(rule.meta),
            source=rule.source,
            matches=[
                capa_pb2.Pair_Address_Match(address=addr_to_pb2(address), match=match_to_pb2(result))
                for address, result in rule.matches
            ],
        )
        for rule_name, rule in doc.rules.items()
    }

    return capa_pb2.ResultDocument(meta=metadata_to_pb2(doc.meta), rules=rule_matches)
def int_from_pb2(v: capa_pb2.Integer) -> int:
    """Return the int held by a capa_pb2.Integer, reading whichever of `u` / `i` is set."""
    member = v.WhichOneof("value")

    if member == "u":
        return v.u

    if member == "i":
        return v.i

    assert_never(member)
def number_from_pb2(v: capa_pb2.Number) -> Union[int, float]:
    """Return the value held by a capa_pb2.Number, reading whichever of `u` / `i` / `f` is set."""
    member = v.WhichOneof("value")

    if member == "u":
        return v.u

    if member == "i":
        return v.i

    if member == "f":
        return v.f

    assert_never(member)
def addr_from_pb2(addr: capa_pb2.Address) -> frz.Address:
    """
    Convert a capa_pb2.Address back into a frozen address.

    Inverse of addr_to_pb2: single-integer types read `v`, the token+offset type reads
    `token_offset` into a (token, offset) tuple, and NO_ADDRESS yields a None value.
    """
    # protobuf address types whose value is a single Integer in `v`.
    single_int_kinds = (
        (capa_pb2.AddressType.ADDRESSTYPE_ABSOLUTE, frz.AddressType.ABSOLUTE),
        (capa_pb2.AddressType.ADDRESSTYPE_RELATIVE, frz.AddressType.RELATIVE),
        (capa_pb2.AddressType.ADDRESSTYPE_FILE, frz.AddressType.FILE),
        (capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN, frz.AddressType.DN_TOKEN),
    )
    for pb_kind, kind in single_int_kinds:
        if addr.type == pb_kind:
            return frz.Address(type=kind, value=int_from_pb2(addr.v))

    if addr.type == capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN_OFFSET:
        pair = addr.token_offset
        token = int_from_pb2(pair.token)
        return frz.Address(type=frz.AddressType.DN_TOKEN_OFFSET, value=(token, pair.offset))

    if addr.type == capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS:
        return frz.Address(type=frz.AddressType.NO_ADDRESS, value=None)

    assert_never(addr)
def scope_from_pb2(scope: capa_pb2.Scope.ValueType) -> capa.rules.Scope:
    """Translate a capa_pb2.Scope enum value back into the rule scope."""
    known_scopes = (
        (capa_pb2.Scope.SCOPE_FILE, capa.rules.Scope.FILE),
        (capa_pb2.Scope.SCOPE_FUNCTION, capa.rules.Scope.FUNCTION),
        (capa_pb2.Scope.SCOPE_BASIC_BLOCK, capa.rules.Scope.BASIC_BLOCK),
        (capa_pb2.Scope.SCOPE_INSTRUCTION, capa.rules.Scope.INSTRUCTION),
    )
    for pb_scope, rule_scope in known_scopes:
        if scope == pb_scope:
            return rule_scope

    assert_never(scope)
def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata:
    """
    Convert a capa_pb2.Metadata message back into result-document metadata.

    The timestamp string is parsed with datetime.fromisoformat; an empty argv list becomes None;
    every address in the analysis section goes through addr_from_pb2.
    """
    timestamp = datetime.datetime.fromisoformat(meta.timestamp)
    argv = tuple(meta.argv) if meta.argv else None

    sample = rd.Sample(
        md5=meta.sample.md5,
        sha1=meta.sample.sha1,
        sha256=meta.sample.sha256,
        path=meta.sample.path,
    )

    pb_analysis = meta.analysis
    rules = tuple(pb_analysis.rules)
    base_address = addr_from_pb2(pb_analysis.base_address)

    function_layouts = []
    for func in pb_analysis.layout.functions:
        func_address = addr_from_pb2(func.address)
        blocks = [rd.BasicBlockLayout(address=addr_from_pb2(bb.address)) for bb in func.matched_basic_blocks]
        function_layouts.append(rd.FunctionLayout(address=func_address, matched_basic_blocks=blocks))

    function_counts = [
        rd.FunctionFeatureCount(address=addr_from_pb2(entry.address), count=entry.count)
        for entry in pb_analysis.feature_counts.functions
    ]

    library_functions = [
        rd.LibraryFunction(address=addr_from_pb2(lib.address), name=lib.name)
        for lib in pb_analysis.library_functions
    ]

    analysis = rd.Analysis(
        format=pb_analysis.format,
        arch=pb_analysis.arch,
        os=pb_analysis.os,
        extractor=pb_analysis.extractor,
        rules=rules,
        base_address=base_address,
        layout=rd.Layout(functions=function_layouts),
        feature_counts=rd.FeatureCounts(file=pb_analysis.feature_counts.file, functions=function_counts),
        library_functions=library_functions,
    )

    return rd.Metadata(
        timestamp=timestamp,
        version=meta.version,
        argv=argv,
        sample=sample,
        analysis=analysis,
    )
def statement_from_pb2(statement: capa_pb2.StatementNode) -> rd.Statement:
    """
    Convert a capa_pb2.StatementNode back into a result-document statement.

    Dispatches on whichever member of the `statement` oneof is set; an empty description is read back as None.
    """
    member = statement.WhichOneof("statement")

    if member == "range":
        pb_range = statement.range
        return rd.RangeStatement(
            min=pb_range.min,
            max=pb_range.max,
            child=feature_from_pb2(pb_range.child),
            description=pb_range.description or None,
        )

    if member == "some":
        pb_some = statement.some
        return rd.SomeStatement(count=pb_some.count, description=pb_some.description or None)

    if member == "subscope":
        pb_subscope = statement.subscope
        return rd.SubscopeStatement(
            scope=scope_from_pb2(pb_subscope.scope),
            description=pb_subscope.description or None,
        )

    if member == "compound":
        pb_compound = statement.compound
        return rd.CompoundStatement(type=pb_compound.type, description=pb_compound.description or None)

    assert_never(member)
def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature:
    """
    Convert a capa_pb2.FeatureNode back into a frozen feature.

    Dispatches on whichever member of the `feature` oneof is set. An empty description
    (and an empty property access) is read back as None. Integer-valued fields are
    unwrapped with int_from_pb2 / number_from_pb2, mirroring feature_to_pb2.
    """
    type_ = f.WhichOneof("feature")

    # mypy gets angry below because ff may have a different type in each branch,
    # even though we don't use ff outside each branch.
    # so we just let mypy know that ff might be any type to silence that warning.
    # upstream issue: https://github.com/python/mypy/issues/6233
    ff: Any

    if type_ == "os":
        ff = f.os
        return frzf.OSFeature(os=ff.os, description=ff.description or None)
    elif type_ == "arch":
        ff = f.arch
        return frzf.ArchFeature(arch=ff.arch, description=ff.description or None)
    elif type_ == "format":
        ff = f.format
        return frzf.FormatFeature(format=ff.format, description=ff.description or None)
    elif type_ == "match":
        ff = f.match
        return frzf.MatchFeature(match=ff.match, description=ff.description or None)
    elif type_ == "characteristic":
        ff = f.characteristic
        return frzf.CharacteristicFeature(characteristic=ff.characteristic, description=ff.description or None)
    elif type_ == "export":
        ff = f.export
        return frzf.ExportFeature(export=ff.export, description=ff.description or None)
    elif type_ == "import_":
        ff = f.import_
        return frzf.ImportFeature(import_=ff.import_, description=ff.description or None)
    elif type_ == "section":
        ff = f.section
        return frzf.SectionFeature(section=ff.section, description=ff.description or None)
    elif type_ == "function_name":
        ff = f.function_name
        return frzf.FunctionNameFeature(function_name=ff.function_name, description=ff.description or None)
    elif type_ == "substring":
        ff = f.substring
        return frzf.SubstringFeature(substring=ff.substring, description=ff.description or None)
    elif type_ == "regex":
        ff = f.regex
        return frzf.RegexFeature(regex=ff.regex, description=ff.description or None)
    elif type_ == "string":
        ff = f.string
        return frzf.StringFeature(string=ff.string, description=ff.description or None)
    elif type_ == "class_":
        ff = f.class_
        return frzf.ClassFeature(class_=ff.class_, description=ff.description or None)
    elif type_ == "namespace":
        ff = f.namespace
        return frzf.NamespaceFeature(namespace=ff.namespace, description=ff.description or None)
    elif type_ == "api":
        ff = f.api
        return frzf.APIFeature(api=ff.api, description=ff.description or None)
    elif type_ == "property_":
        ff = f.property_
        return frzf.PropertyFeature(property=ff.property_, access=ff.access or None, description=ff.description or None)
    elif type_ == "number":
        ff = f.number
        return frzf.NumberFeature(number=number_from_pb2(ff.number), description=ff.description or None)
    elif type_ == "bytes":
        ff = f.bytes
        return frzf.BytesFeature(bytes=ff.bytes, description=ff.description or None)
    elif type_ == "offset":
        ff = f.offset
        return frzf.OffsetFeature(offset=int_from_pb2(ff.offset), description=ff.description or None)
    elif type_ == "mnemonic":
        ff = f.mnemonic
        return frzf.MnemonicFeature(mnemonic=ff.mnemonic, description=ff.description or None)
    elif type_ == "operand_number":
        ff = f.operand_number
        # operand_number is an Integer message (written by int_to_pb2), so decode it with
        # int_from_pb2 like operand_offset below rather than with the Number decoder.
        return frzf.OperandNumberFeature(
            index=ff.index, operand_number=int_from_pb2(ff.operand_number), description=ff.description or None
        )
    elif type_ == "operand_offset":
        ff = f.operand_offset
        return frzf.OperandOffsetFeature(
            index=ff.index, operand_offset=int_from_pb2(ff.operand_offset), description=ff.description or None
        )
    elif type_ == "basic_block":
        ff = f.basic_block
        return frzf.BasicBlockFeature(description=ff.description or None)
    else:
        assert_never(type_)
def match_from_pb2(match: capa_pb2.Match) -> rd.Match:
    """
    Recursively convert a capa_pb2.Match tree back into result-document matches.

    Statement nodes get empty captures; feature nodes get each capture's locations as a tuple of addresses.
    """
    children = [match_from_pb2(child) for child in match.children]
    locations = [addr_from_pb2(location) for location in match.locations]
    member = match.WhichOneof("node")

    if member == "statement":
        statement_node = rd.StatementNode(statement=statement_from_pb2(match.statement))
        return rd.Match(
            success=match.success,
            node=statement_node,
            children=children,
            locations=locations,
            captures={},
        )

    if member == "feature":
        feature_node = rd.FeatureNode(feature=feature_from_pb2(match.feature))
        captures = {}
        for capture, locs in match.captures.items():
            captures[capture] = tuple(addr_from_pb2(loc) for loc in locs.address)

        return rd.Match(
            success=match.success,
            node=feature_node,
            children=children,
            locations=locations,
            captures=captures,
        )

    assert_never(member)
def attack_from_pb2(pb: capa_pb2.AttackSpec) -> rd.AttackSpec:
    """Rebuild an rd.AttackSpec from its protobuf message; the repeated `parts` field becomes a tuple."""
    fields = {
        "parts": tuple(pb.parts),
        "tactic": pb.tactic,
        "technique": pb.technique,
        "subtechnique": pb.subtechnique,
        "id": pb.id,
    }
    return rd.AttackSpec(**fields)
def mbc_from_pb2(pb: capa_pb2.MBCSpec) -> rd.MBCSpec:
    """Rebuild an rd.MBCSpec from its protobuf message; the repeated `parts` field becomes a tuple."""
    fields = {
        "parts": tuple(pb.parts),
        "objective": pb.objective,
        "behavior": pb.behavior,
        "method": pb.method,
        "id": pb.id,
    }
    return rd.MBCSpec(**fields)
def maec_from_pb2(pb: capa_pb2.MaecMetadata) -> rd.MaecMetadata:
    """Rebuild rd.MaecMetadata from its protobuf message, reading empty strings back as None."""
    field_names = (
        "analysis_conclusion",
        "analysis_conclusion_ov",
        "malware_family",
        "malware_category",
        "malware_category_ov",
    )
    return rd.MaecMetadata(**{name: getattr(pb, name) or None for name in field_names})
def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata:
    """
    Rebuild rd.RuleMetadata from its protobuf message.

    Repeated fields become tuples, an empty namespace is read back as None,
    and the nested attack / mbc / maec messages go through their own converters.
    """
    attack_specs = tuple(attack_from_pb2(spec) for spec in pb.attack)
    mbc_specs = tuple(mbc_from_pb2(spec) for spec in pb.mbc)

    return rd.RuleMetadata(
        name=pb.name,
        namespace=pb.namespace or None,
        authors=tuple(pb.authors),
        scope=scope_from_pb2(pb.scope),
        attack=attack_specs,
        mbc=mbc_specs,
        references=tuple(pb.references),
        examples=tuple(pb.examples),
        description=pb.description,
        lib=pb.lib,
        is_subscope_rule=pb.is_subscope_rule,
        maec=maec_from_pb2(pb.maec),
    )
def doc_from_pb2(doc: capa_pb2.ResultDocument) -> rd.ResultDocument:
    """Convert a protobuf result document (metadata plus per-rule matches) back into rd.ResultDocument."""
    rule_matches: Dict[str, rd.RuleMatches] = {
        rule_name: rd.RuleMatches(
            meta=rule_metadata_from_pb2(rule.meta),
            source=rule.source,
            matches=tuple((addr_from_pb2(pair.address), match_from_pb2(pair.match)) for pair in rule.matches),
        )
        for rule_name, rule in doc.rules.items()
    }

    return rd.ResultDocument(meta=metadata_from_pb2(doc.meta), rules=rule_matches)

View File

@@ -0,0 +1,364 @@
syntax = "proto3";
// Payload of FeatureNode.api.
message APIFeature {
string type = 1;
string api = 2;
optional string description = 3;
}
// A typed address. `type` selects which member of `value` is meaningful:
// the Python converter sets `token_offset` for ADDRESSTYPE_DN_TOKEN_OFFSET,
// leaves `value` unset for ADDRESSTYPE_NO_ADDRESS, and sets `v` for the other types.
message Address {
AddressType type = 1;
oneof value {
Integer v = 2;
Token_Offset token_offset = 3;
};
}
// Kind of an Address. 0 is the proto3 default and is never produced by the Python converter.
enum AddressType {
ADDRESSTYPE_UNSPECIFIED = 0;
ADDRESSTYPE_ABSOLUTE = 1;
ADDRESSTYPE_RELATIVE = 2;
ADDRESSTYPE_FILE = 3;
ADDRESSTYPE_DN_TOKEN = 4;
ADDRESSTYPE_DN_TOKEN_OFFSET = 5;
ADDRESSTYPE_NO_ADDRESS = 6;
}
// Analysis section of Metadata: what was analyzed, with which extractor and rules,
// plus the function layout, feature counts and recognized library functions.
message Analysis {
string format = 1;
string arch = 2;
string os = 3;
string extractor = 4;
repeated string rules = 5;
Address base_address = 6;
Layout layout = 7;
FeatureCounts feature_counts = 8;
repeated LibraryFunction library_functions = 9;
}
// Payload of FeatureNode.arch.
message ArchFeature {
string type = 1;
string arch = 2;
optional string description = 3;
}
// One ATT&CK entry of a rule (RuleMetadata.attack).
message AttackSpec {
repeated string parts = 1;
string tactic = 2;
string technique = 3;
string subtechnique = 4;
string id = 5;
}
// Payload of FeatureNode.basic_block; carries no value besides its type and description.
message BasicBlockFeature {
string type = 1;
optional string description = 2;
}
// Address of one matched basic block inside a FunctionLayout.
message BasicBlockLayout {
Address address = 1;
}
// Payload of FeatureNode.bytes. Note the value is carried as a string, not as a bytes field.
message BytesFeature {
string type = 1;
string bytes = 2;
optional string description = 3;
}
// Payload of FeatureNode.characteristic.
message CharacteristicFeature {
string type = 1;
string characteristic = 2;
optional string description = 3;
}
// Payload of FeatureNode.class_.
message ClassFeature {
string type = 1;
string class_ = 2; // `class` is a reserved Python keyword, hence the trailing underscore
optional string description = 3;
}
// Payload of StatementNode.compound; `type` names the kind of compound statement.
message CompoundStatement {
string type = 1;
optional string description = 2;
}
// Payload of FeatureNode.export.
message ExportFeature {
string type = 1;
string export = 2;
optional string description = 3;
}
// Feature counts: one count for the file plus one entry per function.
message FeatureCounts {
uint64 file = 1;
repeated FunctionFeatureCount functions = 2;
}
// A feature leaf of the match tree. Exactly one member of `feature` is set;
// the Python reader dispatches on which member is set rather than on `type`.
message FeatureNode {
string type = 1;
oneof feature {
OSFeature os = 2;
ArchFeature arch = 3;
FormatFeature format = 4;
MatchFeature match = 5;
CharacteristicFeature characteristic = 6;
ExportFeature export = 7;
ImportFeature import_ = 8; // import is Python keyword
SectionFeature section = 9;
FunctionNameFeature function_name = 10;
SubstringFeature substring = 11;
RegexFeature regex = 12;
StringFeature string = 13;
ClassFeature class_ = 14;
NamespaceFeature namespace = 15;
APIFeature api = 16;
PropertyFeature property_ = 17; // property is a Python top-level decorator name
NumberFeature number = 18;
BytesFeature bytes = 19;
OffsetFeature offset = 20;
MnemonicFeature mnemonic = 21;
OperandNumberFeature operand_number = 22;
OperandOffsetFeature operand_offset = 23;
BasicBlockFeature basic_block = 24;
};
}
// Payload of FeatureNode.format.
message FormatFeature {
string type = 1;
string format = 2;
optional string description = 3;
}
// Feature count for the function at `address`.
message FunctionFeatureCount {
Address address = 1;
uint64 count = 2;
}
// A function and the basic blocks within it that had matches.
message FunctionLayout {
Address address = 1;
repeated BasicBlockLayout matched_basic_blocks = 2;
}
// Payload of FeatureNode.function_name.
message FunctionNameFeature {
string type = 1;
string function_name = 2;
optional string description = 3;
}
// Payload of FeatureNode.import_.
message ImportFeature {
string type = 1;
string import_ = 2; // `import` is a reserved Python keyword, hence the trailing underscore
optional string description = 3;
}
// Layout of the analyzed sample as a list of functions.
message Layout {
repeated FunctionLayout functions = 1;
}
// A recognized library function: its address and name.
message LibraryFunction {
Address address = 1;
string name = 2;
}
// One MBC entry of a rule (RuleMetadata.mbc).
message MBCSpec {
repeated string parts = 1;
string objective = 2;
string behavior = 3;
string method = 4;
string id = 5;
}
// MAEC fields of a rule. The Python reader maps an empty string in any field back to None.
message MaecMetadata {
string analysis_conclusion = 1;
string analysis_conclusion_ov = 2;
string malware_family = 3;
string malware_category = 4;
string malware_category_ov = 5;
}
// One node of the recursive match tree: either a statement or a feature, with its children,
// the locations where it matched, and named captures. Field number 4 is not used.
// The Python writer always emits empty captures for statement nodes.
message Match {
bool success = 1;
oneof node {
StatementNode statement = 2;
FeatureNode feature = 3;
};
repeated Match children = 5;
repeated Address locations = 6;
map <string, Addresses> captures = 7;
}
// Payload of FeatureNode.match.
message MatchFeature {
string type = 1;
string match = 2;
optional string description = 3;
}
// Top-level metadata of a result document.
message Metadata {
string timestamp = 1; // written as Python str(datetime) and parsed with datetime.fromisoformat, like: 2019-01-01 00:00:00
string version = 2;
repeated string argv = 3;
Sample sample = 4;
Analysis analysis = 5;
}
// Payload of FeatureNode.mnemonic.
message MnemonicFeature {
string type = 1;
string mnemonic = 2;
optional string description = 3;
}
// Payload of FeatureNode.namespace.
message NamespaceFeature {
string type = 1;
string namespace = 2;
optional string description = 3;
}
// Payload of FeatureNode.number. Field numbers 3 and 4 are not used.
message NumberFeature {
string type = 1;
Number number = 2; // this can be positive (range: u64), negative (range: i64), or a double.
optional string description = 5;
}
// Payload of FeatureNode.os.
message OSFeature {
string type = 1;
string os = 2;
optional string description = 3;
}
// Payload of FeatureNode.offset.
message OffsetFeature {
string type = 1;
Integer offset = 2; // offset can be negative
optional string description = 3;
}
// Payload of FeatureNode.operand_number: the number found at operand `index`.
message OperandNumberFeature {
string type = 1;
uint32 index = 2;
Integer operand_number = 3; // this can be positive (range: u64) or negative (range: i64); Integer has no double member.
optional string description = 4;
}
// Payload of FeatureNode.operand_offset: the offset found at operand `index`.
message OperandOffsetFeature {
string type = 1;
uint32 index = 2;
Integer operand_offset = 3;
optional string description = 4;
}
// Payload of FeatureNode.property_. The Python reader maps an empty `access` back to None.
message PropertyFeature {
string type = 1;
string property_ = 2; // property is a Python top-level decorator name
optional string access = 3;
optional string description = 4;
}
// Payload of StatementNode.range: a min/max bound applied to a single child feature.
// The Python writer always sets `type` to "range".
message RangeStatement {
string type = 1;
uint64 min = 2;
uint64 max = 3;
// reusing FeatureNode here to avoid duplication and list all features OSFeature, ArchFeature, ... again.
FeatureNode child = 4;
optional string description = 5;
}
// Payload of FeatureNode.regex.
message RegexFeature {
string type = 1;
string regex = 2;
optional string description = 3;
}
// Root message: metadata plus the matches of every rule, keyed by rule name.
message ResultDocument {
Metadata meta = 1;
map <string, RuleMatches> rules = 2;
}
// Everything recorded for one rule: its metadata, its source text, and its (address, match) pairs.
message RuleMatches {
RuleMetadata meta = 1;
string source = 2;
repeated Pair_Address_Match matches = 3;
}
// Metadata of a rule. The Python reader maps an empty `namespace` back to None
// and turns every repeated field into a tuple.
message RuleMetadata {
string name = 1;
string namespace = 2;
repeated string authors = 3;
Scope scope = 4;
repeated AttackSpec attack = 5;
repeated MBCSpec mbc = 6;
repeated string references = 7;
repeated string examples = 8;
string description = 9;
bool lib = 10;
MaecMetadata maec = 11;
bool is_subscope_rule = 12;
}
// Identity of the analyzed sample: its hashes and path.
message Sample {
string md5 = 1;
string sha1 = 2;
string sha256 = 3;
string path = 4;
}
// Rule scope. 0 is the proto3 default and is never produced by the Python converter.
enum Scope {
SCOPE_UNSPECIFIED = 0;
SCOPE_FILE = 1;
SCOPE_FUNCTION = 2;
SCOPE_BASIC_BLOCK = 3;
SCOPE_INSTRUCTION = 4;
}
// Payload of FeatureNode.section.
message SectionFeature {
string type = 1;
string section = 2;
optional string description = 3;
}
// Payload of StatementNode.some: a statement parameterized by `count`.
message SomeStatement {
string type = 1;
uint32 count = 2;
optional string description = 3;
}
// A statement node of the match tree. Exactly one member of `statement` is set;
// the Python writer always sets `type` to "statement".
message StatementNode {
string type = 1;
oneof statement {
RangeStatement range = 2;
SomeStatement some = 3;
SubscopeStatement subscope = 4;
CompoundStatement compound = 5;
};
}
// Payload of FeatureNode.string.
message StringFeature {
string type = 1;
string string = 2;
optional string description = 3;
}
// Payload of StatementNode.subscope: a statement that carries a rule scope.
message SubscopeStatement {
string type = 1;
Scope scope = 2;
optional string description = 3;
}
// Payload of FeatureNode.substring.
message SubstringFeature {
string type = 1;
string substring = 2;
optional string description = 3;
}
// Wrapper around a list of addresses; needed because a map value (Match.captures) cannot be a repeated field.
message Addresses { repeated Address address = 1; }
// An (address, match) pair; stands in for the Python tuple used in RuleMatches.matches.
message Pair_Address_Match {
Address address = 1;
Match match = 2;
}
// A (token, offset) pair; the value of an ADDRESSTYPE_DN_TOKEN_OFFSET address.
message Token_Offset {
Integer token = 1;
uint64 offset = 2; // offset is always >= 0
}
// The Python writer stores negative values in `i` and all other values in `u`.
message Integer { oneof value { uint64 u = 1; sint64 i = 2; } } // unsigned or signed int
// Like Integer, plus a double member `f` for floating-point values.
message Number { oneof value { uint64 u = 1; sint64 i = 2; double f = 3; } }

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@@ -24,6 +24,7 @@ from capa.helpers import assert_never
class FrozenModel(BaseModel):
class Config:
frozen = True
extra = "forbid"
class Sample(FrozenModel):
@@ -261,7 +262,7 @@ def node_from_capa(node: Union[capa.engine.Statement, capa.engine.Feature]) -> N
assert_never(node)
class Match(BaseModel):
class Match(FrozenModel):
"""
args:
success: did the node match?
@@ -388,9 +389,9 @@ class Match(BaseModel):
return cls(
success=success,
node=node,
children=children,
locations=locations,
captures=captures,
children=tuple(children),
locations=tuple(locations),
captures={capture: tuple(captures[capture]) for capture in captures},
)
@@ -519,28 +520,30 @@ class RuleMetadata(FrozenModel):
namespace=rule.meta.get("namespace"),
authors=rule.meta.get("authors"),
scope=capa.rules.Scope(rule.meta.get("scope")),
attack=list(map(AttackSpec.from_str, rule.meta.get("att&ck", []))),
mbc=list(map(MBCSpec.from_str, rule.meta.get("mbc", []))),
attack=tuple(map(AttackSpec.from_str, rule.meta.get("att&ck", []))),
mbc=tuple(map(MBCSpec.from_str, rule.meta.get("mbc", []))),
references=rule.meta.get("references", []),
examples=rule.meta.get("examples", []),
description=rule.meta.get("description", ""),
lib=rule.meta.get("lib", False),
capa_subscope=rule.meta.get("capa/subscope", False),
is_subscope_rule=rule.meta.get("capa/subscope", False),
maec=MaecMetadata(
analysis_conclusion=rule.meta.get("maec/analysis-conclusion"),
analysis_conclusion_ov=rule.meta.get("maec/analysis-conclusion-ov"),
malware_family=rule.meta.get("maec/malware-family"),
malware_category=rule.meta.get("maec/malware-category"),
malware_category_ov=rule.meta.get("maec/malware-category-ov"),
),
)
), # type: ignore
# Mypy is unable to recognise arguments due to alias
) # type: ignore
# Mypy is unable to recognise arguments due to alias
class Config:
frozen = True
allow_population_by_field_name = True
class RuleMatches(BaseModel):
class RuleMatches(FrozenModel):
"""
args:
meta: the metadata from the rule
@@ -552,7 +555,7 @@ class RuleMatches(BaseModel):
matches: Tuple[Tuple[frz.Address, Match], ...]
class ResultDocument(BaseModel):
class ResultDocument(FrozenModel):
meta: Metadata
rules: Dict[str, RuleMatches]

View File

@@ -16,7 +16,7 @@ import capa.render.result_document as rd
def bold(s: str) -> str:
"""draw attention to the given string"""
return termcolor.colored(s, "blue")
return termcolor.colored(s, "cyan")
def bold2(s: str) -> str:

View File

@@ -126,6 +126,12 @@ Or install capa with build dependencies:
`$ pip install -e /local/path/to/src[build]`
#### Generate rule cache
Generate cache for all rules in the `rules` folder and save the output in the `cache` folder.
`$ python scripts/cache-ruleset.py rules/ cache/`
#### Run Pyinstaller
`$ pyinstaller .github/pyinstaller/pyinstaller.spec`

2
rules

Submodule rules updated: aa2dc1137d...d0e54bb05d

View File

@@ -69,6 +69,7 @@ import capa.main
import capa.rules
import capa.render.json
import capa.render.result_document as rd
from capa.features.common import OS_AUTO
logger = logging.getLogger("capa")
@@ -81,6 +82,7 @@ def get_capa_results(args):
rules (capa.rules.RuleSet): the rules to match
signatures (List[str]): list of file system paths to signature files
format (str): the name of the sample file format
os (str): the name of the operating system
path (str): the file system path to the sample to process
args is a tuple because i'm not quite sure how to unpack multiple arguments using `map`.
@@ -96,12 +98,12 @@ def get_capa_results(args):
meta (dict): the meta analysis results
capabilities (dict): the matched capabilities and their result objects
"""
rules, sigpaths, format, path = args
rules, sigpaths, format, os_, path = args
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
logger.info("computing capa results for: %s", path)
try:
extractor = capa.main.get_extractor(
path, format, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True
path, format, os_, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True
)
except capa.main.UnsupportedFormatError:
# i'm not 100% sure if multiprocessing will reliably raise exceptions across process boundaries.
@@ -127,7 +129,7 @@ def get_capa_results(args):
"error": f"unexpected error: {e}",
}
meta = capa.main.collect_metadata([], path, [], extractor)
meta = capa.main.collect_metadata([], path, format, os_, [], extractor)
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
meta["analysis"].update(counts)
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
@@ -142,7 +144,7 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"rules", "signatures"})
capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os"})
parser.add_argument("input", type=str, help="Path to directory of files to recursively analyze")
parser.add_argument(
"-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor"
@@ -195,7 +197,9 @@ def main(argv=None):
results = {}
for result in mapper(
get_capa_results, [(rules, sig_paths, "pe", sample) for sample in samples], parallelism=args.parallelism
get_capa_results,
[(rules, sig_paths, "pe", OS_AUTO, sample) for sample in samples],
parallelism=args.parallelism,
):
if result["status"] == "error":
logger.warning(result["error"])

View File

@@ -14,6 +14,7 @@ import capa.render.default
import capa.render.result_document as rd
import capa.features.freeze.features as frzf
from capa.engine import *
from capa.features.common import OS_AUTO, FORMAT_AUTO
# == Render dictionary helpers
@@ -164,11 +165,13 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
rules = capa.main.get_rules([rules_path])
# extract features and find capabilities
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True)
extractor = capa.main.get_extractor(
file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True
)
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
# collect metadata (used only to make rendering more complete)
meta = capa.main.collect_metadata([], file_path, rules_path, extractor)
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor)
meta["analysis"].update(counts)
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)

View File

@@ -45,7 +45,7 @@ import capa.engine
import capa.helpers
import capa.features.insn
from capa.rules import Rule, RuleSet
from capa.features.common import FORMAT_PE, FORMAT_DOTNET, String, Feature, Substring
from capa.features.common import OS_AUTO, FORMAT_PE, FORMAT_DOTNET, String, Feature, Substring
from capa.render.result_document import RuleMetadata
logger = logging.getLogger("lint")
@@ -310,7 +310,9 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]:
format_ = capa.main.get_auto_format(nice_path)
logger.debug("analyzing sample: %s", nice_path)
extractor = capa.main.get_extractor(nice_path, format_, "", DEFAULT_SIGNATURES, False, disable_progress=True)
extractor = capa.main.get_extractor(
nice_path, format_, OS_AUTO, "", DEFAULT_SIGNATURES, False, disable_progress=True
)
capabilities, _ = capa.main.find_capabilities(ctx.rules, extractor, disable_progress=True)
# mypy doesn't seem to be happy with the MatchResults type alias & set(...keys())?

View File

@@ -71,7 +71,7 @@ def main(argv=None):
label += " (dirty)"
parser = argparse.ArgumentParser(description="Profile capa performance")
capa.main.install_common_args(parser, wanted={"format", "sample", "signatures", "rules"})
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "rules"})
parser.add_argument("--number", type=int, default=3, help="batch size of profile collection")
parser.add_argument("--repeat", type=int, default=30, help="batch count of profile collection")
@@ -99,12 +99,14 @@ def main(argv=None):
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
if (args.format == "freeze") or (
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
extractor = capa.main.get_extractor(
args.sample, args.format, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False
args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False
)
with tqdm.tqdm(total=args.number * args.repeat) as pbar:

View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python
"""
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
proto-from-results-json.py
Convert a JSON result document into the protobuf format.
Example:
$ capa --json foo.exe > foo.json
$ python proto-from-results.py foo.json | hexyl | head
┌────────┬─────────────────────────┬─────────────────────────┬────────┬────────┐
│00000000│ 0a d4 05 0a 1a 32 30 32 ┊ 33 2d 30 32 2d 31 30 20 │_.•_•202┊3-02-10 │
│00000010│ 31 31 3a 34 39 3a 35 32 ┊ 2e 36 39 33 34 30 30 12 │11:49:52┊.693400•│
│00000020│ 05 35 2e 30 2e 30 1a 34 ┊ 74 65 73 74 73 2f 64 61 │•5.0.0•4┊tests/da│
│00000030│ 74 61 2f 50 72 61 63 74 ┊ 69 63 61 6c 20 4d 61 6c │ta/Pract┊ical Mal│
│00000040│ 77 61 72 65 20 41 6e 61 ┊ 6c 79 73 69 73 20 4c 61 │ware Ana┊lysis La│
│00000050│ 62 20 30 31 2d 30 31 2e ┊ 64 6c 6c 5f 1a 02 2d 6a │b 01-01.┊dll_••-j│
│00000060│ 22 c4 01 0a 20 32 39 30 ┊ 39 33 34 63 36 31 64 65 │".•_ 290┊934c61de│
│00000070│ 39 31 37 36 61 64 36 38 ┊ 32 66 66 64 64 36 35 66 │9176ad68┊2ffdd65f│
│00000080│ 30 61 36 36 39 12 28 61 ┊ 34 62 33 35 64 65 37 31 │0a669•(a┊4b35de71│
"""
import sys
import logging
import argparse
import capa.render.proto
import capa.render.result_document
logger = logging.getLogger("capa.proto-from-results-json")
def main(argv=None):
    """
    convert a capa JSON result document into its protobuf serialization.

    the serialized protobuf message is written to STDOUT as raw bytes.

    args:
      argv: command line arguments; defaults to `sys.argv[1:]` when None.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Convert a capa JSON result document into the protobuf format")
    parser.add_argument("json", type=str, help="path to JSON result document file, produced by `capa --json`")

    logging_group = parser.add_argument_group("logging arguments")
    logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
    logging_group.add_argument(
        "-q", "--quiet", action="store_true", help="disable all status output except fatal errors"
    )

    args = parser.parse_args(args=argv)

    # pick the verbosity once; --quiet wins over --debug when both are given
    if args.quiet:
        level = logging.WARNING
    elif args.debug:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    doc = capa.render.result_document.ResultDocument.parse_file(args.json)
    message = capa.render.proto.doc_to_pb2(doc)

    # the payload is binary, so write to the underlying byte stream of STDOUT
    sys.stdout.buffer.write(message.SerializeToString(deterministic=True))
    sys.stdout.flush()
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,85 @@
#!/usr/bin/env python
"""
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
proto-to-results-json.py
Convert a protobuf result document into the JSON format.
Example:
$ capa --json foo.exe > foo.json
$ python proto-from-results.py foo.json > foo.pb
$ python proto-to-results.py foo.pb | jq . | head
────┼────────────────────────────────────────────────────
1 │ {
2 │ "meta": {
3 │ "analysis": {
4 │ "arch": "i386",
5 │ "base_address": {
6 │ "type": "absolute",
7 │ "value": 268435456
8 │ },
9 │ "extractor": "VivisectFeatureExtractor",
10 │ "feature_counts": {
────┴────────────────────────────────────────────────────
"""
import sys
import logging
import argparse
import capa.render.json
import capa.render.proto
import capa.render.proto.capa_pb2
import capa.render.result_document
logger = logging.getLogger("capa.proto-to-results-json")
def main(argv=None):
    """
    convert a capa protobuf result document into the JSON format.

    the JSON document is printed to STDOUT, indented and with sorted keys.

    args:
      argv: command line arguments; defaults to `sys.argv[1:]` when None.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Convert a capa protobuf result document into the JSON format")
    parser.add_argument(
        "pb", type=str, help="path to protobuf result document file, produced by `proto-from-results.py`"
    )

    logging_group = parser.add_argument_group("logging arguments")
    logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
    logging_group.add_argument(
        "-q", "--quiet", action="store_true", help="disable all status output except fatal errors"
    )

    args = parser.parse_args(args=argv)

    # pick the verbosity once; --quiet wins over --debug when both are given
    if args.quiet:
        level = logging.WARNING
    elif args.debug:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    with open(args.pb, "rb") as f:
        serialized = f.read()

    # deserialize the raw bytes into a protobuf message, then into the pydantic model
    message = capa.render.proto.capa_pb2.ResultDocument()
    message.ParseFromString(serialized)
    doc = capa.render.proto.doc_from_pb2(message)

    print(doc.json(exclude_none=True, indent=2, sort_keys=True))
if __name__ == "__main__":
sys.exit(main())

View File

@@ -68,6 +68,7 @@ import capa.render.verbose
import capa.features.freeze
import capa.render.result_document as rd
from capa.helpers import get_file_taste
from capa.features.common import FORMAT_AUTO
from capa.features.freeze import Address
logger = logging.getLogger("capa.show-capabilities-by-function")
@@ -130,7 +131,7 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"format", "backend", "sample", "signatures", "rules", "tag"})
capa.main.install_common_args(parser, wanted={"format", "os", "backend", "sample", "signatures", "rules", "tag"})
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
@@ -156,7 +157,7 @@ def main(argv=None):
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
if (args.format == "freeze") or (args.format == FORMAT_AUTO and capa.features.freeze.is_freeze(taste)):
format_ = "freeze"
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
@@ -166,7 +167,7 @@ def main(argv=None):
try:
extractor = capa.main.get_extractor(
args.sample, args.format, args.backend, sig_paths, should_save_workspace
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
)
except capa.exceptions.UnsupportedFormatError:
capa.helpers.log_unsupported_format_error()
@@ -175,7 +176,7 @@ def main(argv=None):
capa.helpers.log_unsupported_runtime_error()
return -1
meta = capa.main.collect_metadata(argv, args.sample, args.rules, extractor)
meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor)
capabilities, counts = capa.main.find_capabilities(rules, extractor)
meta["analysis"].update(counts)
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)

View File

@@ -95,7 +95,7 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Show the features that capa extracts from the given sample")
capa.main.install_common_args(parser, wanted={"format", "sample", "signatures", "backend"})
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"})
parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
args = parser.parse_args(args=argv)
@@ -113,14 +113,16 @@ def main(argv=None):
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
if (args.format == "freeze") or (
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
extractor = capa.main.get_extractor(
args.sample, args.format, args.backend, sig_paths, should_save_workspace
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
)
except capa.exceptions.UnsupportedFormatError:
capa.helpers.log_unsupported_format_error()

View File

@@ -27,7 +27,8 @@ requirements = [
"pyelftools==0.29",
"dnfile==0.13.0",
"dncil==1.0.2",
"pydantic==1.10.6",
"pydantic==1.10.7",
"protobuf==4.21.12",
]
# this sets __version__
@@ -75,10 +76,11 @@ setuptools.setup(
"pycodestyle==2.10.0",
"black==23.1.0",
"isort==5.11.4",
"mypy==1.0.1",
"mypy==1.1.1",
"psutil==5.9.2",
"stix2==3.0.1",
"requests==2.28.0",
"mypy-protobuf==3.4.0",
# type stubs for mypy
"types-backports==0.1.3",
"types-colorama==0.4.15",
@@ -87,6 +89,7 @@ setuptools.setup(
"types-termcolor==1.1.4",
"types-psutil==5.8.23",
"types_requests==2.28.1",
"types-protobuf==4.22.0.0",
],
"build": [
"pyinstaller==5.9.0",

View File

@@ -26,12 +26,14 @@ import capa.features.basicblock
from capa.features.common import (
OS,
OS_ANY,
OS_AUTO,
OS_LINUX,
ARCH_I386,
FORMAT_PE,
ARCH_AMD64,
FORMAT_ELF,
OS_WINDOWS,
FORMAT_AUTO,
FORMAT_DOTNET,
Arch,
Format,
@@ -104,9 +106,9 @@ def get_viv_extractor(path):
elif "raw64" in path:
vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths)
else:
vw = capa.main.get_workspace(path, "auto", sigpaths=sigpaths)
vw = capa.main.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths)
vw.saveWorkspace()
extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO)
fixup_viv(path, extractor)
return extractor
@@ -158,6 +160,29 @@ def get_dnfile_extractor(path):
return extractor
@lru_cache(maxsize=1)
def get_binja_extractor(path):
    """
    load the sample at `path` with Binary Ninja and wrap it in a `BinjaFeatureExtractor`.

    the most recent result is cached, so repeated requests for the same path reuse the extractor.

    args:
      path (str): file system path to the sample.

    returns:
      the extractor, with an extra `path` attribute set to the given path.
    """
    from binaryninja import Settings, BinaryViewType

    import capa.features.extractors.binja.extractor

    # Workaround for a BN bug: https://github.com/Vector35/binaryninja-api/issues/4051
    settings = Settings()
    needs_pdb_workaround = path.endswith("kernel32-64.dll_")
    if needs_pdb_workaround:
        old_pdb = settings.get_bool("pdb.loadGlobalSymbols")
        settings.set_bool("pdb.loadGlobalSymbols", False)
    try:
        bv = BinaryViewType.get_view_of_file(path)
    finally:
        # restore the previous setting even when loading fails,
        # so that a failed load does not leave the setting changed for later loads.
        if needs_pdb_workaround:
            settings.set_bool("pdb.loadGlobalSymbols", old_pdb)

    extractor = capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)

    # overload the extractor so that the fixture exposes `extractor.path`
    setattr(extractor, "path", path)

    return extractor
def extract_global_features(extractor):
features = collections.defaultdict(set)
for feature, va in extractor.extract_global_features():
@@ -668,7 +693,7 @@ FEATURE_PRESENCE_TESTS = sorted(
("mimikatz", "function=0x46D534", capa.features.common.Characteristic("nzxor"), False),
# insn/characteristic(nzxor): xorps
# viv needs fixup to recognize function, see above
("3b13b...", "function=0x10006860", capa.features.common.Characteristic("nzxor"), True),
("mimikatz", "function=0x410dfc", capa.features.common.Characteristic("nzxor"), True),
# insn/characteristic(peb access)
("kernel32-64", "function=0x1800017D0", capa.features.common.Characteristic("peb access"), True),
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("peb access"), False),
@@ -1106,3 +1131,37 @@ def _0953c_dotnetfile_extractor():
@pytest.fixture
def _039a6_dotnetfile_extractor():
    """extractor built by `get_dnfile_extractor` for the sample named `_039a6`."""
    sample_path = get_data_path_by_name("_039a6")
    return get_dnfile_extractor(sample_path)
def get_result_doc(path):
    """parse the JSON result document stored at `path` into a `ResultDocument`."""
    document_cls = capa.render.result_document.ResultDocument
    return document_cls.parse_file(path)
@pytest.fixture
def pma0101_rd():
    """result document stored for `Practical Malware Analysis Lab 01-01.dll_`."""
    rd_path = os.path.join(CD, "data", "rd", "Practical Malware Analysis Lab 01-01.dll_.json")
    return get_result_doc(rd_path)
@pytest.fixture
def dotnet_1c444e_rd():
    """result document stored for `1c444ebeba24dcba8628b7dfe5fec7c6.exe_`."""
    rd_path = os.path.join(CD, "data", "rd", "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json")
    return get_result_doc(rd_path)
@pytest.fixture
def a3f3bbc_rd():
    """result document stored for `3f3bbcf8fd90bdcdcdc5494314ed4225.exe_`."""
    rd_path = os.path.join(CD, "data", "rd", "3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json")
    return get_result_doc(rd_path)
@pytest.fixture
def al_khaserx86_rd():
    """result document stored for `al-khaser_x86.exe_`."""
    rd_path = os.path.join(CD, "data", "rd", "al-khaser_x86.exe_.json")
    return get_result_doc(rd_path)
@pytest.fixture
def al_khaserx64_rd():
    """result document stored for `al-khaser_x64.exe_`."""
    rd_path = os.path.join(CD, "data", "rd", "al-khaser_x64.exe_.json")
    return get_result_doc(rd_path)
@pytest.fixture
def a076114_rd():
    """result document stored for `0761142efbda6c4b1e801223de723578.dll_`."""
    rd_path = os.path.join(CD, "data", "rd", "0761142efbda6c4b1e801223de723578.dll_.json")
    return get_result_doc(rd_path)

View File

@@ -0,0 +1,47 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import fixtures
from fixtures import *
logger = logging.getLogger(__file__)
# The binja tests must be skipped when binaryninja cannot be imported (e.g., in GitHub CI)
# or when loading a trivial buffer fails with a RuntimeError, which is reported as an invalid license.
binja_present: bool = False
try:
    import binaryninja

    try:
        # a single NOP byte is enough to find out whether Binary Ninja can load anything at all
        binaryninja.load(source=b"\x90")
    except RuntimeError:
        logger.warning("Binary Ninja license is not valid, provide via $BN_LICENSE or license.dat")
    else:
        binja_present = True
except ImportError:
    # leave binja_present as False so the tests below get skipped
    pass
@pytest.mark.skipif(not binja_present, reason="Skip binja tests if the binaryninja Python API is not installed")
@fixtures.parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_PRESENCE_TESTS,
    indirect=["sample", "scope"],
)
def test_binja_features(sample, scope, feature, expected):
    """run the shared feature presence checks against the Binary Ninja extractor."""
    make_extractor = fixtures.get_binja_extractor
    fixtures.do_test_feature_presence(make_extractor, sample, scope, feature, expected)
@pytest.mark.skipif(not binja_present, reason="Skip binja tests if the binaryninja Python API is not installed")
@fixtures.parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_COUNT_TESTS,
    indirect=["sample", "scope"],
)
def test_binja_feature_counts(sample, scope, feature, expected):
    """run the shared feature count checks against the Binary Ninja extractor."""
    make_extractor = fixtures.get_binja_extractor
    fixtures.do_test_feature_count(make_extractor, sample, scope, feature, expected)

349
tests/test_proto.py Normal file
View File

@@ -0,0 +1,349 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import copy
from typing import Any
from fixtures import *
import capa.rules
import capa.render
import capa.render.proto
import capa.render.utils
import capa.features.freeze
import capa.features.address
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd
import capa.features.freeze.features
@pytest.mark.parametrize(
    "rd_file",
    [
        pytest.param("a3f3bbc_rd"),
        pytest.param("al_khaserx86_rd"),
        pytest.param("al_khaserx64_rd"),
        pytest.param("a076114_rd"),
        pytest.param("pma0101_rd"),
        pytest.param("dotnet_1c444e_rd"),
    ],
)
def test_doc_to_pb2(request, rd_file):
    """convert a result document fixture to protobuf and compare both documents field by field."""
    src: rd.ResultDocument = request.getfixturevalue(rd_file)
    dst = capa.render.proto.doc_to_pb2(src)

    assert_meta(src.meta, dst.meta)

    for rule_name, matches in src.rules.items():
        assert rule_name in dst.rules
        dst_rule = dst.rules[rule_name]

        # rule metadata
        src_meta = matches.meta
        m: capa_pb2.RuleMetadata = dst_rule.meta
        assert src_meta.name == m.name
        assert cmp_optional(src_meta.namespace, m.namespace)
        assert list(src_meta.authors) == m.authors
        assert capa.render.proto.scope_to_pb2(src_meta.scope) == m.scope

        assert len(src_meta.attack) == len(m.attack)
        for rd_attack, proto_attack in zip(src_meta.attack, m.attack):
            assert list(rd_attack.parts) == proto_attack.parts
            assert rd_attack.tactic == proto_attack.tactic
            assert rd_attack.technique == proto_attack.technique
            assert rd_attack.subtechnique == proto_attack.subtechnique

        assert len(src_meta.mbc) == len(m.mbc)
        for rd_mbc, proto_mbc in zip(src_meta.mbc, m.mbc):
            assert list(rd_mbc.parts) == proto_mbc.parts
            assert rd_mbc.objective == proto_mbc.objective
            assert rd_mbc.behavior == proto_mbc.behavior
            assert rd_mbc.method == proto_mbc.method
            assert rd_mbc.id == proto_mbc.id

        assert list(src_meta.references) == m.references
        assert list(src_meta.examples) == m.examples
        assert src_meta.description == m.description
        assert src_meta.lib == m.lib
        assert src_meta.is_subscope_rule == m.is_subscope_rule

        # the maec fields are optional on the pydantic side, so compare them leniently
        for maec_field in (
            "analysis_conclusion",
            "analysis_conclusion_ov",
            "malware_family",
            "malware_category",
            "malware_category_ov",
        ):
            assert cmp_optional(getattr(src_meta.maec, maec_field), getattr(m.maec, maec_field))

        assert matches.source == dst_rule.source

        # the matches themselves
        assert len(matches.matches) == len(dst_rule.matches)
        for (addr, match), proto_match in zip(matches.matches, dst_rule.matches):
            assert capa.render.proto.addr_to_pb2(addr) == proto_match.address
            assert_match(match, proto_match.match)
def test_addr_to_pb2():
    """check that each kind of address is converted to the matching protobuf address type and value."""
    freeze = capa.features.freeze.Address.from_capa
    to_pb2 = capa.render.proto.addr_to_pb2

    # addresses whose protobuf form carries a single unsigned integer value
    scalar_cases = (
        (capa.features.address.AbsoluteVirtualAddress(0x400000), capa_pb2.ADDRESSTYPE_ABSOLUTE, 0x400000),
        (capa.features.address.RelativeVirtualAddress(0x100), capa_pb2.ADDRESSTYPE_RELATIVE, 0x100),
        (capa.features.address.FileOffsetAddress(0x200), capa_pb2.ADDRESSTYPE_FILE, 0x200),
        (capa.features.address.DNTokenAddress(0x123456), capa_pb2.ADDRESSTYPE_DN_TOKEN, 0x123456),
    )
    for address, expected_type, expected_value in scalar_cases:
        converted = to_pb2(freeze(address))
        assert converted.type == expected_type
        assert converted.v.u == expected_value

    # a token offset address carries both the token and the offset
    token_offset = to_pb2(freeze(capa.features.address.DNTokenOffsetAddress(0x123456, 0x10)))
    assert token_offset.type == capa_pb2.ADDRESSTYPE_DN_TOKEN_OFFSET
    assert token_offset.token_offset.token.u == 0x123456
    assert token_offset.token_offset.offset == 0x10

    # for "no address" only the type is checked
    no_address = to_pb2(freeze(capa.features.address._NoAddress()))
    assert no_address.type == capa_pb2.ADDRESSTYPE_NO_ADDRESS
def test_scope_to_pb2():
    """check that each rule scope is converted to the matching protobuf scope constant."""
    expected_scopes = (
        (capa.rules.FILE_SCOPE, capa_pb2.SCOPE_FILE),
        (capa.rules.FUNCTION_SCOPE, capa_pb2.SCOPE_FUNCTION),
        (capa.rules.BASIC_BLOCK_SCOPE, capa_pb2.SCOPE_BASIC_BLOCK),
        (capa.rules.INSTRUCTION_SCOPE, capa_pb2.SCOPE_INSTRUCTION),
    )
    for scope_name, pb_scope in expected_scopes:
        assert capa.render.proto.scope_to_pb2(capa.rules.Scope(scope_name)) == pb_scope
def cmp_optional(a: Any, b: Any) -> bool:
    """
    compare an optional value `a` against `b`, treating None as the empty string.

    proto optional value gets deserialized to "" instead of None (used by pydantic),
    so a None on the pydantic side must compare equal to "" on the protobuf side.
    """
    if a is None:
        return "" == b
    return a == b
def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
    """assert that the protobuf metadata `dst` carries the same values as the result document `meta`."""
    to_pb2 = capa.render.proto.addr_to_pb2

    assert str(meta.timestamp) == dst.timestamp
    assert meta.version == dst.version

    # a missing argv is expected to show up as an empty list on the protobuf side
    expected_argv = [] if meta.argv is None else list(meta.argv)
    assert expected_argv == dst.argv

    for sample_field in ("md5", "sha1", "sha256", "path"):
        assert getattr(meta.sample, sample_field) == getattr(dst.sample, sample_field)

    src_analysis = meta.analysis
    dst_analysis = dst.analysis
    for analysis_field in ("format", "arch", "os", "extractor"):
        assert getattr(src_analysis, analysis_field) == getattr(dst_analysis, analysis_field)
    assert list(src_analysis.rules) == dst_analysis.rules
    assert to_pb2(src_analysis.base_address) == dst_analysis.base_address

    # layout: functions and the basic blocks matched within them
    assert len(src_analysis.layout.functions) == len(dst_analysis.layout.functions)
    for rd_f, proto_f in zip(src_analysis.layout.functions, dst_analysis.layout.functions):
        assert to_pb2(rd_f.address) == proto_f.address

        assert len(rd_f.matched_basic_blocks) == len(proto_f.matched_basic_blocks)
        for rd_bb, proto_bb in zip(rd_f.matched_basic_blocks, proto_f.matched_basic_blocks):
            assert to_pb2(rd_bb.address) == proto_bb.address

    # feature counts: file total and per function
    assert src_analysis.feature_counts.file == dst_analysis.feature_counts.file
    assert len(src_analysis.feature_counts.functions) == len(dst_analysis.feature_counts.functions)
    for rd_cf, proto_cf in zip(src_analysis.feature_counts.functions, dst_analysis.feature_counts.functions):
        assert to_pb2(rd_cf.address) == proto_cf.address
        assert rd_cf.count == proto_cf.count

    # library functions
    assert len(src_analysis.library_functions) == len(dst_analysis.library_functions)
    for rd_lf, proto_lf in zip(src_analysis.library_functions, dst_analysis.library_functions):
        assert to_pb2(rd_lf.address) == proto_lf.address
        assert rd_lf.name == proto_lf.name
def assert_match(ma: rd.Match, mb: capa_pb2.Match):
    """recursively assert that the protobuf match `mb` mirrors the result document match `ma`."""
    to_pb2 = capa.render.proto.addr_to_pb2

    assert ma.success == mb.success

    # node: either a statement or a feature
    node = ma.node
    if isinstance(node, rd.StatementNode):
        assert_statement(node, mb.statement)
    elif isinstance(node, rd.FeatureNode):
        assert node.type == mb.feature.type
        assert_feature(node.feature, mb.feature)

    # children
    assert len(ma.children) == len(mb.children)
    for child_a, child_b in zip(ma.children, mb.children):
        assert_match(child_a, child_b)

    # locations
    assert [to_pb2(location) for location in ma.locations] == mb.locations

    # captures
    assert len(ma.captures) == len(mb.captures)
    for capture, locs in ma.captures.items():
        assert capture in mb.captures
        assert [to_pb2(location) for location in locs] == mb.captures[capture].address
def assert_feature(fa, fb):
    """
    assert that the protobuf feature node `fb` mirrors the frozen feature `fa`.

    raises:
      NotImplementedError: when `fa` is of a feature type that is not handled here.
    """
    frzf = capa.features.freeze.features

    def oneof_value(message):
        # fetch whichever member of the `value` oneof has been set
        return getattr(message, message.WhichOneof("value"))

    # get field that has been set, e.g., os or api, to access inner fields
    pb_feature = getattr(fb, fb.WhichOneof("feature"))

    assert fa.type == pb_feature.type
    assert cmp_optional(fa.description, pb_feature.description)

    # the order of the checks below is kept as-is on purpose
    if isinstance(fa, frzf.OSFeature):
        assert fa.os == pb_feature.os

    elif isinstance(fa, frzf.ArchFeature):
        assert fa.arch == pb_feature.arch

    elif isinstance(fa, frzf.FormatFeature):
        assert fa.format == pb_feature.format

    elif isinstance(fa, frzf.MatchFeature):
        assert fa.match == pb_feature.match

    elif isinstance(fa, frzf.CharacteristicFeature):
        assert fa.characteristic == pb_feature.characteristic

    elif isinstance(fa, frzf.ExportFeature):
        assert fa.export == pb_feature.export

    elif isinstance(fa, frzf.ImportFeature):
        assert fa.import_ == pb_feature.import_  # or could use getattr

    elif isinstance(fa, frzf.SectionFeature):
        assert fa.section == pb_feature.section

    elif isinstance(fa, frzf.FunctionNameFeature):
        assert fa.function_name == pb_feature.function_name

    elif isinstance(fa, frzf.SubstringFeature):
        assert fa.substring == pb_feature.substring

    elif isinstance(fa, frzf.RegexFeature):
        assert fa.regex == pb_feature.regex

    elif isinstance(fa, frzf.StringFeature):
        assert fa.string == pb_feature.string

    elif isinstance(fa, frzf.ClassFeature):
        assert fa.class_ == pb_feature.class_

    elif isinstance(fa, frzf.NamespaceFeature):
        assert fa.namespace == pb_feature.namespace

    elif isinstance(fa, frzf.BasicBlockFeature):
        # nothing beyond type and description to compare
        pass

    elif isinstance(fa, frzf.APIFeature):
        assert fa.api == pb_feature.api

    elif isinstance(fa, frzf.PropertyFeature):
        assert fa.property == pb_feature.property_
        assert fa.access == pb_feature.access

    elif isinstance(fa, frzf.NumberFeature):
        # get number value of set field
        assert fa.number == oneof_value(pb_feature.number)

    elif isinstance(fa, frzf.BytesFeature):
        assert fa.bytes == pb_feature.bytes

    elif isinstance(fa, frzf.OffsetFeature):
        assert fa.offset == oneof_value(pb_feature.offset)

    elif isinstance(fa, frzf.MnemonicFeature):
        assert fa.mnemonic == pb_feature.mnemonic

    elif isinstance(fa, frzf.OperandNumberFeature):
        assert fa.index == pb_feature.index
        assert fa.operand_number == oneof_value(pb_feature.operand_number)

    elif isinstance(fa, frzf.OperandOffsetFeature):
        assert fa.index == pb_feature.index
        assert fa.operand_offset == oneof_value(pb_feature.operand_offset)

    else:
        raise NotImplementedError(f"unhandled feature: {type(fa)}: {fa}")
def assert_statement(a: rd.StatementNode, b: capa_pb2.StatementNode):
    """
    assert that the protobuf statement node `b` mirrors the result document statement node `a`.

    args:
      a: the statement node from the result document.
      b: the statement node from the protobuf document.

    raises:
      AssertionError: when a field differs, or when `a` wraps a statement type that is not handled here.
    """
    assert a.type == b.type

    sa = a.statement
    # access the member of the `statement` oneof that has been set
    sb = getattr(b, str(b.WhichOneof("statement")))

    assert sa.type == sb.type
    assert cmp_optional(sa.description, sb.description)

    if isinstance(sa, rd.RangeStatement):
        assert isinstance(sb, capa_pb2.RangeStatement)
        assert sa.min == sb.min
        # compare against the protobuf side; comparing `sa.max` with itself can never fail
        assert sa.max == sb.max
        assert_feature(sa.child, sb.child)

    elif isinstance(sa, rd.SomeStatement):
        assert sa.count == sb.count

    elif isinstance(sa, rd.SubscopeStatement):
        assert capa.render.proto.scope_to_pb2(sa.scope) == sb.scope

    elif isinstance(sa, rd.CompoundStatement):
        # only has type and description tested above
        pass

    else:
        # raise explicitly (rather than `assert False`) so the failure names the unhandled statement
        raise AssertionError(f"unhandled statement: {type(sa)}: {sa}")
def assert_round_trip(doc: rd.ResultDocument):
    """Check that a result document survives conversion to protobuf and back.

    The document is converted with `doc_to_pb2`, restored with `doc_from_pb2`,
    and compared both as objects and as deterministic protobuf serializations.
    A copy with a modified `meta.version` must compare unequal in both ways.
    """
    original = doc
    message = capa.render.proto.doc_to_pb2(original)
    restored = capa.render.proto.doc_from_pb2(message)

    # the round trip works:
    # the objects compare equal, which works thanks to pydantic model equality...
    assert original == restored

    # ...and so do their protobuf representations.
    original_bytes = capa.render.proto.doc_to_pb2(original).SerializeToString(deterministic=True)
    restored_bytes = capa.render.proto.doc_to_pb2(restored).SerializeToString(deterministic=True)
    assert original_bytes == restored_bytes

    # two different versions must not be equal.
    altered = copy.deepcopy(restored)
    altered.meta.__dict__.update({"version": "0.0.0"})
    assert original.meta.version != altered.meta.version
    assert original != altered

    original_bytes = capa.render.proto.doc_to_pb2(original).SerializeToString(deterministic=True)
    altered_bytes = capa.render.proto.doc_to_pb2(altered).SerializeToString(deterministic=True)
    assert original_bytes != altered_bytes
@pytest.mark.parametrize(
    "rd_file",
    [
        "a3f3bbc_rd",
        "al_khaserx86_rd",
        "al_khaserx64_rd",
        "a076114_rd",
        "pma0101_rd",
        "dotnet_1c444e_rd",
    ],
)
def test_round_trip(request, rd_file):
    """Run the protobuf round-trip check on each named result-document fixture."""
    # fixtures are looked up by name so one test body covers every sample.
    assert_round_trip(request.getfixturevalue(rd_file))

View File

@@ -5,8 +5,11 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import copy
import pytest
import fixtures
from fixtures import *
import capa
import capa.engine as ceng
@@ -231,6 +234,43 @@ def test_basic_block_node_from_capa():
assert isinstance(node.feature, frzf.BasicBlockFeature)
def assert_round_trip(rd: rdoc.ResultDocument):
    """Check that a result document survives serialization to JSON and back.

    The document is dumped with `.json(exclude_none=True)`, re-parsed with
    `ResultDocument.parse_raw`, and compared both as objects and as JSON text.
    A copy with a modified `meta.version` must compare unequal in both ways.
    """
    original = rd
    serialized = original.json(exclude_none=True)
    restored = rdoc.ResultDocument.parse_raw(serialized)

    # the round trip works:
    # the objects compare equal, which works thanks to pydantic model equality...
    assert original == restored

    # ...and so do their json representations.
    original_json = original.json(exclude_none=True)
    restored_json = restored.json(exclude_none=True)
    assert original_json == restored_json

    # two different versions must not be equal.
    altered = copy.deepcopy(restored)
    altered.meta.__dict__.update({"version": "0.0.0"})
    assert original.meta.version != altered.meta.version
    assert original != altered

    original_json = original.json(exclude_none=True)
    altered_json = altered.json(exclude_none=True)
    assert original_json != altered_json
@pytest.mark.parametrize(
    "rd_file",
    [
        "a3f3bbc_rd",
        "al_khaserx86_rd",
        "al_khaserx64_rd",
        "a076114_rd",
        "pma0101_rd",
        "dotnet_1c444e_rd",
    ],
)
def test_round_trip(request, rd_file):
    """Run the JSON round-trip check on each named result-document fixture."""
    # fixtures are looked up by name so one test body covers every sample.
    assert_round_trip(request.getfixturevalue(rd_file))
def test_json_to_rdoc():
    """Parsing the "pma01-01-rd" data file must yield a ResultDocument instance."""
    rd_path = fixtures.get_data_path_by_name("pma01-01-rd")
    parsed = rdoc.ResultDocument.parse_file(rd_path)
    assert isinstance(parsed, rdoc.ResultDocument)

View File

@@ -63,4 +63,22 @@ def test_bulk_process(tmpdir):
def run_program(script_path, args):
    """Run a Python script with the current interpreter and capture its stdout.

    Args:
        script_path: path of the script to execute.
        args: command-line arguments passed to the script.

    Returns:
        The `subprocess.CompletedProcess` for the run; its `stdout` attribute
        holds the captured output as bytes.
    """
    args = [sys.executable, script_path, *args]
    print(f"running: '{args}'")
    # capture stdout so callers can inspect or save the script output;
    # a leftover un-captured `return subprocess.run(args)` used to shadow this call.
    return subprocess.run(args, stdout=subprocess.PIPE)
def test_proto_conversion(tmpdir):
    """Exercise the proto-from-results.py and proto-to-results.py scripts end to end.

    The first script is run on a results JSON file and its stdout is saved to
    a file; the second script is then run on that file, and its stdout must
    begin with the opening of a JSON object with a "meta" key.
    """
    workdir = tmpdir.mkdir("proto-test")

    results_json = os.path.join(CD, "data", "rd", "Practical Malware Analysis Lab 01-01.dll_.json")
    to_proto = run_program(get_script_path("proto-from-results.py"), [results_json])
    assert to_proto.returncode == 0

    # persist the first script's stdout so the second script can read it from disk.
    proto_path = os.path.join(workdir, "pma.pb")
    with open(proto_path, "wb") as out:
        out.write(to_proto.stdout)

    to_json = run_program(get_script_path("proto-to-results.py"), [proto_path])
    assert to_json.returncode == 0
    assert to_json.stdout.startswith(b'{\n "meta": ')