mirror of
https://github.com/mandiant/capa.git
synced 2026-01-21 08:53:27 -08:00
Compare commits
41 Commits
feature/un
...
v1.2.1rc3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d03856c26 | ||
|
|
381e4abd17 | ||
|
|
7ab42d9889 | ||
|
|
b3c3c5579b | ||
|
|
2d20fe20c4 | ||
|
|
c4e4eb27fb | ||
|
|
f9eed2d5b2 | ||
|
|
a801a681b8 | ||
|
|
c25632b12c | ||
|
|
8e6974b10f | ||
|
|
7616603b11 | ||
|
|
7c27af8868 | ||
|
|
19e5e9b766 | ||
|
|
adeee3e834 | ||
|
|
c2997c8033 | ||
|
|
28b463f145 | ||
|
|
cc59f5b91e | ||
|
|
06ac49e629 | ||
|
|
6c07617082 | ||
|
|
13390918a1 | ||
|
|
0f44ec0dd8 | ||
|
|
c49199138e | ||
|
|
3f88bb8500 | ||
|
|
b2b9f15bc1 | ||
|
|
d2cd224fb3 | ||
|
|
aac13164a5 | ||
|
|
f2fff02b49 | ||
|
|
662a7eaae6 | ||
|
|
f6ba63083b | ||
|
|
49774110cc | ||
|
|
c7840e0769 | ||
|
|
d2155eb3a1 | ||
|
|
3772c5c0bc | ||
|
|
d47d149196 | ||
|
|
528645c0d2 | ||
|
|
7464a62943 | ||
|
|
34e7991081 | ||
|
|
3e20f0fc71 | ||
|
|
cb9bd2eab7 | ||
|
|
9d102843ac | ||
|
|
dc8870861b |
5
.github/pyinstaller/pyinstaller.spec
vendored
5
.github/pyinstaller/pyinstaller.spec
vendored
@@ -44,7 +44,6 @@ a = Analysis(
|
||||
hiddenimports=[
|
||||
# vivisect does manual/runtime importing of its modules,
|
||||
# so declare the things that could be imported here.
|
||||
"pycparser",
|
||||
"vivisect",
|
||||
"vivisect.analysis",
|
||||
"vivisect.analysis.amd64",
|
||||
@@ -92,11 +91,13 @@ a = Analysis(
|
||||
"vivisect.impapi.windows",
|
||||
"vivisect.impapi.windows.amd64",
|
||||
"vivisect.impapi.windows.i386",
|
||||
"vivisect.impapi.winkern.i386",
|
||||
"vivisect.impapi.winkern.amd64",
|
||||
"vivisect.parsers.blob",
|
||||
"vivisect.parsers.elf",
|
||||
"vivisect.parsers.ihex",
|
||||
"vivisect.parsers.macho",
|
||||
"vivisect.parsers.parse_pe",
|
||||
"vivisect.parsers.PE",
|
||||
"vivisect.parsers.utils",
|
||||
"vivisect.storage",
|
||||
"vivisect.storage.basicfile",
|
||||
|
||||
29
.github/workflows/publish.yml
vendored
Normal file
29
.github/workflows/publish.yml
vendored
Normal file
@@ -0,0 +1,29 @@
|
||||
# This workflows will upload a Python Package using Twine when a release is created
|
||||
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
|
||||
|
||||
name: publish to pypi
|
||||
|
||||
on:
|
||||
release:
|
||||
types: [published]
|
||||
|
||||
jobs:
|
||||
deploy:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '2.7'
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install setuptools wheel twine
|
||||
- name: Build and publish
|
||||
env:
|
||||
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
|
||||
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
|
||||
run: |
|
||||
python setup.py sdist bdist_wheel
|
||||
twine upload --skip-existing dist/*
|
||||
3
.github/workflows/tests.yml
vendored
3
.github/workflows/tests.yml
vendored
@@ -62,8 +62,7 @@ jobs:
|
||||
with:
|
||||
python-version: ${{ matrix.python }}
|
||||
- name: Install capa
|
||||
# TODO: remove `pefile` when we bump lancelot >= 0.3.7
|
||||
run: pip install -e .[dev] pefile
|
||||
run: pip install -e .[dev]
|
||||
- name: Run tests
|
||||
run: pytest tests/
|
||||
|
||||
|
||||
4
.gitmodules
vendored
4
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "rules"]
|
||||
path = rules
|
||||
url = git@github.com:fireeye/capa-rules.git
|
||||
url = ../capa-rules.git
|
||||
[submodule "tests/data"]
|
||||
path = tests/data
|
||||
url = git@github.com:fireeye/capa-testfiles.git
|
||||
url = ../capa-testfiles.git
|
||||
|
||||
98
CHANGELOG.md
98
CHANGELOG.md
@@ -1,5 +1,103 @@
|
||||
# Change Log
|
||||
|
||||
## v1.2.0 (2020-08-31)
|
||||
|
||||
This release brings UI enhancements, especially for the IDA Pro plugin,
|
||||
investment towards py3 support,
|
||||
fixes some bugs identified by the community,
|
||||
and 46 (!) new rules.
|
||||
We received contributions from ten reverse engineers, including five new ones:
|
||||
|
||||
- @agithubuserlol
|
||||
- @recvfrom
|
||||
- @D4nch3n
|
||||
- @edeca
|
||||
- @winniepe
|
||||
|
||||
Download a standalone binary below and checkout the readme [here on GitHub](https://github.com/fireeye/capa/).
|
||||
Report issues on our [issue tracker](https://github.com/fireeye/capa/issues)
|
||||
and contribute new rules at [capa-rules](https://github.com/fireeye/capa-rules/).
|
||||
|
||||
### New features
|
||||
|
||||
- ida plugin: display arch flavors @mike-hunhoff
|
||||
- ida plugin: display block descriptions @mike-hunhoff
|
||||
- ida backend: extract features from nested pointers @mike-hunhoff
|
||||
- main: show more progress output @williballenthin
|
||||
- core: pin dependency versions #258 @recvfrom
|
||||
|
||||
### New rules
|
||||
- bypass UAC via AppInfo ALPC @agithubuserlol
|
||||
- bypass UAC via token manipulation @agithubuserlol
|
||||
- check for sandbox and av modules @re-fox
|
||||
- check for sandbox username @re-fox
|
||||
- check if process is running under wine @re-fox
|
||||
- validate credit card number using luhn algorithm @re-fox
|
||||
- validate credit card number using luhn algorithm with no lookup table @re-fox
|
||||
- hash data using FNV @edeca @mr-tz
|
||||
- link many functions at runtime @mr-tz
|
||||
- reference public RSA key @mr-tz
|
||||
- packed with ASPack @williballenthin
|
||||
- delete internet cache @mike-hunhoff
|
||||
- enumerate internet cache @mike-hunhoff
|
||||
- send ICMP echo request @mike-hunhoff
|
||||
- check for debugger via API @mike-hunhoff
|
||||
- check for hardware breakpoints @mike-hunhoff
|
||||
- check for kernel debugger via shared user data structure @mike-hunhoff
|
||||
- check for protected handle exception @mike-hunhoff
|
||||
- check for software breakpoints @mike-hunhoff
|
||||
- check for trap flag exception @mike-hunhoff
|
||||
- check for unexpected memory writes @mike-hunhoff
|
||||
- check process job object @mike-hunhoff
|
||||
- reference anti-VM strings targeting Parallels @mike-hunhoff
|
||||
- reference anti-VM strings targeting Qemu @mike-hunhoff
|
||||
- reference anti-VM strings targeting VirtualBox @mike-hunhoff
|
||||
- reference anti-VM strings targeting VirtualPC @mike-hunhoff
|
||||
- reference anti-VM strings targeting VMWare @mike-hunhoff
|
||||
- reference anti-VM strings targeting Xen @mike-hunhoff
|
||||
- reference analysis tools strings @mike-hunhoff
|
||||
- reference WMI statements @mike-hunhoff
|
||||
- get number of processor cores @mike-hunhoff
|
||||
- get number of processors @mike-hunhoff
|
||||
- enumerate disk properties @mike-hunhoff
|
||||
- get disk size @mike-hunhoff
|
||||
- get process heap flags @mike-hunhoff
|
||||
- get process heap force flags @mike-hunhoff
|
||||
- get Explorer PID @mike-hunhoff
|
||||
- delay execution @mike-hunhoff
|
||||
- check for process debug object @mike-hunhoff
|
||||
- check license value @mike-hunhoff
|
||||
- check ProcessDebugFlags @mike-hunhoff
|
||||
- check ProcessDebugPort @mike-hunhoff
|
||||
- check SystemKernelDebuggerInformation @mike-hunhoff
|
||||
- check thread yield allowed @mike-hunhoff
|
||||
- enumerate system firmware tables @mike-hunhoff
|
||||
- get system firmware table @mike-hunhoff
|
||||
- hide thread from debugger @mike-hunhoff
|
||||
|
||||
### Bug fixes
|
||||
|
||||
- ida backend: extract unmapped immediate number features @mike-hunhoff
|
||||
- ida backend: fix stack cookie check #257 @mike-hunhoff
|
||||
- viv backend: better extract gs segment access @williballenthin
|
||||
- core: enable counting of string features #241 @D4nch3n @williballenthin
|
||||
- core: enable descriptions on feature with arch flavors @mike-hunhoff
|
||||
- core: update git links for non-SSH access #259 @recvfrom
|
||||
|
||||
### Changes
|
||||
|
||||
- ida plugin: better default display showing first level nesting @winniepe
|
||||
- remove unused `characteristic(switch)` feature @ana06
|
||||
- prepare testing infrastructure for multiple backends/py3 @williballenthin
|
||||
- ci: zip build artifacts @ana06
|
||||
- ci: build all supported python versions @ana06
|
||||
- code style and formatting @mr-tz
|
||||
|
||||
### Raw diffs
|
||||
|
||||
- [capa v1.1.0...v1.2.0](https://github.com/fireeye/capa/compare/v1.1.0...v1.2.0)
|
||||
- [capa-rules v1.1.0...v1.2.0](https://github.com/fireeye/capa-rules/compare/v1.1.0...v1.2.0)
|
||||
|
||||
## v1.1.0 (2020-08-05)
|
||||
|
||||
This release brings new rule format updates, such as adding `offset/x32` and negative offsets,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||

|
||||
|
||||
[](https://github.com/fireeye/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
|
||||
[](https://github.com/fireeye/capa-rules)
|
||||
[](https://github.com/fireeye/capa-rules)
|
||||
[](LICENSE.txt)
|
||||
|
||||
capa detects capabilities in executable files.
|
||||
|
||||
@@ -75,7 +75,7 @@ class IdaFeatureExtractor(FeatureExtractor):
|
||||
yield feature, ea
|
||||
|
||||
def get_basic_blocks(self, f):
|
||||
for bb in idaapi.FlowChart(f, flags=idaapi.FC_PREDS):
|
||||
for bb in capa.features.extractors.ida.helpers.get_function_blocks(f):
|
||||
yield add_ea_int_cast(bb)
|
||||
|
||||
def extract_basic_block_features(self, f, bb):
|
||||
|
||||
@@ -20,10 +20,10 @@ from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
|
||||
|
||||
|
||||
def get_printable_len(op):
|
||||
""" Return string length if all operand bytes are ascii or utf16-le printable
|
||||
"""Return string length if all operand bytes are ascii or utf16-le printable
|
||||
|
||||
args:
|
||||
op (IDA op_t)
|
||||
args:
|
||||
op (IDA op_t)
|
||||
"""
|
||||
op_val = capa.features.extractors.ida.helpers.mask_op_val(op)
|
||||
|
||||
@@ -62,10 +62,10 @@ def get_printable_len(op):
|
||||
|
||||
|
||||
def is_mov_imm_to_stack(insn):
|
||||
""" verify instruction moves immediate onto stack
|
||||
"""verify instruction moves immediate onto stack
|
||||
|
||||
args:
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
insn (IDA insn_t)
|
||||
"""
|
||||
if insn.Op2.type != idaapi.o_imm:
|
||||
return False
|
||||
@@ -80,13 +80,13 @@ def is_mov_imm_to_stack(insn):
|
||||
|
||||
|
||||
def bb_contains_stackstring(f, bb):
|
||||
""" check basic block for stackstring indicators
|
||||
"""check basic block for stackstring indicators
|
||||
|
||||
true if basic block contains enough moves of constant bytes to the stack
|
||||
true if basic block contains enough moves of constant bytes to the stack
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
"""
|
||||
count = 0
|
||||
for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||
@@ -98,33 +98,33 @@ def bb_contains_stackstring(f, bb):
|
||||
|
||||
|
||||
def extract_bb_stackstring(f, bb):
|
||||
""" extract stackstring indicators from basic block
|
||||
"""extract stackstring indicators from basic block
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
"""
|
||||
if bb_contains_stackstring(f, bb):
|
||||
yield Characteristic("stack string"), bb.start_ea
|
||||
|
||||
|
||||
def extract_bb_tight_loop(f, bb):
|
||||
""" extract tight loop indicators from a basic block
|
||||
"""extract tight loop indicators from a basic block
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
"""
|
||||
if capa.features.extractors.ida.helpers.is_basic_block_tight_loop(bb):
|
||||
yield Characteristic("tight loop"), bb.start_ea
|
||||
|
||||
|
||||
def extract_features(f, bb):
|
||||
""" extract basic block features
|
||||
"""extract basic block features
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
"""
|
||||
for bb_handler in BASIC_BLOCK_HANDLERS:
|
||||
for (feature, ea) in bb_handler(f, bb):
|
||||
|
||||
@@ -20,13 +20,13 @@ from capa.features.file import Export, Import, Section
|
||||
|
||||
|
||||
def check_segment_for_pe(seg):
|
||||
""" check segment for embedded PE
|
||||
"""check segment for embedded PE
|
||||
|
||||
adapted for IDA from:
|
||||
https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
|
||||
adapted for IDA from:
|
||||
https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
|
||||
|
||||
args:
|
||||
seg (IDA segment_t)
|
||||
args:
|
||||
seg (IDA segment_t)
|
||||
"""
|
||||
seg_max = seg.end_ea
|
||||
mz_xor = [
|
||||
@@ -67,11 +67,11 @@ def check_segment_for_pe(seg):
|
||||
|
||||
|
||||
def extract_file_embedded_pe():
|
||||
""" extract embedded PE features
|
||||
"""extract embedded PE features
|
||||
|
||||
IDA must load resource sections for this to be complete
|
||||
- '-R' from console
|
||||
- Check 'Load resource sections' when opening binary in IDA manually
|
||||
IDA must load resource sections for this to be complete
|
||||
- '-R' from console
|
||||
- Check 'Load resource sections' when opening binary in IDA manually
|
||||
"""
|
||||
for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
|
||||
for (ea, _) in check_segment_for_pe(seg):
|
||||
@@ -85,15 +85,15 @@ def extract_file_export_names():
|
||||
|
||||
|
||||
def extract_file_import_names():
|
||||
""" extract function imports
|
||||
"""extract function imports
|
||||
|
||||
1. imports by ordinal:
|
||||
- modulename.#ordinal
|
||||
1. imports by ordinal:
|
||||
- modulename.#ordinal
|
||||
|
||||
2. imports by name, results in two features to support importname-only
|
||||
matching:
|
||||
- modulename.importname
|
||||
- importname
|
||||
2. imports by name, results in two features to support importname-only
|
||||
matching:
|
||||
- modulename.importname
|
||||
- importname
|
||||
"""
|
||||
for (ea, info) in capa.features.extractors.ida.helpers.get_file_imports().items():
|
||||
if info[1]:
|
||||
@@ -104,22 +104,22 @@ def extract_file_import_names():
|
||||
|
||||
|
||||
def extract_file_section_names():
|
||||
""" extract section names
|
||||
"""extract section names
|
||||
|
||||
IDA must load resource sections for this to be complete
|
||||
- '-R' from console
|
||||
- Check 'Load resource sections' when opening binary in IDA manually
|
||||
IDA must load resource sections for this to be complete
|
||||
- '-R' from console
|
||||
- Check 'Load resource sections' when opening binary in IDA manually
|
||||
"""
|
||||
for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
|
||||
yield Section(idaapi.get_segm_name(seg)), seg.start_ea
|
||||
|
||||
|
||||
def extract_file_strings():
|
||||
""" extract ASCII and UTF-16 LE strings
|
||||
"""extract ASCII and UTF-16 LE strings
|
||||
|
||||
IDA must load resource sections for this to be complete
|
||||
- '-R' from console
|
||||
- Check 'Load resource sections' when opening binary in IDA manually
|
||||
IDA must load resource sections for this to be complete
|
||||
- '-R' from console
|
||||
- Check 'Load resource sections' when opening binary in IDA manually
|
||||
"""
|
||||
for seg in capa.features.extractors.ida.helpers.get_segments():
|
||||
seg_buff = capa.features.extractors.ida.helpers.get_segment_buffer(seg)
|
||||
|
||||
@@ -15,20 +15,20 @@ from capa.features.extractors import loops
|
||||
|
||||
|
||||
def extract_function_calls_to(f):
|
||||
""" extract callers to a function
|
||||
"""extract callers to a function
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
"""
|
||||
for ea in idautils.CodeRefsTo(f.start_ea, True):
|
||||
yield Characteristic("calls to"), ea
|
||||
|
||||
|
||||
def extract_function_loop(f):
|
||||
""" extract loop indicators from a function
|
||||
"""extract loop indicators from a function
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
"""
|
||||
edges = []
|
||||
|
||||
@@ -42,20 +42,20 @@ def extract_function_loop(f):
|
||||
|
||||
|
||||
def extract_recursive_call(f):
|
||||
""" extract recursive function call
|
||||
"""extract recursive function call
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
"""
|
||||
if capa.features.extractors.ida.helpers.is_function_recursive(f):
|
||||
yield Characteristic("recursive call"), f.start_ea
|
||||
|
||||
|
||||
def extract_features(f):
|
||||
""" extract function features
|
||||
"""extract function features
|
||||
|
||||
arg:
|
||||
f (IDA func_t)
|
||||
arg:
|
||||
f (IDA func_t)
|
||||
"""
|
||||
for func_handler in FUNCTION_HANDLERS:
|
||||
for (feature, ea) in func_handler(f):
|
||||
|
||||
@@ -15,12 +15,12 @@ import idautils
|
||||
|
||||
|
||||
def find_byte_sequence(start, end, seq):
|
||||
""" find byte sequence
|
||||
"""find byte sequence
|
||||
|
||||
args:
|
||||
start: min virtual address
|
||||
end: max virtual address
|
||||
seq: bytes to search e.g. b'\x01\x03'
|
||||
args:
|
||||
start: min virtual address
|
||||
end: max virtual address
|
||||
seq: bytes to search e.g. b'\x01\x03'
|
||||
"""
|
||||
if sys.version_info[0] >= 3:
|
||||
return idaapi.find_binary(start, end, " ".join(["%02x" % b for b in seq]), 0, idaapi.SEARCH_DOWN)
|
||||
@@ -29,14 +29,14 @@ def find_byte_sequence(start, end, seq):
|
||||
|
||||
|
||||
def get_functions(start=None, end=None, skip_thunks=False, skip_libs=False):
|
||||
""" get functions, range optional
|
||||
"""get functions, range optional
|
||||
|
||||
args:
|
||||
start: min virtual address
|
||||
end: max virtual address
|
||||
args:
|
||||
start: min virtual address
|
||||
end: max virtual address
|
||||
|
||||
ret:
|
||||
yield func_t*
|
||||
ret:
|
||||
yield func_t*
|
||||
"""
|
||||
for ea in idautils.Functions(start=start, end=end):
|
||||
f = idaapi.get_func(ea)
|
||||
@@ -45,10 +45,10 @@ def get_functions(start=None, end=None, skip_thunks=False, skip_libs=False):
|
||||
|
||||
|
||||
def get_segments(skip_header_segments=False):
|
||||
""" get list of segments (sections) in the binary image
|
||||
"""get list of segments (sections) in the binary image
|
||||
|
||||
args:
|
||||
skip_header_segments: IDA may load header segments - skip if set
|
||||
args:
|
||||
skip_header_segments: IDA may load header segments - skip if set
|
||||
"""
|
||||
for n in range(idaapi.get_segm_qty()):
|
||||
seg = idaapi.getnseg(n)
|
||||
@@ -57,9 +57,9 @@ def get_segments(skip_header_segments=False):
|
||||
|
||||
|
||||
def get_segment_buffer(seg):
|
||||
""" return bytes stored in a given segment
|
||||
"""return bytes stored in a given segment
|
||||
|
||||
decrease buffer size until IDA is able to read bytes from the segment
|
||||
decrease buffer size until IDA is able to read bytes from the segment
|
||||
"""
|
||||
buff = b""
|
||||
sz = seg.end_ea - seg.start_ea
|
||||
@@ -97,13 +97,13 @@ def get_file_imports():
|
||||
|
||||
|
||||
def get_instructions_in_range(start, end):
|
||||
""" yield instructions in range
|
||||
"""yield instructions in range
|
||||
|
||||
args:
|
||||
start: virtual address (inclusive)
|
||||
end: virtual address (exclusive)
|
||||
yield:
|
||||
(insn_t*)
|
||||
args:
|
||||
start: virtual address (inclusive)
|
||||
end: virtual address (exclusive)
|
||||
yield:
|
||||
(insn_t*)
|
||||
"""
|
||||
for head in idautils.Heads(start, end):
|
||||
insn = idautils.DecodeInstruction(head)
|
||||
@@ -183,10 +183,10 @@ def find_string_at(ea, min=4):
|
||||
|
||||
|
||||
def get_op_phrase_info(op):
|
||||
""" parse phrase features from operand
|
||||
"""parse phrase features from operand
|
||||
|
||||
Pretty much dup of sark's implementation:
|
||||
https://github.com/tmr232/Sark/blob/master/sark/code/instruction.py#L28-L73
|
||||
Pretty much dup of sark's implementation:
|
||||
https://github.com/tmr232/Sark/blob/master/sark/code/instruction.py#L28-L73
|
||||
"""
|
||||
if op.type not in (idaapi.o_phrase, idaapi.o_displ):
|
||||
return {}
|
||||
@@ -269,15 +269,15 @@ def is_op_stack_var(ea, index):
|
||||
|
||||
|
||||
def mask_op_val(op):
|
||||
""" mask value by data type
|
||||
"""mask value by data type
|
||||
|
||||
necessary due to a bug in AMD64
|
||||
necessary due to a bug in AMD64
|
||||
|
||||
Example:
|
||||
.rsrc:0054C12C mov [ebp+var_4], 0FFFFFFFFh
|
||||
Example:
|
||||
.rsrc:0054C12C mov [ebp+var_4], 0FFFFFFFFh
|
||||
|
||||
insn.Op2.dtype == idaapi.dt_dword
|
||||
insn.Op2.value == 0xffffffffffffffff
|
||||
insn.Op2.dtype == idaapi.dt_dword
|
||||
insn.Op2.value == 0xffffffffffffffff
|
||||
"""
|
||||
masks = {
|
||||
idaapi.dt_byte: 0xFF,
|
||||
@@ -289,10 +289,10 @@ def mask_op_val(op):
|
||||
|
||||
|
||||
def is_function_recursive(f):
|
||||
""" check if function is recursive
|
||||
"""check if function is recursive
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
"""
|
||||
for ref in idautils.CodeRefsTo(f.start_ea, True):
|
||||
if f.contains(ref):
|
||||
@@ -301,13 +301,13 @@ def is_function_recursive(f):
|
||||
|
||||
|
||||
def is_basic_block_tight_loop(bb):
|
||||
""" check basic block loops to self
|
||||
"""check basic block loops to self
|
||||
|
||||
true if last instruction in basic block branches to basic block start
|
||||
true if last instruction in basic block branches to basic block start
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
"""
|
||||
bb_end = idc.prev_head(bb.end_ea)
|
||||
if bb.start_ea < bb_end:
|
||||
@@ -341,3 +341,21 @@ def find_data_reference_from_insn(insn, max_depth=10):
|
||||
ea = data_refs[0]
|
||||
|
||||
return ea
|
||||
|
||||
|
||||
def get_function_blocks(f):
|
||||
"""yield basic blocks contained in specified function
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
yield:
|
||||
block (IDA BasicBlock)
|
||||
"""
|
||||
# leverage idaapi.FC_NOEXT flag to ignore useless external blocks referenced by the function
|
||||
for block in idaapi.FlowChart(f, flags=(idaapi.FC_PREDS | idaapi.FC_NOEXT)):
|
||||
yield block
|
||||
|
||||
|
||||
def is_basic_block_return(bb):
|
||||
""" check if basic block is return block """
|
||||
return bb.type == idaapi.fcb_ret
|
||||
|
||||
@@ -15,6 +15,10 @@ import capa.features.extractors.ida.helpers
|
||||
from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic
|
||||
from capa.features.insn import Number, Offset, Mnemonic
|
||||
|
||||
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
|
||||
# byte range within the first and returning basic blocks, this helps to reduce FP features
|
||||
SECURITY_COOKIE_BYTES_DELTA = 0x40
|
||||
|
||||
|
||||
def get_arch(ctx):
|
||||
"""
|
||||
@@ -62,15 +66,15 @@ def check_for_api_call(ctx, insn):
|
||||
|
||||
|
||||
def extract_insn_api_features(f, bb, insn):
|
||||
""" parse instruction API features
|
||||
"""parse instruction API features
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
|
||||
example:
|
||||
call dword [0x00473038]
|
||||
example:
|
||||
call dword [0x00473038]
|
||||
"""
|
||||
for api in check_for_api_call(f.ctx, insn):
|
||||
for (feature, ea) in capa.features.extractors.helpers.generate_api_features(api, insn.ea):
|
||||
@@ -78,15 +82,15 @@ def extract_insn_api_features(f, bb, insn):
|
||||
|
||||
|
||||
def extract_insn_number_features(f, bb, insn):
|
||||
""" parse instruction number features
|
||||
"""parse instruction number features
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
|
||||
example:
|
||||
push 3136B0h ; dwControlCode
|
||||
example:
|
||||
push 3136B0h ; dwControlCode
|
||||
"""
|
||||
if idaapi.is_ret_insn(insn):
|
||||
# skip things like:
|
||||
@@ -109,15 +113,15 @@ def extract_insn_number_features(f, bb, insn):
|
||||
|
||||
|
||||
def extract_insn_bytes_features(f, bb, insn):
|
||||
""" parse referenced byte sequences
|
||||
"""parse referenced byte sequences
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
|
||||
example:
|
||||
push offset iid_004118d4_IShellLinkA ; riid
|
||||
example:
|
||||
push offset iid_004118d4_IShellLinkA ; riid
|
||||
"""
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref != insn.ea:
|
||||
@@ -127,15 +131,15 @@ def extract_insn_bytes_features(f, bb, insn):
|
||||
|
||||
|
||||
def extract_insn_string_features(f, bb, insn):
|
||||
""" parse instruction string features
|
||||
"""parse instruction string features
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
|
||||
example:
|
||||
push offset aAcr ; "ACR > "
|
||||
example:
|
||||
push offset aAcr ; "ACR > "
|
||||
"""
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref != insn.ea:
|
||||
@@ -145,15 +149,15 @@ def extract_insn_string_features(f, bb, insn):
|
||||
|
||||
|
||||
def extract_insn_offset_features(f, bb, insn):
|
||||
""" parse instruction structure offset features
|
||||
"""parse instruction structure offset features
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
|
||||
example:
|
||||
.text:0040112F cmp [esi+4], ebx
|
||||
example:
|
||||
.text:0040112F cmp [esi+4], ebx
|
||||
"""
|
||||
for op in capa.features.extractors.ida.helpers.get_insn_ops(insn, target_ops=(idaapi.o_phrase, idaapi.o_displ)):
|
||||
if capa.features.extractors.ida.helpers.is_op_stack_var(insn.ea, op.n):
|
||||
@@ -175,11 +179,11 @@ def extract_insn_offset_features(f, bb, insn):
|
||||
|
||||
|
||||
def contains_stack_cookie_keywords(s):
|
||||
""" check if string contains stack cookie keywords
|
||||
"""check if string contains stack cookie keywords
|
||||
|
||||
Examples:
|
||||
xor ecx, ebp ; StackCookie
|
||||
mov eax, ___security_cookie
|
||||
Examples:
|
||||
xor ecx, ebp ; StackCookie
|
||||
mov eax, ___security_cookie
|
||||
"""
|
||||
if not s:
|
||||
return False
|
||||
@@ -190,30 +194,30 @@ def contains_stack_cookie_keywords(s):
|
||||
|
||||
|
||||
def bb_stack_cookie_registers(bb):
|
||||
""" scan basic block for stack cookie operations
|
||||
"""scan basic block for stack cookie operations
|
||||
|
||||
yield registers ids that may have been used for stack cookie operations
|
||||
yield registers ids that may have been used for stack cookie operations
|
||||
|
||||
assume instruction that sets stack cookie and nzxor exist in same block
|
||||
and stack cookie register is not modified prior to nzxor
|
||||
assume instruction that sets stack cookie and nzxor exist in same block
|
||||
and stack cookie register is not modified prior to nzxor
|
||||
|
||||
Example:
|
||||
.text:004062DA mov eax, ___security_cookie <-- stack cookie
|
||||
.text:004062DF mov ecx, eax
|
||||
.text:004062E1 mov ebx, [esi]
|
||||
.text:004062E3 and ecx, 1Fh
|
||||
.text:004062E6 mov edi, [esi+4]
|
||||
.text:004062E9 xor ebx, eax
|
||||
.text:004062EB mov esi, [esi+8]
|
||||
.text:004062EE xor edi, eax <-- ignore
|
||||
.text:004062F0 xor esi, eax <-- ignore
|
||||
.text:004062F2 ror edi, cl
|
||||
.text:004062F4 ror esi, cl
|
||||
.text:004062F6 ror ebx, cl
|
||||
.text:004062F8 cmp edi, esi
|
||||
.text:004062FA jnz loc_40639D
|
||||
Example:
|
||||
.text:004062DA mov eax, ___security_cookie <-- stack cookie
|
||||
.text:004062DF mov ecx, eax
|
||||
.text:004062E1 mov ebx, [esi]
|
||||
.text:004062E3 and ecx, 1Fh
|
||||
.text:004062E6 mov edi, [esi+4]
|
||||
.text:004062E9 xor ebx, eax
|
||||
.text:004062EB mov esi, [esi+8]
|
||||
.text:004062EE xor edi, eax <-- ignore
|
||||
.text:004062F0 xor esi, eax <-- ignore
|
||||
.text:004062F2 ror edi, cl
|
||||
.text:004062F4 ror esi, cl
|
||||
.text:004062F6 ror ebx, cl
|
||||
.text:004062F8 cmp edi, esi
|
||||
.text:004062FA jnz loc_40639D
|
||||
|
||||
TODO: this is expensive, but necessary?...
|
||||
TODO: this is expensive, but necessary?...
|
||||
"""
|
||||
for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||
if contains_stack_cookie_keywords(idc.GetDisasm(insn.ea)):
|
||||
@@ -223,12 +227,37 @@ def bb_stack_cookie_registers(bb):
|
||||
yield op.reg
|
||||
|
||||
|
||||
def is_nzxor_stack_cookie_delta(f, bb, insn):
|
||||
""" check if nzxor exists within stack cookie delta """
|
||||
# security cookie check should use SP or BP
|
||||
if not capa.features.extractors.ida.helpers.is_frame_register(insn.Op2.reg):
|
||||
return False
|
||||
|
||||
f_bbs = tuple(capa.features.extractors.ida.helpers.get_function_blocks(f))
|
||||
|
||||
# expect security cookie init in first basic block within first bytes (instructions)
|
||||
if capa.features.extractors.ida.helpers.is_basic_block_equal(bb, f_bbs[0]) and insn.ea < (
|
||||
bb.start_ea + SECURITY_COOKIE_BYTES_DELTA
|
||||
):
|
||||
return True
|
||||
|
||||
# ... or within last bytes (instructions) before a return
|
||||
if capa.features.extractors.ida.helpers.is_basic_block_return(bb) and insn.ea > (
|
||||
bb.start_ea + capa.features.extractors.ida.helpers.basic_block_size(bb) - SECURITY_COOKIE_BYTES_DELTA
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def is_nzxor_stack_cookie(f, bb, insn):
|
||||
""" check if nzxor is related to stack cookie """
|
||||
if contains_stack_cookie_keywords(idaapi.get_cmt(insn.ea, False)):
|
||||
# Example:
|
||||
# xor ecx, ebp ; StackCookie
|
||||
return True
|
||||
if is_nzxor_stack_cookie_delta(f, bb, insn):
|
||||
return True
|
||||
stack_cookie_regs = tuple(bb_stack_cookie_registers(bb))
|
||||
if any(op_reg in stack_cookie_regs for op_reg in (insn.Op1.reg, insn.Op2.reg)):
|
||||
# Example:
|
||||
@@ -239,14 +268,14 @@ def is_nzxor_stack_cookie(f, bb, insn):
|
||||
|
||||
|
||||
def extract_insn_nzxor_characteristic_features(f, bb, insn):
|
||||
""" parse instruction non-zeroing XOR instruction
|
||||
"""parse instruction non-zeroing XOR instruction
|
||||
|
||||
ignore expected non-zeroing XORs, e.g. security cookies
|
||||
ignore expected non-zeroing XORs, e.g. security cookies
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
"""
|
||||
if insn.itype != idaapi.NN_xor:
|
||||
return
|
||||
@@ -258,23 +287,23 @@ def extract_insn_nzxor_characteristic_features(f, bb, insn):
|
||||
|
||||
|
||||
def extract_insn_mnemonic_features(f, bb, insn):
|
||||
""" parse instruction mnemonic features
|
||||
"""parse instruction mnemonic features
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
"""
|
||||
yield Mnemonic(insn.get_canon_mnem()), insn.ea
|
||||
|
||||
|
||||
def extract_insn_peb_access_characteristic_features(f, bb, insn):
|
||||
""" parse instruction peb access
|
||||
"""parse instruction peb access
|
||||
|
||||
fs:[0x30] on x86, gs:[0x60] on x64
|
||||
fs:[0x30] on x86, gs:[0x60] on x64
|
||||
|
||||
TODO:
|
||||
IDA should be able to do this..
|
||||
TODO:
|
||||
IDA should be able to do this..
|
||||
"""
|
||||
if insn.itype not in (idaapi.NN_push, idaapi.NN_mov):
|
||||
return
|
||||
@@ -291,10 +320,10 @@ def extract_insn_peb_access_characteristic_features(f, bb, insn):
|
||||
|
||||
|
||||
def extract_insn_segment_access_features(f, bb, insn):
|
||||
""" parse instruction fs or gs access
|
||||
"""parse instruction fs or gs access
|
||||
|
||||
TODO:
|
||||
IDA should be able to do this...
|
||||
TODO:
|
||||
IDA should be able to do this...
|
||||
"""
|
||||
if all(map(lambda op: op.type != idaapi.o_mem, insn.ops)):
|
||||
# try to optimize for only memory references
|
||||
@@ -312,12 +341,12 @@ def extract_insn_segment_access_features(f, bb, insn):
|
||||
|
||||
|
||||
def extract_insn_cross_section_cflow(f, bb, insn):
|
||||
""" inspect the instruction for a CALL or JMP that crosses section boundaries
|
||||
"""inspect the instruction for a CALL or JMP that crosses section boundaries
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
"""
|
||||
for ref in idautils.CodeRefsFrom(insn.ea, False):
|
||||
if ref in get_imports(f.ctx).keys():
|
||||
@@ -332,14 +361,14 @@ def extract_insn_cross_section_cflow(f, bb, insn):
|
||||
|
||||
|
||||
def extract_function_calls_from(f, bb, insn):
|
||||
""" extract functions calls from features
|
||||
"""extract functions calls from features
|
||||
|
||||
most relevant at the function scope, however, its most efficient to extract at the instruction scope
|
||||
most relevant at the function scope, however, its most efficient to extract at the instruction scope
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
"""
|
||||
if idaapi.is_call_insn(insn):
|
||||
for ref in idautils.CodeRefsFrom(insn.ea, False):
|
||||
@@ -347,28 +376,28 @@ def extract_function_calls_from(f, bb, insn):
|
||||
|
||||
|
||||
def extract_function_indirect_call_characteristic_features(f, bb, insn):
|
||||
""" extract indirect function calls (e.g., call eax or call dword ptr [edx+4])
|
||||
does not include calls like => call ds:dword_ABD4974
|
||||
"""extract indirect function calls (e.g., call eax or call dword ptr [edx+4])
|
||||
does not include calls like => call ds:dword_ABD4974
|
||||
|
||||
most relevant at the function or basic block scope;
|
||||
however, its most efficient to extract at the instruction scope
|
||||
most relevant at the function or basic block scope;
|
||||
however, its most efficient to extract at the instruction scope
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
"""
|
||||
if idaapi.is_call_insn(insn) and idc.get_operand_type(insn.ea, 0) in (idc.o_reg, idc.o_phrase, idc.o_displ):
|
||||
yield Characteristic("indirect call"), insn.ea
|
||||
|
||||
|
||||
def extract_features(f, bb, insn):
|
||||
""" extract instruction features
|
||||
"""extract instruction features
|
||||
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
args:
|
||||
f (IDA func_t)
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
"""
|
||||
for inst_handler in INSTRUCTION_HANDLERS:
|
||||
for (feature, ea) in inst_handler(f, bb, insn):
|
||||
|
||||
@@ -1,92 +0,0 @@
|
||||
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import logging
|
||||
|
||||
import lancelot
|
||||
|
||||
import capa.features.extractors
|
||||
import capa.features.extractors.lancelot.file
|
||||
import capa.features.extractors.lancelot.insn
|
||||
import capa.features.extractors.lancelot.function
|
||||
import capa.features.extractors.lancelot.basicblock
|
||||
|
||||
__all__ = ["file", "function", "basicblock", "insn"]
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BB(object):
|
||||
"""extend the lancelot.BasicBlock with an __int__ method to access the address"""
|
||||
|
||||
def __init__(self, ws, bb):
|
||||
super(BB, self).__init__()
|
||||
self.ws = ws
|
||||
self.address = bb.address
|
||||
self.length = bb.length
|
||||
self.predecessors = bb.predecessors
|
||||
self.successors = bb.successors
|
||||
|
||||
def __int__(self):
|
||||
return self.address
|
||||
|
||||
@property
|
||||
def instructions(self):
|
||||
va = self.address
|
||||
while va < self.address + self.length:
|
||||
try:
|
||||
insn = self.ws.read_insn(va)
|
||||
except ValueError:
|
||||
logger.warning("failed to read instruction at 0x%x", va)
|
||||
return
|
||||
|
||||
yield insn
|
||||
va += insn.length
|
||||
|
||||
|
||||
class LancelotFeatureExtractor(capa.features.extractors.FeatureExtractor):
|
||||
def __init__(self, buf):
|
||||
super(LancelotFeatureExtractor, self).__init__()
|
||||
self.buf = buf
|
||||
self.ws = lancelot.from_bytes(buf)
|
||||
self.ctx = {}
|
||||
|
||||
def get_base_address(self):
|
||||
return self.ws.base_address
|
||||
|
||||
def extract_file_features(self):
|
||||
for feature, va in capa.features.extractors.lancelot.file.extract_file_features(self.buf):
|
||||
yield feature, va
|
||||
|
||||
def get_functions(self):
|
||||
for va in self.ws.get_functions():
|
||||
# this is just the address of the function
|
||||
yield va
|
||||
|
||||
def extract_function_features(self, f):
|
||||
for feature, va in capa.features.extractors.lancelot.function.extract_function_features(self.ws, f):
|
||||
yield feature, va
|
||||
|
||||
def get_basic_blocks(self, f):
|
||||
try:
|
||||
cfg = self.ws.build_cfg(f)
|
||||
except:
|
||||
logger.warning("failed to build CFG for 0x%x", f)
|
||||
return
|
||||
else:
|
||||
for bb in cfg.basic_blocks.values():
|
||||
yield BB(self.ws, bb)
|
||||
|
||||
def extract_basic_block_features(self, f, bb):
|
||||
for feature, va in capa.features.extractors.lancelot.basicblock.extract_basic_block_features(self.ws, bb):
|
||||
yield feature, va
|
||||
|
||||
def get_instructions(self, f, bb):
|
||||
return bb.instructions
|
||||
|
||||
def extract_insn_features(self, f, bb, insn):
|
||||
for feature, va in capa.features.extractors.lancelot.insn.extract_insn_features(self, f, bb, insn):
|
||||
yield feature, va
|
||||
@@ -1,120 +0,0 @@
|
||||
import string
|
||||
import struct
|
||||
import logging
|
||||
|
||||
from lancelot import (
|
||||
FLOW_VA,
|
||||
OPERAND_SIZE,
|
||||
OPERAND_TYPE,
|
||||
MEMORY_OPERAND_BASE,
|
||||
OPERAND_TYPE_MEMORY,
|
||||
OPERAND_TYPE_IMMEDIATE,
|
||||
IMMEDIATE_OPERAND_VALUE,
|
||||
)
|
||||
|
||||
from capa.features import Characteristic
|
||||
from capa.features.basicblock import BasicBlock
|
||||
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def extract_bb_tight_loop(ws, bb):
|
||||
""" check basic block for tight loop indicators """
|
||||
if bb.address in map(lambda flow: flow[FLOW_VA], bb.successors):
|
||||
yield Characteristic("tight loop"), bb.address
|
||||
|
||||
|
||||
def is_mov_imm_to_stack(insn):
|
||||
if not insn.mnemonic.startswith("mov"):
|
||||
return False
|
||||
|
||||
try:
|
||||
dst, src = insn.operands
|
||||
except ValueError:
|
||||
# not two operands
|
||||
return False
|
||||
|
||||
if src[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
|
||||
return False
|
||||
|
||||
if src[IMMEDIATE_OPERAND_VALUE] < 0:
|
||||
return False
|
||||
|
||||
if dst[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
|
||||
return False
|
||||
|
||||
if dst[MEMORY_OPERAND_BASE] not in ("ebp", "rbp", "esp", "rsp"):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def is_printable_ascii(chars):
|
||||
return all(c < 127 and chr(c) in string.printable for c in chars)
|
||||
|
||||
|
||||
def is_printable_utf16le(chars):
|
||||
if all(c == b"\x00" for c in chars[1::2]):
|
||||
return is_printable_ascii(chars[::2])
|
||||
|
||||
|
||||
def get_printable_len(operand):
|
||||
"""
|
||||
Return string length if all operand bytes are ascii or utf16-le printable
|
||||
"""
|
||||
operand_size = operand[OPERAND_SIZE]
|
||||
if operand_size == 8:
|
||||
chars = struct.pack("<B", operand[IMMEDIATE_OPERAND_VALUE])
|
||||
elif operand_size == 16:
|
||||
chars = struct.pack("<H", operand[IMMEDIATE_OPERAND_VALUE])
|
||||
elif operand_size == 32:
|
||||
chars = struct.pack("<I", operand[IMMEDIATE_OPERAND_VALUE])
|
||||
elif operand_size == 64:
|
||||
chars = struct.pack("<Q", operand[IMMEDIATE_OPERAND_VALUE])
|
||||
else:
|
||||
raise ValueError("unexpected operand size: " + str(operand_size))
|
||||
|
||||
if is_printable_ascii(chars):
|
||||
return operand_size / 8
|
||||
if is_printable_utf16le(chars):
|
||||
return operand_size / 16
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def _bb_has_stackstring(ws, bb):
|
||||
"""
|
||||
extract potential stackstring creation, using the following heuristics:
|
||||
- basic block contains enough moves of constant bytes to the stack
|
||||
"""
|
||||
count = 0
|
||||
for insn in bb.instructions:
|
||||
if is_mov_imm_to_stack(insn):
|
||||
# add number of operand bytes
|
||||
src = insn.operands[1]
|
||||
count += get_printable_len(src)
|
||||
|
||||
if count > MIN_STACKSTRING_LEN:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def extract_stackstring(ws, bb):
|
||||
""" check basic block for stackstring indicators """
|
||||
if _bb_has_stackstring(ws, bb):
|
||||
yield Characteristic("stack string"), bb.address
|
||||
|
||||
|
||||
def extract_basic_block_features(ws, bb):
|
||||
yield BasicBlock(), bb.address
|
||||
for bb_handler in BASIC_BLOCK_HANDLERS:
|
||||
for feature, va in bb_handler(ws, bb):
|
||||
yield feature, va
|
||||
|
||||
|
||||
BASIC_BLOCK_HANDLERS = (
|
||||
extract_bb_tight_loop,
|
||||
extract_stackstring,
|
||||
)
|
||||
@@ -1,81 +0,0 @@
|
||||
import pefile
|
||||
|
||||
import capa.features.extractors.strings
|
||||
from capa.features import String, Characteristic
|
||||
from capa.features.file import Export, Import, Section
|
||||
|
||||
|
||||
def extract_file_embedded_pe(buf, pe):
|
||||
buf = buf[2:]
|
||||
|
||||
total_offset = 2
|
||||
while True:
|
||||
try:
|
||||
offset = buf.index(b"MZ")
|
||||
except ValueError:
|
||||
return
|
||||
else:
|
||||
rest = buf[offset:]
|
||||
total_offset += offset
|
||||
|
||||
try:
|
||||
_ = pefile.PE(data=rest)
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
yield Characteristic("embedded pe"), total_offset
|
||||
|
||||
buf = rest[2:]
|
||||
total_offset += 2
|
||||
|
||||
|
||||
def extract_file_export_names(buf, pe):
|
||||
if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
|
||||
return
|
||||
|
||||
base_address = pe.OPTIONAL_HEADER.ImageBase
|
||||
for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
|
||||
yield Export(exp.name.decode("ascii")), base_address + exp.address
|
||||
|
||||
|
||||
def extract_file_import_names(buf, pe):
|
||||
base_address = pe.OPTIONAL_HEADER.ImageBase
|
||||
for entry in pe.DIRECTORY_ENTRY_IMPORT:
|
||||
libname = entry.dll.decode("ascii").lower().partition(".")[0]
|
||||
for imp in entry.imports:
|
||||
if imp.ordinal:
|
||||
yield Import("%s.#%s" % (libname, imp.ordinal)), imp.address
|
||||
else:
|
||||
impname = imp.name.decode("ascii")
|
||||
yield Import("%s.%s" % (libname, impname)), imp.address
|
||||
yield Import("%s" % (impname)), imp.address
|
||||
|
||||
|
||||
def extract_file_section_names(buf, pe):
|
||||
base_address = pe.OPTIONAL_HEADER.ImageBase
|
||||
for section in pe.sections:
|
||||
yield Section(section.Name.partition(b"\x00")[0].decode("ascii")), base_address + section.VirtualAddress
|
||||
|
||||
|
||||
def extract_file_strings(buf, pe):
|
||||
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
|
||||
yield String(s.s), s.offset
|
||||
|
||||
for s in capa.features.extractors.strings.extract_unicode_strings(buf):
|
||||
yield String(s.s), s.offset
|
||||
|
||||
|
||||
def extract_file_features(buf):
|
||||
pe = pefile.PE(data=buf)
|
||||
for file_handler in FILE_HANDLERS:
|
||||
for feature, va in file_handler(buf, pe):
|
||||
yield feature, va
|
||||
|
||||
|
||||
FILE_HANDLERS = (
|
||||
extract_file_embedded_pe,
|
||||
extract_file_export_names,
|
||||
extract_file_import_names,
|
||||
extract_file_section_names,
|
||||
extract_file_strings,
|
||||
)
|
||||
@@ -1,64 +0,0 @@
|
||||
import logging
|
||||
|
||||
try:
|
||||
from functools import lru_cache
|
||||
except ImportError:
|
||||
from backports.functools_lru_cache import lru_cache
|
||||
|
||||
from lancelot import (
|
||||
FLOW_VA,
|
||||
FLOW_TYPE,
|
||||
FLOW_TYPE_CONDITIONAL_JUMP,
|
||||
FLOW_TYPE_CONDITIONAL_MOVE,
|
||||
FLOW_TYPE_UNCONDITIONAL_JUMP,
|
||||
)
|
||||
|
||||
from capa.features import Characteristic
|
||||
from capa.features.extractors import loops
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_call_graph(ws):
|
||||
return ws.build_call_graph()
|
||||
|
||||
|
||||
def extract_function_calls_to(ws, f):
|
||||
cg = get_call_graph(ws)
|
||||
|
||||
for caller in cg.calls_to.get(f, []):
|
||||
yield Characteristic("calls to"), caller
|
||||
|
||||
|
||||
def extract_function_loop(ws, f):
|
||||
edges = []
|
||||
for bb in ws.build_cfg(f).basic_blocks.values():
|
||||
for flow in bb.successors:
|
||||
if flow[FLOW_TYPE] in (
|
||||
FLOW_TYPE_UNCONDITIONAL_JUMP,
|
||||
FLOW_TYPE_CONDITIONAL_JUMP,
|
||||
FLOW_TYPE_CONDITIONAL_MOVE,
|
||||
):
|
||||
edges.append((bb.address, flow[FLOW_VA]))
|
||||
continue
|
||||
|
||||
if edges and loops.has_loop(edges):
|
||||
yield Characteristic("loop"), f
|
||||
|
||||
|
||||
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)
|
||||
|
||||
|
||||
_not_implemented = set([])
|
||||
|
||||
|
||||
def extract_function_features(ws, f):
|
||||
for func_handler in FUNCTION_HANDLERS:
|
||||
try:
|
||||
for feature, va in func_handler(ws, f):
|
||||
yield feature, va
|
||||
except NotImplementedError:
|
||||
if func_handler.__name__ not in _not_implemented:
|
||||
logger.warning("not implemented: %s", func_handler.__name__)
|
||||
_not_implemented.add(func_handler.__name__)
|
||||
@@ -1,33 +0,0 @@
|
||||
from lancelot import (
|
||||
OPERAND_TYPE,
|
||||
MEMORY_OPERAND_BASE,
|
||||
MEMORY_OPERAND_DISP,
|
||||
OPERAND_TYPE_MEMORY,
|
||||
OPERAND_TYPE_IMMEDIATE,
|
||||
IMMEDIATE_OPERAND_VALUE,
|
||||
IMMEDIATE_OPERAND_IS_RELATIVE,
|
||||
)
|
||||
|
||||
|
||||
def get_operand_target(insn, op):
|
||||
if op[OPERAND_TYPE] == OPERAND_TYPE_MEMORY:
|
||||
# call direct, x64
|
||||
# rip relative
|
||||
# kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0
|
||||
if op[MEMORY_OPERAND_BASE] == "rip":
|
||||
return op[MEMORY_OPERAND_DISP] + insn.address + insn.length
|
||||
|
||||
# call direct, x32
|
||||
# mimikatz:0x403BD3 call ds:CryptAcquireContextW
|
||||
elif op[MEMORY_OPERAND_BASE] == None:
|
||||
return op[MEMORY_OPERAND_DISP]
|
||||
|
||||
# call via thunk
|
||||
# mimikatz:0x455A41 call LsaQueryInformationPolicy
|
||||
elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE and op[IMMEDIATE_OPERAND_IS_RELATIVE]:
|
||||
return op[IMMEDIATE_OPERAND_VALUE] + insn.address + insn.length
|
||||
|
||||
elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE:
|
||||
return op[IMMEDIATE_OPERAND_VALUE]
|
||||
|
||||
raise ValueError("memory operand has no target")
|
||||
@@ -1,149 +0,0 @@
|
||||
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import collections
|
||||
|
||||
from lancelot import (
|
||||
FLOW_VA,
|
||||
OPERAND_TYPE,
|
||||
PERMISSION_READ,
|
||||
MEMORY_OPERAND_BASE,
|
||||
MEMORY_OPERAND_DISP,
|
||||
OPERAND_TYPE_MEMORY,
|
||||
MEMORY_OPERAND_INDEX,
|
||||
OPERAND_TYPE_REGISTER,
|
||||
MEMORY_OPERAND_SEGMENT,
|
||||
OPERAND_TYPE_IMMEDIATE,
|
||||
IMMEDIATE_OPERAND_VALUE,
|
||||
REGISTER_OPERAND_REGISTER,
|
||||
IMMEDIATE_OPERAND_IS_RELATIVE,
|
||||
)
|
||||
|
||||
from capa.features.extractors.lancelot.helpers import get_operand_target
|
||||
|
||||
DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor")
|
||||
|
||||
|
||||
class NotFoundError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def read_instructions(ws, bb):
|
||||
va = bb.address
|
||||
while va < bb.address + bb.length:
|
||||
try:
|
||||
insn = ws.read_insn(va)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
yield insn
|
||||
va += insn.length
|
||||
|
||||
|
||||
def build_instruction_predecessors(ws, cfg):
|
||||
preds = collections.defaultdict(set)
|
||||
|
||||
for bb in cfg.basic_blocks.values():
|
||||
insns = list(read_instructions(ws, bb))
|
||||
|
||||
for i, insn in enumerate(insns):
|
||||
if i == 0:
|
||||
for pred in bb.predecessors:
|
||||
pred_bb = cfg.basic_blocks[pred[FLOW_VA]]
|
||||
preds[insn.address].add(list(read_instructions(ws, pred_bb))[-1].address)
|
||||
else:
|
||||
preds[insn.address].add(insns[i - 1].address)
|
||||
|
||||
return preds
|
||||
|
||||
|
||||
def find_definition(ws, f, insn):
|
||||
"""
|
||||
scan backwards from the given address looking for assignments to the given register.
|
||||
if a constant, return that value.
|
||||
args:
|
||||
ws (lancelot.PE)
|
||||
f (int): the function start address
|
||||
insn (lancelot.Instruction): call instruction to resolve
|
||||
returns:
|
||||
(va: int, value?: int|None): the address of the assignment and the value, if a constant.
|
||||
raises:
|
||||
NotFoundError: when the definition cannot be found.
|
||||
"""
|
||||
assert insn.mnemonic == "call"
|
||||
op0 = insn.operands[0]
|
||||
assert op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
|
||||
reg = op0[REGISTER_OPERAND_REGISTER]
|
||||
|
||||
cfg = ws.build_cfg(f)
|
||||
preds = build_instruction_predecessors(ws, cfg)
|
||||
|
||||
q = collections.deque()
|
||||
seen = set([])
|
||||
q.extend(preds[insn.address])
|
||||
while q:
|
||||
cur = q.popleft()
|
||||
|
||||
# skip if we've already processed this location
|
||||
if cur in seen:
|
||||
continue
|
||||
seen.add(cur)
|
||||
|
||||
insn = ws.read_insn(cur)
|
||||
operands = insn.operands
|
||||
|
||||
if len(operands) == 0:
|
||||
q.extend(preds[cur])
|
||||
continue
|
||||
|
||||
op0 = operands[0]
|
||||
if not (
|
||||
op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
|
||||
and op0[REGISTER_OPERAND_REGISTER] == reg
|
||||
and insn.mnemonic in DESTRUCTIVE_MNEMONICS
|
||||
):
|
||||
q.extend(preds[cur])
|
||||
continue
|
||||
|
||||
# if we reach here, the instruction is destructive to our target register.
|
||||
|
||||
# we currently only support extracting the constant from something like: `mov $reg, IAT`
|
||||
# so, any other pattern results in an unknown value, represented by None.
|
||||
# this is a good place to extend in the future, if we need more robust support.
|
||||
if insn.mnemonic != "mov":
|
||||
return (cur, None)
|
||||
else:
|
||||
op1 = operands[1]
|
||||
try:
|
||||
target = get_operand_target(insn, op1)
|
||||
except ValueError:
|
||||
return (cur, None)
|
||||
else:
|
||||
return (cur, target)
|
||||
|
||||
raise NotFoundError()
|
||||
|
||||
|
||||
def is_indirect_call(insn):
|
||||
return insn.mnemonic == "call" and insn.operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
|
||||
|
||||
|
||||
def resolve_indirect_call(ws, f, insn):
|
||||
"""
|
||||
inspect the given indirect call instruction and attempt to resolve the target address.
|
||||
args:
|
||||
ws (lancelot.PE): the analysis workspace
|
||||
f (int): the address of the function to analyze
|
||||
insn (lancelot.Instruction): the instruction at which to start analysis
|
||||
returns:
|
||||
(va: int, value?: int|None): the address of the assignment and the value, if a constant.
|
||||
raises:
|
||||
NotFoundError: when the definition cannot be found.
|
||||
"""
|
||||
assert is_indirect_call(insn)
|
||||
return find_definition(ws, f, insn)
|
||||
@@ -1,487 +0,0 @@
|
||||
import logging
|
||||
import itertools
|
||||
|
||||
import pefile
|
||||
|
||||
try:
|
||||
from functools import lru_cache
|
||||
except ImportError:
|
||||
from backports.functools_lru_cache import lru_cache
|
||||
|
||||
from lancelot import (
|
||||
OPERAND_TYPE,
|
||||
PERMISSION_READ,
|
||||
MEMORY_OPERAND_BASE,
|
||||
MEMORY_OPERAND_DISP,
|
||||
OPERAND_TYPE_MEMORY,
|
||||
MEMORY_OPERAND_INDEX,
|
||||
OPERAND_TYPE_REGISTER,
|
||||
MEMORY_OPERAND_SEGMENT,
|
||||
OPERAND_TYPE_IMMEDIATE,
|
||||
IMMEDIATE_OPERAND_VALUE,
|
||||
REGISTER_OPERAND_REGISTER,
|
||||
IMMEDIATE_OPERAND_IS_RELATIVE,
|
||||
)
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic
|
||||
from capa.features.insn import Number, Offset, Mnemonic
|
||||
from capa.features.extractors.lancelot.helpers import get_operand_target
|
||||
from capa.features.extractors.lancelot.function import get_call_graph
|
||||
from capa.features.extractors.lancelot.indirect_calls import NotFoundError, resolve_indirect_call
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
|
||||
# byte range within the first and returning basic blocks, this helps to reduce FP features
|
||||
SECURITY_COOKIE_BYTES_DELTA = 0x40
|
||||
|
||||
|
||||
def get_arch(ws):
|
||||
if ws.arch == "x32":
|
||||
return ARCH_X32
|
||||
elif ws.arch == "x64":
|
||||
return ARCH_X64
|
||||
else:
|
||||
raise ValueError("unexpected architecture")
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_pefile(xtor):
|
||||
return pefile.PE(data=xtor.buf)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_imports(xtor):
|
||||
pe = get_pefile(xtor)
|
||||
|
||||
imports = {}
|
||||
for entry in pe.DIRECTORY_ENTRY_IMPORT:
|
||||
libname = entry.dll.decode("ascii").lower().partition(".")[0]
|
||||
for imp in entry.imports:
|
||||
if imp.ordinal:
|
||||
imports[imp.address] = "%s.#%s" % (libname, imp.ordinal)
|
||||
else:
|
||||
impname = imp.name.decode("ascii")
|
||||
imports[imp.address] = "%s.%s" % (libname, impname)
|
||||
return imports
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_thunks(xtor):
|
||||
thunks = {}
|
||||
for va in xtor.ws.get_functions():
|
||||
try:
|
||||
insn = xtor.ws.read_insn(va)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if insn.mnemonic != "jmp":
|
||||
continue
|
||||
|
||||
op0 = insn.operands[0]
|
||||
|
||||
try:
|
||||
target = get_operand_target(insn, op0)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
imports = get_imports(xtor)
|
||||
if target not in imports:
|
||||
continue
|
||||
|
||||
thunks[va] = imports[target]
|
||||
|
||||
return thunks
|
||||
|
||||
|
||||
def extract_insn_api_features(xtor, f, bb, insn):
|
||||
"""parse API features from the given instruction."""
|
||||
|
||||
if insn.mnemonic != "call":
|
||||
return
|
||||
|
||||
op0 = insn.operands[0]
|
||||
|
||||
if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
|
||||
try:
|
||||
(_, target) = resolve_indirect_call(xtor.ws, f, insn)
|
||||
except NotFoundError:
|
||||
return
|
||||
if target is None:
|
||||
return
|
||||
else:
|
||||
try:
|
||||
target = get_operand_target(insn, op0)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
imports = get_imports(xtor)
|
||||
if target in imports:
|
||||
for feature, va in capa.features.extractors.helpers.generate_api_features(imports[target], insn.address):
|
||||
yield feature, va
|
||||
return
|
||||
|
||||
thunks = get_thunks(xtor)
|
||||
if target in thunks:
|
||||
for feature, va in capa.features.extractors.helpers.generate_api_features(thunks[target], insn.address):
|
||||
yield feature, va
|
||||
|
||||
|
||||
def extract_insn_mnemonic_features(xtor, f, bb, insn):
|
||||
"""parse mnemonic features from the given instruction."""
|
||||
yield Mnemonic(insn.mnemonic), insn.address
|
||||
|
||||
|
||||
def extract_insn_number_features(xtor, f, bb, insn):
|
||||
"""parse number features from the given instruction."""
|
||||
operands = insn.operands
|
||||
|
||||
for operand in operands:
|
||||
if operand[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
|
||||
continue
|
||||
|
||||
v = operand[IMMEDIATE_OPERAND_VALUE]
|
||||
|
||||
if xtor.ws.probe(v) & PERMISSION_READ:
|
||||
# v is a valid address
|
||||
# therefore, assume its not also a constant.
|
||||
continue
|
||||
|
||||
if (
|
||||
insn.mnemonic == "add"
|
||||
and operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
|
||||
and operands[0][REGISTER_OPERAND_REGISTER] == "esp"
|
||||
):
|
||||
# skip things like:
|
||||
#
|
||||
# .text:00401140 call sub_407E2B
|
||||
# .text:00401145 add esp, 0Ch
|
||||
return
|
||||
|
||||
yield Number(v), insn.address
|
||||
yield Number(v, arch=get_arch(xtor.ws)), insn.address
|
||||
|
||||
|
||||
def extract_insn_offset_features(xtor, f, bb, insn):
|
||||
"""parse structure offset features from the given instruction."""
|
||||
operands = insn.operands
|
||||
|
||||
for operand in operands:
|
||||
if operand[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
|
||||
continue
|
||||
|
||||
if operand[MEMORY_OPERAND_BASE] in ("esp", "ebp", "rbp"):
|
||||
continue
|
||||
|
||||
# lancelot provides `None` when the displacement is not present.
|
||||
v = operand[MEMORY_OPERAND_DISP] or 0
|
||||
|
||||
yield Offset(v), insn.address
|
||||
yield Offset(v, arch=get_arch(xtor.ws)), insn.address
|
||||
|
||||
|
||||
def derefs(xtor, p):
|
||||
"""
|
||||
recursively follow the given pointer, yielding the valid memory addresses along the way.
|
||||
useful when you may have a pointer to string, or pointer to pointer to string, etc.
|
||||
this is a "do what i mean" type of helper function.
|
||||
"""
|
||||
|
||||
depth = 0
|
||||
while True:
|
||||
if not xtor.ws.probe(p) & PERMISSION_READ:
|
||||
return
|
||||
yield p
|
||||
|
||||
next = xtor.ws.read_pointer(p)
|
||||
|
||||
# sanity: pointer points to self
|
||||
if next == p:
|
||||
return
|
||||
|
||||
# sanity: avoid chains of pointers that are unreasonably deep
|
||||
depth += 1
|
||||
if depth > 10:
|
||||
return
|
||||
|
||||
p = next
|
||||
|
||||
|
||||
def read_bytes(xtor, va):
|
||||
"""
|
||||
read up to MAX_BYTES_FEATURE_SIZE from the given address.
|
||||
|
||||
raises:
|
||||
ValueError: if the given address is not valid.
|
||||
"""
|
||||
start = va
|
||||
end = va + MAX_BYTES_FEATURE_SIZE
|
||||
pe = get_pefile(xtor)
|
||||
|
||||
for section in pe.sections:
|
||||
section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
|
||||
section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize
|
||||
|
||||
if section_start <= start < section_end:
|
||||
end = min(end, section_end)
|
||||
return xtor.ws.read_bytes(start, end - start)
|
||||
|
||||
raise ValueError("invalid address")
|
||||
|
||||
|
||||
# these are mnemonics that may flow (jump) elsewhere
|
||||
FLOW_MNEMONICS = set(
|
||||
[
|
||||
"call",
|
||||
"jb",
|
||||
"jbe",
|
||||
"jcxz",
|
||||
"jecxz",
|
||||
"jknzd",
|
||||
"jkzd",
|
||||
"jl",
|
||||
"jle",
|
||||
"jmp",
|
||||
"jnb",
|
||||
"jnbe",
|
||||
"jnl",
|
||||
"jnle",
|
||||
"jno",
|
||||
"jnp",
|
||||
"jns",
|
||||
"jnz",
|
||||
"jo",
|
||||
"jp",
|
||||
"jrcxz",
|
||||
"js",
|
||||
"jz",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def extract_insn_bytes_features(xtor, f, bb, insn):
|
||||
"""
|
||||
parse byte sequence features from the given instruction.
|
||||
"""
|
||||
if insn.mnemonic in FLOW_MNEMONICS:
|
||||
return
|
||||
|
||||
for operand in insn.operands:
|
||||
try:
|
||||
target = get_operand_target(insn, operand)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
for ptr in derefs(xtor, target):
|
||||
try:
|
||||
buf = read_bytes(xtor, ptr)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if capa.features.extractors.helpers.all_zeros(buf):
|
||||
continue
|
||||
|
||||
yield Bytes(buf), insn.address
|
||||
|
||||
|
||||
def first(s):
|
||||
"""enumerate the first element in the sequence"""
|
||||
for i in s:
|
||||
yield i
|
||||
break
|
||||
|
||||
|
||||
def extract_insn_string_features(xtor, f, bb, insn):
|
||||
"""parse string features from the given instruction."""
|
||||
for bytez, va in extract_insn_bytes_features(xtor, f, bb, insn):
|
||||
buf = bytez.value
|
||||
|
||||
for s in itertools.chain(
|
||||
first(capa.features.extractors.strings.extract_ascii_strings(buf)),
|
||||
first(capa.features.extractors.strings.extract_unicode_strings(buf)),
|
||||
):
|
||||
if s.offset == 0:
|
||||
yield String(s.s), va
|
||||
|
||||
|
||||
def is_security_cookie(xtor, f, bb, insn):
|
||||
"""
|
||||
check if an instruction is related to security cookie checks
|
||||
"""
|
||||
op1 = insn.operands[1]
|
||||
if op1[OPERAND_TYPE] == OPERAND_TYPE_REGISTER and op1[REGISTER_OPERAND_REGISTER] not in (
|
||||
"esp",
|
||||
"ebp",
|
||||
"rbp",
|
||||
"rsp",
|
||||
):
|
||||
return False
|
||||
|
||||
# expect security cookie init in first basic block within first bytes (instructions)
|
||||
if f == bb.address and insn.address < (bb.address + SECURITY_COOKIE_BYTES_DELTA):
|
||||
return True
|
||||
|
||||
# ... or within last bytes (instructions) before a return
|
||||
insns = list(xtor.get_instructions(f, bb))
|
||||
if insns[-1].mnemonic in ("ret", "retn") and insn.address > (bb.address + bb.length - SECURITY_COOKIE_BYTES_DELTA):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def extract_insn_nzxor_characteristic_features(xtor, f, bb, insn):
|
||||
"""
|
||||
parse non-zeroing XOR instruction from the given instruction.
|
||||
ignore expected non-zeroing XORs, e.g. security cookies.
|
||||
"""
|
||||
if insn.mnemonic != "xor":
|
||||
return
|
||||
|
||||
operands = insn.operands
|
||||
if operands[0] == operands[1]:
|
||||
return
|
||||
|
||||
if is_security_cookie(xtor, f, bb, insn):
|
||||
return
|
||||
|
||||
yield Characteristic("nzxor"), insn.address
|
||||
|
||||
|
||||
def extract_insn_peb_access_characteristic_features(xtor, f, bb, insn):
|
||||
"""
|
||||
parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64
|
||||
"""
|
||||
for operand in insn.operands:
|
||||
if (
|
||||
operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
|
||||
and operand[MEMORY_OPERAND_SEGMENT] == "gs"
|
||||
and operand[MEMORY_OPERAND_DISP] == 0x60
|
||||
):
|
||||
yield Characteristic("peb access"), insn.address
|
||||
|
||||
if (
|
||||
operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
|
||||
and operand[MEMORY_OPERAND_SEGMENT] == "fs"
|
||||
and operand[MEMORY_OPERAND_DISP] == 0x30
|
||||
):
|
||||
yield Characteristic("peb access"), insn.address
|
||||
|
||||
|
||||
def extract_insn_segment_access_features(xtor, f, bb, insn):
|
||||
""" parse the instruction for access to fs or gs """
|
||||
for operand in insn.operands:
|
||||
if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "gs":
|
||||
yield Characteristic("gs access"), insn.address
|
||||
|
||||
if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "fs":
|
||||
yield Characteristic("fs access"), insn.address
|
||||
|
||||
|
||||
def get_section(xtor, va):
    """return the index of the PE section containing the given virtual address.

    raises:
      ValueError: when no section maps the address.
    """
    pe = get_pefile(xtor)
    # hoist the loop-invariant image base out of the scan
    base = pe.OPTIONAL_HEADER.ImageBase

    for index, section in enumerate(pe.sections):
        start = base + section.VirtualAddress
        end = start + section.Misc_VirtualSize
        if start <= va < end:
            return index

    raise ValueError("invalid address")
|
||||
|
||||
|
||||
def extract_insn_cross_section_cflow(xtor, f, bb, insn):
    """yield a "cross section flow" characteristic for a CALL/JMP whose
    target lies in a different PE section than the instruction itself.

    flows into the import table are excluded, as are addresses whose
    section cannot be resolved.
    """
    if insn.mnemonic not in FLOW_MNEMONICS:
        return

    try:
        target = get_operand_target(insn, insn.operands[0])
    except ValueError:
        # target not statically resolvable
        return

    if target in get_imports(xtor):
        # calls/jumps to imports are expected to leave the section
        return

    try:
        source_section = get_section(xtor, insn.address)
        target_section = get_section(xtor, target)
    except ValueError:
        # one of the addresses is not mapped by any section
        return

    if source_section != target_section:
        yield Characteristic("cross section flow"), insn.address
|
||||
|
||||
|
||||
def extract_function_calls_from(xtor, f, bb, insn):
    """yield a "calls from" characteristic for each callee of this instruction,
    plus a "recursive call" characteristic when the function calls itself."""
    cg = get_call_graph(xtor.ws)

    for callee in cg.calls_from.get(insn.address, []):
        yield Characteristic("calls from"), callee

        if callee == f:
            yield Characteristic("recursive call"), insn.address

    # lancelot doesn't count API calls when constructing the call graph
    # so we still have to scan for calls to an import
    if insn.mnemonic != "call":
        return

    try:
        target = get_operand_target(insn, insn.operands[0])
    except ValueError:
        return

    if target in get_imports(xtor):
        yield Characteristic("calls from"), target
|
||||
|
||||
|
||||
# this is a feature that's most relevant at the function or basic block scope,
|
||||
# however, its most efficient to extract at the instruction scope.
|
||||
def extract_function_indirect_call_characteristic_features(xtor, f, bb, insn):
    """yield an "indirect call" characteristic for calls through a register
    or through memory addressed via a base or index register,
    e.g. `call eax` or `call dword ptr [edx+4]`.

    direct memory calls like `call ds:dword_ABD4974` are not included.
    """
    if insn.mnemonic != "call":
        return

    op0 = insn.operands[0]
    if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
        yield Characteristic("indirect call"), insn.address
    elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and (
        op0[MEMORY_OPERAND_BASE] is not None or op0[MEMORY_OPERAND_INDEX] is not None
    ):
        yield Characteristic("indirect call"), insn.address
|
||||
|
||||
|
||||
_not_implemented = set([])
|
||||
|
||||
|
||||
def extract_insn_features(xtor, f, bb, insn):
    """yield all instruction-scope features for the given instruction.

    handlers raising NotImplementedError are skipped, logging a warning
    the first time each one fails.
    """
    for handler in INSTRUCTION_HANDLERS:
        try:
            for feature, addr in handler(xtor, f, bb, insn):
                yield feature, addr
        except NotImplementedError:
            name = handler.__name__
            if name not in _not_implemented:
                logger.warning("not implemented: %s", name)
                _not_implemented.add(name)
|
||||
|
||||
|
||||
# instruction-scope feature handlers, invoked in order by extract_insn_features();
# each is a generator taking (xtor, f, bb, insn) and yielding (feature, va) pairs.
INSTRUCTION_HANDLERS = (
    extract_insn_api_features,
    extract_insn_number_features,
    extract_insn_string_features,
    extract_insn_bytes_features,
    extract_insn_offset_features,
    extract_insn_nzxor_characteristic_features,
    extract_insn_mnemonic_features,
    extract_insn_peb_access_characteristic_features,
    extract_insn_cross_section_cflow,
    extract_insn_segment_access_features,
    extract_function_calls_from,
    extract_function_indirect_call_characteristic_features,
)
|
||||
@@ -11,14 +11,14 @@ from networkx.algorithms.components import strongly_connected_components
|
||||
|
||||
|
||||
def has_loop(edges, threshold=2):
|
||||
""" check if a list of edges representing a directed graph contains a loop
|
||||
"""check if a list of edges representing a directed graph contains a loop
|
||||
|
||||
args:
|
||||
edges: list of edge sets representing a directed graph i.e. [(1, 2), (2, 1)]
|
||||
threshold: min number of nodes contained in loop
|
||||
args:
|
||||
edges: list of edge sets representing a directed graph i.e. [(1, 2), (2, 1)]
|
||||
threshold: min number of nodes contained in loop
|
||||
|
||||
returns:
|
||||
bool
|
||||
returns:
|
||||
bool
|
||||
"""
|
||||
g = nx.DiGraph()
|
||||
g.add_edges_from(edges)
|
||||
|
||||
@@ -8,7 +8,11 @@
|
||||
|
||||
import types
|
||||
|
||||
import file
|
||||
import insn
|
||||
import function
|
||||
import viv_utils
|
||||
import basicblock
|
||||
|
||||
import capa.features.extractors
|
||||
import capa.features.extractors.viv.file
|
||||
|
||||
@@ -84,7 +84,16 @@ def dumps(extractor):
|
||||
returns:
|
||||
str: the serialized features.
|
||||
"""
|
||||
ret = {"version": 1, "functions": {}, "scopes": {"file": [], "function": [], "basic block": [], "instruction": [],}}
|
||||
ret = {
|
||||
"version": 1,
|
||||
"functions": {},
|
||||
"scopes": {
|
||||
"file": [],
|
||||
"function": [],
|
||||
"basic block": [],
|
||||
"instruction": [],
|
||||
},
|
||||
}
|
||||
|
||||
for feature, va in extractor.extract_file_features():
|
||||
ret["scopes"]["file"].append(serialize_feature(feature) + (hex(va), ()))
|
||||
@@ -99,7 +108,16 @@ def dumps(extractor):
|
||||
ret["functions"][hex(f)][hex(bb)] = []
|
||||
|
||||
for feature, va in extractor.extract_basic_block_features(f, bb):
|
||||
ret["scopes"]["basic block"].append(serialize_feature(feature) + (hex(va), (hex(f), hex(bb),)))
|
||||
ret["scopes"]["basic block"].append(
|
||||
serialize_feature(feature)
|
||||
+ (
|
||||
hex(va),
|
||||
(
|
||||
hex(f),
|
||||
hex(bb),
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
for insnva, insn in sorted(
|
||||
[(insn.__int__(), insn) for insn in extractor.get_instructions(f, bb)], key=lambda p: p[0]
|
||||
@@ -108,7 +126,15 @@ def dumps(extractor):
|
||||
|
||||
for feature, va in extractor.extract_insn_features(f, bb, insn):
|
||||
ret["scopes"]["instruction"].append(
|
||||
serialize_feature(feature) + (hex(va), (hex(f), hex(bb), hex(insnva),))
|
||||
serialize_feature(feature)
|
||||
+ (
|
||||
hex(va),
|
||||
(
|
||||
hex(f),
|
||||
hex(bb),
|
||||
hex(insnva),
|
||||
),
|
||||
)
|
||||
)
|
||||
return json.dumps(ret)
|
||||
|
||||
|
||||
@@ -24,10 +24,7 @@ class Number(Feature):
|
||||
super(Number, self).__init__(value, arch=arch, description=description)
|
||||
|
||||
def get_value_str(self):
|
||||
if self.value < 0:
|
||||
return "-0x%X" % (-self.value)
|
||||
else:
|
||||
return "0x%X" % self.value
|
||||
return "0x%X" % self.value
|
||||
|
||||
|
||||
class Offset(Feature):
|
||||
|
||||
@@ -17,9 +17,9 @@ import capa.ida.helpers
|
||||
|
||||
|
||||
def info_to_name(display):
|
||||
""" extract root value from display name
|
||||
"""extract root value from display name
|
||||
|
||||
e.g. function(my_function) => my_function
|
||||
e.g. function(my_function) => my_function
|
||||
"""
|
||||
try:
|
||||
return display.split("(")[1].rstrip(")")
|
||||
@@ -68,16 +68,16 @@ class CapaExplorerDataItem(object):
|
||||
return self._checked
|
||||
|
||||
def appendChild(self, item):
|
||||
""" add child item
|
||||
"""add child item
|
||||
|
||||
@param item: CapaExplorerDataItem*
|
||||
@param item: CapaExplorerDataItem*
|
||||
"""
|
||||
self.children.append(item)
|
||||
|
||||
def child(self, row):
|
||||
""" get child row
|
||||
"""get child row
|
||||
|
||||
@param row: TODO
|
||||
@param row: TODO
|
||||
"""
|
||||
return self.children[row]
|
||||
|
||||
|
||||
@@ -65,11 +65,11 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
self.endResetModel()
|
||||
|
||||
def columnCount(self, model_index):
|
||||
""" get the number of columns for the children of the given parent
|
||||
"""get the number of columns for the children of the given parent
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param model_index: QModelIndex*
|
||||
|
||||
@retval column count
|
||||
@retval column count
|
||||
"""
|
||||
if model_index.isValid():
|
||||
return model_index.internalPointer().columnCount()
|
||||
@@ -77,12 +77,12 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return self.root_node.columnCount()
|
||||
|
||||
def data(self, model_index, role):
|
||||
""" get data stored under the given role for the item referred to by the index
|
||||
"""get data stored under the given role for the item referred to by the index
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param role: QtCore.Qt.*
|
||||
@param model_index: QModelIndex*
|
||||
@param role: QtCore.Qt.*
|
||||
|
||||
@retval data to be displayed
|
||||
@retval data to be displayed
|
||||
"""
|
||||
if not model_index.isValid():
|
||||
return None
|
||||
@@ -151,11 +151,11 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return None
|
||||
|
||||
def flags(self, model_index):
|
||||
""" get item flags for given index
|
||||
"""get item flags for given index
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param model_index: QModelIndex*
|
||||
|
||||
@retval QtCore.Qt.ItemFlags
|
||||
@retval QtCore.Qt.ItemFlags
|
||||
"""
|
||||
if not model_index.isValid():
|
||||
return QtCore.Qt.NoItemFlags
|
||||
@@ -163,13 +163,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return model_index.internalPointer().flags
|
||||
|
||||
def headerData(self, section, orientation, role):
|
||||
""" get data for the given role and section in the header with the specified orientation
|
||||
"""get data for the given role and section in the header with the specified orientation
|
||||
|
||||
@param section: int
|
||||
@param orientation: QtCore.Qt.Orientation
|
||||
@param role: QtCore.Qt.DisplayRole
|
||||
@param section: int
|
||||
@param orientation: QtCore.Qt.Orientation
|
||||
@param role: QtCore.Qt.DisplayRole
|
||||
|
||||
@retval header data list()
|
||||
@retval header data list()
|
||||
"""
|
||||
if orientation == QtCore.Qt.Horizontal and role == QtCore.Qt.DisplayRole:
|
||||
return self.root_node.data(section)
|
||||
@@ -177,13 +177,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return None
|
||||
|
||||
def index(self, row, column, parent):
|
||||
""" get index of the item in the model specified by the given row, column and parent index
|
||||
"""get index of the item in the model specified by the given row, column and parent index
|
||||
|
||||
@param row: int
|
||||
@param column: int
|
||||
@param parent: QModelIndex*
|
||||
@param row: int
|
||||
@param column: int
|
||||
@param parent: QModelIndex*
|
||||
|
||||
@retval QModelIndex*
|
||||
@retval QModelIndex*
|
||||
"""
|
||||
if not self.hasIndex(row, column, parent):
|
||||
return QtCore.QModelIndex()
|
||||
@@ -201,13 +201,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return QtCore.QModelIndex()
|
||||
|
||||
def parent(self, model_index):
|
||||
""" get parent of the model item with the given index
|
||||
"""get parent of the model item with the given index
|
||||
|
||||
if the item has no parent, an invalid QModelIndex* is returned
|
||||
if the item has no parent, an invalid QModelIndex* is returned
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param model_index: QModelIndex*
|
||||
|
||||
@retval QModelIndex*
|
||||
@retval QModelIndex*
|
||||
"""
|
||||
if not model_index.isValid():
|
||||
return QtCore.QModelIndex()
|
||||
@@ -221,12 +221,12 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return self.createIndex(parent.row(), 0, parent)
|
||||
|
||||
def iterateChildrenIndexFromRootIndex(self, model_index, ignore_root=True):
|
||||
""" depth-first traversal of child nodes
|
||||
"""depth-first traversal of child nodes
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param ignore_root: if set, do not return root index
|
||||
@param model_index: QModelIndex*
|
||||
@param ignore_root: if set, do not return root index
|
||||
|
||||
@retval yield QModelIndex*
|
||||
@retval yield QModelIndex*
|
||||
"""
|
||||
visited = set()
|
||||
stack = deque((model_index,))
|
||||
@@ -248,10 +248,10 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
stack.append(child_index.child(idx, 0))
|
||||
|
||||
def reset_ida_highlighting(self, item, checked):
|
||||
""" reset IDA highlight for an item
|
||||
"""reset IDA highlight for an item
|
||||
|
||||
@param item: capa explorer item
|
||||
@param checked: indicates item is or not checked
|
||||
@param item: capa explorer item
|
||||
@param checked: indicates item is or not checked
|
||||
"""
|
||||
if not isinstance(
|
||||
item, (CapaExplorerStringViewItem, CapaExplorerInstructionViewItem, CapaExplorerByteViewItem)
|
||||
@@ -275,13 +275,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
idc.set_color(item.location, idc.CIC_ITEM, item.ida_highlight)
|
||||
|
||||
def setData(self, model_index, value, role):
|
||||
""" set the role data for the item at index to value
|
||||
"""set the role data for the item at index to value
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param value: QVariant*
|
||||
@param role: QtCore.Qt.EditRole
|
||||
@param model_index: QModelIndex*
|
||||
@param value: QVariant*
|
||||
@param role: QtCore.Qt.EditRole
|
||||
|
||||
@retval True/False
|
||||
@retval True/False
|
||||
"""
|
||||
if not model_index.isValid():
|
||||
return False
|
||||
@@ -316,14 +316,14 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return False
|
||||
|
||||
def rowCount(self, model_index):
|
||||
""" get the number of rows under the given parent
|
||||
"""get the number of rows under the given parent
|
||||
|
||||
when the parent is valid it means that is returning the number of
|
||||
children of parent
|
||||
when the parent is valid it means that is returning the number of
|
||||
children of parent
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param model_index: QModelIndex*
|
||||
|
||||
@retval row count
|
||||
@retval row count
|
||||
"""
|
||||
if model_index.column() > 0:
|
||||
return 0
|
||||
@@ -336,24 +336,30 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return item.childCount()
|
||||
|
||||
def render_capa_doc_statement_node(self, parent, statement, locations, doc):
|
||||
""" render capa statement read from doc
|
||||
"""render capa statement read from doc
|
||||
|
||||
@param parent: parent to which new child is assigned
|
||||
@param statement: statement read from doc
|
||||
@param locations: locations of children (applies to range only?)
|
||||
@param doc: capa result doc
|
||||
@param parent: parent to which new child is assigned
|
||||
@param statement: statement read from doc
|
||||
@param locations: locations of children (applies to range only?)
|
||||
@param doc: capa result doc
|
||||
|
||||
"statement": {
|
||||
"type": "or"
|
||||
},
|
||||
"statement": {
|
||||
"type": "or"
|
||||
},
|
||||
"""
|
||||
if statement["type"] in ("and", "or", "optional"):
|
||||
return CapaExplorerDefaultItem(parent, statement["type"])
|
||||
display = statement["type"]
|
||||
if statement.get("description"):
|
||||
display += " (%s)" % statement["description"]
|
||||
return CapaExplorerDefaultItem(parent, display)
|
||||
elif statement["type"] == "not":
|
||||
# TODO: do we display 'not'
|
||||
pass
|
||||
elif statement["type"] == "some":
|
||||
return CapaExplorerDefaultItem(parent, str(statement["count"]) + " or more")
|
||||
display = "%d or more" % statement["count"]
|
||||
if statement.get("description"):
|
||||
display += " (%s)" % statement["description"]
|
||||
return CapaExplorerDefaultItem(parent, display)
|
||||
elif statement["type"] == "range":
|
||||
# `range` is a weird node, its almost a hybrid of statement + feature.
|
||||
# it is a specific feature repeated multiple times.
|
||||
@@ -370,6 +376,9 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
else:
|
||||
display += "between %d and %d" % (statement["min"], statement["max"])
|
||||
|
||||
if statement.get("description"):
|
||||
display += " (%s)" % statement["description"]
|
||||
|
||||
parent2 = CapaExplorerFeatureItem(parent, display=display)
|
||||
|
||||
for location in locations:
|
||||
@@ -378,33 +387,36 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
|
||||
return parent2
|
||||
elif statement["type"] == "subscope":
|
||||
return CapaExplorerSubscopeItem(parent, statement[statement["type"]])
|
||||
display = statement[statement["type"]]
|
||||
if statement.get("description"):
|
||||
display += " (%s)" % statement["description"]
|
||||
return CapaExplorerSubscopeItem(parent, display)
|
||||
else:
|
||||
raise RuntimeError("unexpected match statement type: " + str(statement))
|
||||
|
||||
def render_capa_doc_match(self, parent, match, doc):
|
||||
""" render capa match read from doc
|
||||
"""render capa match read from doc
|
||||
|
||||
@param parent: parent node to which new child is assigned
|
||||
@param match: match read from doc
|
||||
@param doc: capa result doc
|
||||
@param parent: parent node to which new child is assigned
|
||||
@param match: match read from doc
|
||||
@param doc: capa result doc
|
||||
|
||||
"matches": {
|
||||
"0": {
|
||||
"children": [],
|
||||
"locations": [
|
||||
4317184
|
||||
],
|
||||
"node": {
|
||||
"feature": {
|
||||
"section": ".rsrc",
|
||||
"type": "section"
|
||||
},
|
||||
"type": "feature"
|
||||
"matches": {
|
||||
"0": {
|
||||
"children": [],
|
||||
"locations": [
|
||||
4317184
|
||||
],
|
||||
"node": {
|
||||
"feature": {
|
||||
"section": ".rsrc",
|
||||
"type": "section"
|
||||
},
|
||||
"success": true
|
||||
}
|
||||
},
|
||||
"type": "feature"
|
||||
},
|
||||
"success": true
|
||||
}
|
||||
},
|
||||
"""
|
||||
if not match["success"]:
|
||||
# TODO: display failed branches at some point? Help with debugging rules?
|
||||
@@ -431,9 +443,9 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
self.render_capa_doc_match(parent2, child, doc)
|
||||
|
||||
def render_capa_doc(self, doc):
|
||||
""" render capa features specified in doc
|
||||
"""render capa features specified in doc
|
||||
|
||||
@param doc: capa result doc
|
||||
@param doc: capa result doc
|
||||
"""
|
||||
# inform model that changes are about to occur
|
||||
self.beginResetModel()
|
||||
@@ -457,18 +469,18 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
self.endResetModel()
|
||||
|
||||
def capa_doc_feature_to_display(self, feature):
|
||||
""" convert capa doc feature type string to display string for ui
|
||||
"""convert capa doc feature type string to display string for ui
|
||||
|
||||
@param feature: capa feature read from doc
|
||||
@param feature: capa feature read from doc
|
||||
|
||||
Example:
|
||||
"feature": {
|
||||
"bytes": "01 14 02 00 00 00 00 00 C0 00 00 00 00 00 00 46",
|
||||
"description": "CLSID_ShellLink",
|
||||
"type": "bytes"
|
||||
}
|
||||
Example:
|
||||
"feature": {
|
||||
"bytes": "01 14 02 00 00 00 00 00 C0 00 00 00 00 00 00 46",
|
||||
"description": "CLSID_ShellLink",
|
||||
"type": "bytes"
|
||||
}
|
||||
|
||||
bytes(01 14 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_ShellLink)
|
||||
bytes(01 14 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_ShellLink)
|
||||
"""
|
||||
if feature[feature["type"]]:
|
||||
if feature.get("description", ""):
|
||||
@@ -479,25 +491,31 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return "%s" % feature["type"]
|
||||
|
||||
def render_capa_doc_feature_node(self, parent, feature, locations, doc):
|
||||
""" process capa doc feature node
|
||||
"""process capa doc feature node
|
||||
|
||||
@param parent: parent node to which child is assigned
|
||||
@param feature: capa doc feature node
|
||||
@param locations: locations identified for feature
|
||||
@param doc: capa doc
|
||||
@param parent: parent node to which child is assigned
|
||||
@param feature: capa doc feature node
|
||||
@param locations: locations identified for feature
|
||||
@param doc: capa doc
|
||||
|
||||
Example:
|
||||
"feature": {
|
||||
"description": "FILE_WRITE_DATA",
|
||||
"number": "0x2",
|
||||
"type": "number"
|
||||
}
|
||||
Example:
|
||||
"feature": {
|
||||
"description": "FILE_WRITE_DATA",
|
||||
"number": "0x2",
|
||||
"type": "number"
|
||||
}
|
||||
"""
|
||||
display = self.capa_doc_feature_to_display(feature)
|
||||
|
||||
if len(locations) == 1:
|
||||
# only one location for feature so no need to nest children
|
||||
parent2 = self.render_capa_doc_feature(parent, feature, next(iter(locations)), doc, display=display,)
|
||||
parent2 = self.render_capa_doc_feature(
|
||||
parent,
|
||||
feature,
|
||||
next(iter(locations)),
|
||||
doc,
|
||||
display=display,
|
||||
)
|
||||
else:
|
||||
# feature has multiple children, nest under one parent feature node
|
||||
parent2 = CapaExplorerFeatureItem(parent, display)
|
||||
@@ -508,20 +526,20 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
return parent2
|
||||
|
||||
def render_capa_doc_feature(self, parent, feature, location, doc, display="-"):
|
||||
""" render capa feature read from doc
|
||||
"""render capa feature read from doc
|
||||
|
||||
@param parent: parent node to which new child is assigned
|
||||
@param feature: feature read from doc
|
||||
@param doc: capa feature doc
|
||||
@param location: address of feature
|
||||
@param display: text to display in plugin ui
|
||||
@param parent: parent node to which new child is assigned
|
||||
@param feature: feature read from doc
|
||||
@param doc: capa feature doc
|
||||
@param location: address of feature
|
||||
@param display: text to display in plugin ui
|
||||
|
||||
Example:
|
||||
"feature": {
|
||||
"description": "FILE_WRITE_DATA",
|
||||
"number": "0x2",
|
||||
"type": "number"
|
||||
}
|
||||
Example:
|
||||
"feature": {
|
||||
"description": "FILE_WRITE_DATA",
|
||||
"number": "0x2",
|
||||
"type": "number"
|
||||
}
|
||||
"""
|
||||
# special handling for characteristic pending type
|
||||
if feature["type"] == "characteristic":
|
||||
@@ -575,10 +593,10 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
raise RuntimeError("unexpected feature type: " + str(feature["type"]))
|
||||
|
||||
def update_function_name(self, old_name, new_name):
|
||||
""" update all instances of old function name with new function name
|
||||
"""update all instances of old function name with new function name
|
||||
|
||||
@param old_name: previous function name
|
||||
@param new_name: new function name
|
||||
@param old_name: previous function name
|
||||
@param new_name: new function name
|
||||
"""
|
||||
# create empty root index for search
|
||||
root_index = self.index(0, 0, QtCore.QModelIndex())
|
||||
|
||||
@@ -16,13 +16,16 @@ class CapaExplorerSortFilterProxyModel(QtCore.QSortFilterProxyModel):
|
||||
""" """
|
||||
super(CapaExplorerSortFilterProxyModel, self).__init__(parent)
|
||||
|
||||
self.min_ea = None
|
||||
self.max_ea = None
|
||||
|
||||
def lessThan(self, left, right):
|
||||
""" true if the value of the left item is less than value of right item
|
||||
"""true if the value of the left item is less than value of right item
|
||||
|
||||
@param left: QModelIndex*
|
||||
@param right: QModelIndex*
|
||||
@param left: QModelIndex*
|
||||
@param right: QModelIndex*
|
||||
|
||||
@retval True/False
|
||||
@retval True/False
|
||||
"""
|
||||
ldata = left.internalPointer().data(left.column())
|
||||
rdata = right.internalPointer().data(right.column())
|
||||
@@ -40,13 +43,13 @@ class CapaExplorerSortFilterProxyModel(QtCore.QSortFilterProxyModel):
|
||||
return ldata.lower() < rdata.lower()
|
||||
|
||||
def filterAcceptsRow(self, row, parent):
|
||||
""" true if the item in the row indicated by the given row and parent
|
||||
should be included in the model; otherwise returns false
|
||||
"""true if the item in the row indicated by the given row and parent
|
||||
should be included in the model; otherwise returns false
|
||||
|
||||
@param row: int
|
||||
@param parent: QModelIndex*
|
||||
@param row: int
|
||||
@param parent: QModelIndex*
|
||||
|
||||
@retval True/False
|
||||
@retval True/False
|
||||
"""
|
||||
if self.filter_accepts_row_self(row, parent):
|
||||
return True
|
||||
@@ -62,15 +65,6 @@ class CapaExplorerSortFilterProxyModel(QtCore.QSortFilterProxyModel):
|
||||
|
||||
return False
|
||||
|
||||
def add_single_string_filter(self, column, string):
|
||||
""" add fixed string filter
|
||||
|
||||
@param column: key column
|
||||
@param string: string to sort
|
||||
"""
|
||||
self.setFilterKeyColumn(column)
|
||||
self.setFilterFixedString(string)
|
||||
|
||||
def index_has_accepted_children(self, row, parent):
|
||||
""" """
|
||||
model_index = self.sourceModel().index(row, 0, parent)
|
||||
@@ -86,4 +80,33 @@ class CapaExplorerSortFilterProxyModel(QtCore.QSortFilterProxyModel):
|
||||
|
||||
def filter_accepts_row_self(self, row, parent):
|
||||
""" """
|
||||
return super(CapaExplorerSortFilterProxyModel, self).filterAcceptsRow(row, parent)
|
||||
# filter not set
|
||||
if self.min_ea is None and self.max_ea is None:
|
||||
return True
|
||||
|
||||
index = self.sourceModel().index(row, 0, parent)
|
||||
data = index.internalPointer().data(CapaExplorerDataModel.COLUMN_INDEX_VIRTUAL_ADDRESS)
|
||||
|
||||
if not data:
|
||||
return False
|
||||
|
||||
ea = int(data, 16)
|
||||
|
||||
if self.min_ea <= ea and ea < self.max_ea:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def add_address_range_filter(self, min_ea, max_ea):
|
||||
""" """
|
||||
self.min_ea = min_ea
|
||||
self.max_ea = max_ea
|
||||
|
||||
self.setFilterKeyColumn(CapaExplorerDataModel.COLUMN_INDEX_VIRTUAL_ADDRESS)
|
||||
self.invalidateFilter()
|
||||
|
||||
def reset_address_range_filter(self):
|
||||
""" """
|
||||
self.min_ea = None
|
||||
self.max_ea = None
|
||||
self.invalidateFilter()
|
||||
|
||||
@@ -15,13 +15,13 @@ from capa.ida.explorer.model import CapaExplorerDataModel
|
||||
|
||||
|
||||
class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
""" capa explorer QTreeView implementation
|
||||
"""capa explorer QTreeView implementation
|
||||
|
||||
view controls UI action responses and displays data from
|
||||
CapaExplorerDataModel
|
||||
view controls UI action responses and displays data from
|
||||
CapaExplorerDataModel
|
||||
|
||||
view does not modify CapaExplorerDataModel directly - data
|
||||
modifications should be implemented in CapaExplorerDataModel
|
||||
view does not modify CapaExplorerDataModel directly - data
|
||||
modifications should be implemented in CapaExplorerDataModel
|
||||
"""
|
||||
|
||||
def __init__(self, model, parent=None):
|
||||
@@ -54,12 +54,12 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
self.setStyleSheet("QTreeView::item {padding-right: 15 px;padding-bottom: 2 px;}")
|
||||
|
||||
def reset(self):
|
||||
""" reset user interface changes
|
||||
"""reset user interface changes
|
||||
|
||||
called when view should reset any user interface changes
|
||||
made since the last reset e.g. IDA window highlighting
|
||||
called when view should reset any user interface changes
|
||||
made since the last reset e.g. IDA window highlighting
|
||||
"""
|
||||
self.collapseAll()
|
||||
self.expandToDepth(0)
|
||||
self.resize_columns_to_content()
|
||||
|
||||
def resize_columns_to_content(self):
|
||||
@@ -67,31 +67,31 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
self.header().resizeSections(QtWidgets.QHeaderView.ResizeToContents)
|
||||
|
||||
def map_index_to_source_item(self, model_index):
|
||||
""" map proxy model index to source model item
|
||||
"""map proxy model index to source model item
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param model_index: QModelIndex*
|
||||
|
||||
@retval QObject*
|
||||
@retval QObject*
|
||||
"""
|
||||
return self.model.mapToSource(model_index).internalPointer()
|
||||
|
||||
def send_data_to_clipboard(self, data):
|
||||
""" copy data to the clipboard
|
||||
"""copy data to the clipboard
|
||||
|
||||
@param data: data to be copied
|
||||
@param data: data to be copied
|
||||
"""
|
||||
clip = QtWidgets.QApplication.clipboard()
|
||||
clip.clear(mode=clip.Clipboard)
|
||||
clip.setText(data, mode=clip.Clipboard)
|
||||
|
||||
def new_action(self, display, data, slot):
|
||||
""" create action for context menu
|
||||
"""create action for context menu
|
||||
|
||||
@param display: text displayed to user in context menu
|
||||
@param data: data passed to slot
|
||||
@param slot: slot to connect
|
||||
@param display: text displayed to user in context menu
|
||||
@param data: data passed to slot
|
||||
@param slot: slot to connect
|
||||
|
||||
@retval QAction*
|
||||
@retval QAction*
|
||||
"""
|
||||
action = QtWidgets.QAction(display, self.parent)
|
||||
action.setData(data)
|
||||
@@ -100,11 +100,11 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
return action
|
||||
|
||||
def load_default_context_menu_actions(self, data):
|
||||
""" yield actions specific to function custom context menu
|
||||
"""yield actions specific to function custom context menu
|
||||
|
||||
@param data: tuple
|
||||
@param data: tuple
|
||||
|
||||
@yield QAction*
|
||||
@yield QAction*
|
||||
"""
|
||||
default_actions = (
|
||||
("Copy column", data, self.slot_copy_column),
|
||||
@@ -116,11 +116,11 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
yield self.new_action(*action)
|
||||
|
||||
def load_function_context_menu_actions(self, data):
|
||||
""" yield actions specific to function custom context menu
|
||||
"""yield actions specific to function custom context menu
|
||||
|
||||
@param data: tuple
|
||||
@param data: tuple
|
||||
|
||||
@yield QAction*
|
||||
@yield QAction*
|
||||
"""
|
||||
function_actions = (("Rename function", data, self.slot_rename_function),)
|
||||
|
||||
@@ -133,15 +133,15 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
yield action
|
||||
|
||||
def load_default_context_menu(self, pos, item, model_index):
|
||||
""" create default custom context menu
|
||||
"""create default custom context menu
|
||||
|
||||
creates custom context menu containing default actions
|
||||
creates custom context menu containing default actions
|
||||
|
||||
@param pos: TODO
|
||||
@param item: TODO
|
||||
@param model_index: TODO
|
||||
@param pos: TODO
|
||||
@param item: TODO
|
||||
@param model_index: TODO
|
||||
|
||||
@retval QMenu*
|
||||
@retval QMenu*
|
||||
"""
|
||||
menu = QtWidgets.QMenu()
|
||||
|
||||
@@ -151,16 +151,16 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
return menu
|
||||
|
||||
def load_function_item_context_menu(self, pos, item, model_index):
|
||||
""" create function custom context menu
|
||||
"""create function custom context menu
|
||||
|
||||
creates custom context menu containing actions specific to functions
|
||||
and the default actions
|
||||
creates custom context menu containing actions specific to functions
|
||||
and the default actions
|
||||
|
||||
@param pos: TODO
|
||||
@param item: TODO
|
||||
@param model_index: TODO
|
||||
@param pos: TODO
|
||||
@param item: TODO
|
||||
@param model_index: TODO
|
||||
|
||||
@retval QMenu*
|
||||
@retval QMenu*
|
||||
"""
|
||||
menu = QtWidgets.QMenu()
|
||||
|
||||
@@ -170,43 +170,43 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
return menu
|
||||
|
||||
def show_custom_context_menu(self, menu, pos):
|
||||
""" display custom context menu in view
|
||||
"""display custom context menu in view
|
||||
|
||||
@param menu: TODO
|
||||
@param pos: TODO
|
||||
@param menu: TODO
|
||||
@param pos: TODO
|
||||
"""
|
||||
if menu:
|
||||
menu.exec_(self.viewport().mapToGlobal(pos))
|
||||
|
||||
def slot_copy_column(self, action):
|
||||
""" slot connected to custom context menu
|
||||
"""slot connected to custom context menu
|
||||
|
||||
allows user to select a column and copy the data
|
||||
to clipboard
|
||||
allows user to select a column and copy the data
|
||||
to clipboard
|
||||
|
||||
@param action: QAction*
|
||||
@param action: QAction*
|
||||
"""
|
||||
_, item, model_index = action.data()
|
||||
self.send_data_to_clipboard(item.data(model_index.column()))
|
||||
|
||||
def slot_copy_row(self, action):
|
||||
""" slot connected to custom context menu
|
||||
"""slot connected to custom context menu
|
||||
|
||||
allows user to select a row and copy the space-delimited
|
||||
data to clipboard
|
||||
allows user to select a row and copy the space-delimited
|
||||
data to clipboard
|
||||
|
||||
@param action: QAction*
|
||||
@param action: QAction*
|
||||
"""
|
||||
_, item, _ = action.data()
|
||||
self.send_data_to_clipboard(str(item))
|
||||
|
||||
def slot_rename_function(self, action):
|
||||
""" slot connected to custom context menu
|
||||
"""slot connected to custom context menu
|
||||
|
||||
allows user to select a edit a function name and push
|
||||
changes to IDA
|
||||
allows user to select a edit a function name and push
|
||||
changes to IDA
|
||||
|
||||
@param action: QAction*
|
||||
@param action: QAction*
|
||||
"""
|
||||
_, item, model_index = action.data()
|
||||
|
||||
@@ -216,12 +216,12 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
item.setIsEditable(False)
|
||||
|
||||
def slot_custom_context_menu_requested(self, pos):
|
||||
""" slot connected to custom context menu request
|
||||
"""slot connected to custom context menu request
|
||||
|
||||
displays custom context menu to user containing action
|
||||
relevant to the data item selected
|
||||
displays custom context menu to user containing action
|
||||
relevant to the data item selected
|
||||
|
||||
@param pos: TODO
|
||||
@param pos: TODO
|
||||
"""
|
||||
model_index = self.indexAt(pos)
|
||||
|
||||
@@ -243,9 +243,9 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
|
||||
self.show_custom_context_menu(menu, pos)
|
||||
|
||||
def slot_double_click(self, model_index):
|
||||
""" slot connected to double click event
|
||||
"""slot connected to double click event
|
||||
|
||||
@param model_index: QModelIndex*
|
||||
@param model_index: QModelIndex*
|
||||
"""
|
||||
if not model_index.isValid():
|
||||
return
|
||||
|
||||
@@ -102,6 +102,9 @@ def collect_metadata():
|
||||
"sha256": sha256,
|
||||
"path": idaapi.get_input_file_path(),
|
||||
},
|
||||
"analysis": {"format": idaapi.get_file_type_name(), "extractor": "ida",},
|
||||
"analysis": {
|
||||
"format": idaapi.get_file_type_name(),
|
||||
"extractor": "ida",
|
||||
},
|
||||
"version": capa.version.__version__,
|
||||
}
|
||||
|
||||
@@ -30,10 +30,10 @@ logger = logging.getLogger("capa")
|
||||
|
||||
class CapaExplorerIdaHooks(idaapi.UI_Hooks):
|
||||
def __init__(self, screen_ea_changed_hook, action_hooks):
|
||||
""" facilitate IDA UI hooks
|
||||
"""facilitate IDA UI hooks
|
||||
|
||||
@param screen_ea_changed_hook: function hook for IDA screen ea changed
|
||||
@param action_hooks: dict of IDA action handles
|
||||
@param screen_ea_changed_hook: function hook for IDA screen ea changed
|
||||
@param action_hooks: dict of IDA action handles
|
||||
"""
|
||||
super(CapaExplorerIdaHooks, self).__init__()
|
||||
|
||||
@@ -43,11 +43,11 @@ class CapaExplorerIdaHooks(idaapi.UI_Hooks):
|
||||
self.process_action_meta = {}
|
||||
|
||||
def preprocess_action(self, name):
|
||||
""" called prior to action completed
|
||||
"""called prior to action completed
|
||||
|
||||
@param name: name of action defined by idagui.cfg
|
||||
@param name: name of action defined by idagui.cfg
|
||||
|
||||
@retval must be 0
|
||||
@retval must be 0
|
||||
"""
|
||||
self.process_action_handle = self.process_action_hooks.get(name, None)
|
||||
|
||||
@@ -66,10 +66,10 @@ class CapaExplorerIdaHooks(idaapi.UI_Hooks):
|
||||
self.reset()
|
||||
|
||||
def screen_ea_changed(self, curr_ea, prev_ea):
|
||||
""" called after screen location is changed
|
||||
"""called after screen location is changed
|
||||
|
||||
@param curr_ea: current location
|
||||
@param prev_ea: prev location
|
||||
@param curr_ea: current location
|
||||
@param prev_ea: prev location
|
||||
"""
|
||||
self.screen_ea_changed_hook(idaapi.get_current_widget(), curr_ea, prev_ea)
|
||||
|
||||
@@ -300,13 +300,13 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.ida_hooks.unhook()
|
||||
|
||||
def ida_hook_rename(self, meta, post=False):
|
||||
""" hook for IDA rename action
|
||||
"""hook for IDA rename action
|
||||
|
||||
called twice, once before action and once after
|
||||
action completes
|
||||
called twice, once before action and once after
|
||||
action completes
|
||||
|
||||
@param meta: metadata cache
|
||||
@param post: indicates pre or post action
|
||||
@param meta: metadata cache
|
||||
@param post: indicates pre or post action
|
||||
"""
|
||||
location = idaapi.get_screen_ea()
|
||||
if not location or not capa.ida.helpers.is_func_start(location):
|
||||
@@ -322,37 +322,27 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
meta["prev_name"] = curr_name
|
||||
|
||||
def ida_hook_screen_ea_changed(self, widget, new_ea, old_ea):
|
||||
""" hook for IDA screen ea changed
|
||||
"""hook for IDA screen ea changed
|
||||
|
||||
@param widget: IDA widget type
|
||||
@param new_ea: destination ea
|
||||
@param old_ea: source ea
|
||||
"""
|
||||
this hook is currently only relevant for limiting results displayed in the UI
|
||||
|
||||
@param widget: IDA widget type
|
||||
@param new_ea: destination ea
|
||||
@param old_ea: source ea
|
||||
"""
|
||||
if not self.view_limit_results_by_function.isChecked():
|
||||
# ignore if checkbox not selected
|
||||
# ignore if limit checkbox not selected
|
||||
return
|
||||
|
||||
if idaapi.get_widget_type(widget) != idaapi.BWN_DISASM:
|
||||
# ignore views other than asm
|
||||
# ignore views not the assembly view
|
||||
return
|
||||
|
||||
# attempt to map virtual addresses to function start addresses
|
||||
new_func_start = capa.ida.helpers.get_func_start_ea(new_ea)
|
||||
old_func_start = capa.ida.helpers.get_func_start_ea(old_ea)
|
||||
|
||||
if new_func_start and new_func_start == old_func_start:
|
||||
# navigated within the same function - do nothing
|
||||
if idaapi.get_func(new_ea) == idaapi.get_func(old_ea):
|
||||
# user navigated same function - ignore
|
||||
return
|
||||
|
||||
if new_func_start:
|
||||
# navigated to new function - filter for function start virtual address
|
||||
match = capa.ida.explorer.item.location_to_hex(new_func_start)
|
||||
else:
|
||||
# navigated to virtual address not in valid function - clear filter
|
||||
match = ""
|
||||
|
||||
# filter on virtual address to avoid updating filter string if function name is changed
|
||||
self.model_proxy.add_single_string_filter(CapaExplorerDataModel.COLUMN_INDEX_VIRTUAL_ADDRESS, match)
|
||||
self.limit_results_to_function(idaapi.get_func(new_ea))
|
||||
self.view_tree.resize_columns_to_content()
|
||||
|
||||
def load_capa_results(self):
|
||||
@@ -508,9 +498,9 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
idaapi.info("%s reload completed." % PLUGIN_NAME)
|
||||
|
||||
def reset(self):
|
||||
""" reset UI elements
|
||||
"""reset UI elements
|
||||
|
||||
e.g. checkboxes and IDA highlighting
|
||||
e.g. checkboxes and IDA highlighting
|
||||
"""
|
||||
self.ida_reset()
|
||||
|
||||
@@ -518,31 +508,39 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
idaapi.info("%s reset completed." % PLUGIN_NAME)
|
||||
|
||||
def slot_menu_bar_hovered(self, action):
|
||||
""" display menu action tooltip
|
||||
"""display menu action tooltip
|
||||
|
||||
@param action: QtWidgets.QAction*
|
||||
@param action: QtWidgets.QAction*
|
||||
|
||||
@reference: https://stackoverflow.com/questions/21725119/why-wont-qtooltips-appear-on-qactions-within-a-qmenu
|
||||
@reference: https://stackoverflow.com/questions/21725119/why-wont-qtooltips-appear-on-qactions-within-a-qmenu
|
||||
"""
|
||||
QtWidgets.QToolTip.showText(
|
||||
QtGui.QCursor.pos(), action.toolTip(), self.view_menu_bar, self.view_menu_bar.actionGeometry(action)
|
||||
)
|
||||
|
||||
def slot_checkbox_limit_by_changed(self):
|
||||
""" slot activated if checkbox clicked
|
||||
"""slot activated if checkbox clicked
|
||||
|
||||
if checked, configure function filter if screen location is located
|
||||
in function, otherwise clear filter
|
||||
if checked, configure function filter if screen location is located
|
||||
in function, otherwise clear filter
|
||||
"""
|
||||
match = ""
|
||||
if self.view_limit_results_by_function.isChecked():
|
||||
location = capa.ida.helpers.get_func_start_ea(idaapi.get_screen_ea())
|
||||
if location:
|
||||
match = capa.ida.explorer.item.location_to_hex(location)
|
||||
self.limit_results_to_function(idaapi.get_func(idaapi.get_screen_ea()))
|
||||
else:
|
||||
self.model_proxy.reset_address_range_filter()
|
||||
|
||||
self.model_proxy.add_single_string_filter(CapaExplorerDataModel.COLUMN_INDEX_VIRTUAL_ADDRESS, match)
|
||||
self.view_tree.reset()
|
||||
|
||||
self.view_tree.resize_columns_to_content()
|
||||
def limit_results_to_function(self, f):
|
||||
"""add filter to limit results to current function
|
||||
|
||||
@param f: (IDA func_t)
|
||||
"""
|
||||
if f:
|
||||
self.model_proxy.add_address_range_filter(f.start_ea, f.end_ea)
|
||||
else:
|
||||
# if function not exists don't display any results (address should not be -1)
|
||||
self.model_proxy.add_address_range_filter(-1, -1)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
28
capa/main.py
28
capa/main.py
@@ -32,7 +32,7 @@ import capa.features.extractors
|
||||
from capa.helpers import oint, get_file_taste
|
||||
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
SUPPORTED_FILE_MAGIC = set([b"MZ"])
|
||||
SUPPORTED_FILE_MAGIC = set(["MZ"])
|
||||
|
||||
|
||||
logger = logging.getLogger("capa")
|
||||
@@ -105,7 +105,12 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
|
||||
all_function_matches = collections.defaultdict(list)
|
||||
all_bb_matches = collections.defaultdict(list)
|
||||
|
||||
meta = {"feature_counts": {"file": 0, "functions": {},}}
|
||||
meta = {
|
||||
"feature_counts": {
|
||||
"file": 0,
|
||||
"functions": {},
|
||||
}
|
||||
}
|
||||
|
||||
for f in tqdm.tqdm(list(extractor.get_functions()), disable=disable_progress, desc="matching", unit=" functions"):
|
||||
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
|
||||
@@ -290,24 +295,7 @@ class UnsupportedRuntimeError(RuntimeError):
|
||||
|
||||
|
||||
def get_extractor_py3(path, format, disable_progress=False):
|
||||
try:
|
||||
import lancelot
|
||||
|
||||
import capa.features.extractors.lancelot
|
||||
except ImportError:
|
||||
logger.warning("lancelot not installed")
|
||||
raise UnsupportedRuntimeError()
|
||||
|
||||
if format not in ("pe", "auto"):
|
||||
raise UnsupportedFormatError(format)
|
||||
|
||||
if not is_supported_file_type(path):
|
||||
raise UnsupportedFormatError()
|
||||
|
||||
with open(path, "rb") as f:
|
||||
buf = f.read()
|
||||
|
||||
return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)
|
||||
raise UnsupportedRuntimeError()
|
||||
|
||||
|
||||
def get_extractor(path, format, disable_progress=False):
|
||||
|
||||
@@ -16,15 +16,15 @@ import capa.engine
|
||||
|
||||
def convert_statement_to_result_document(statement):
|
||||
"""
|
||||
"statement": {
|
||||
"type": "or"
|
||||
},
|
||||
"statement": {
|
||||
"type": "or"
|
||||
},
|
||||
|
||||
"statement": {
|
||||
"max": 9223372036854775808,
|
||||
"min": 2,
|
||||
"type": "range"
|
||||
},
|
||||
"statement": {
|
||||
"max": 9223372036854775808,
|
||||
"min": 2,
|
||||
"type": "range"
|
||||
},
|
||||
"""
|
||||
statement_type = statement.name.lower()
|
||||
result = {"type": statement_type}
|
||||
@@ -47,28 +47,28 @@ def convert_statement_to_result_document(statement):
|
||||
|
||||
def convert_feature_to_result_document(feature):
|
||||
"""
|
||||
"feature": {
|
||||
"number": 6,
|
||||
"type": "number"
|
||||
},
|
||||
"feature": {
|
||||
"number": 6,
|
||||
"type": "number"
|
||||
},
|
||||
|
||||
"feature": {
|
||||
"api": "ws2_32.WSASocket",
|
||||
"type": "api"
|
||||
},
|
||||
"feature": {
|
||||
"api": "ws2_32.WSASocket",
|
||||
"type": "api"
|
||||
},
|
||||
|
||||
"feature": {
|
||||
"match": "create TCP socket",
|
||||
"type": "match"
|
||||
},
|
||||
"feature": {
|
||||
"match": "create TCP socket",
|
||||
"type": "match"
|
||||
},
|
||||
|
||||
"feature": {
|
||||
"characteristic": [
|
||||
"loop",
|
||||
true
|
||||
],
|
||||
"type": "characteristic"
|
||||
},
|
||||
"feature": {
|
||||
"characteristic": [
|
||||
"loop",
|
||||
true
|
||||
],
|
||||
"type": "characteristic"
|
||||
},
|
||||
"""
|
||||
result = {"type": feature.name, feature.name: feature.get_value_str()}
|
||||
if feature.description:
|
||||
@@ -80,15 +80,15 @@ def convert_feature_to_result_document(feature):
|
||||
|
||||
def convert_node_to_result_document(node):
|
||||
"""
|
||||
"node": {
|
||||
"type": "statement",
|
||||
"statement": { ... }
|
||||
},
|
||||
"node": {
|
||||
"type": "statement",
|
||||
"statement": { ... }
|
||||
},
|
||||
|
||||
"node": {
|
||||
"type": "feature",
|
||||
"feature": { ... }
|
||||
},
|
||||
"node": {
|
||||
"type": "feature",
|
||||
"feature": { ... }
|
||||
},
|
||||
"""
|
||||
|
||||
if isinstance(node, capa.engine.Statement):
|
||||
@@ -152,7 +152,10 @@ def convert_match_to_result_document(rules, capabilities, result):
|
||||
scope = rule.meta["scope"]
|
||||
doc["node"] = {
|
||||
"type": "statement",
|
||||
"statement": {"type": "subscope", "subscope": scope,},
|
||||
"statement": {
|
||||
"type": "subscope",
|
||||
"subscope": scope,
|
||||
},
|
||||
}
|
||||
|
||||
for location in doc["locations"]:
|
||||
@@ -257,5 +260,7 @@ class CapaJsonObjectEncoder(json.JSONEncoder):
|
||||
|
||||
def render_json(meta, rules, capabilities):
|
||||
return json.dumps(
|
||||
convert_capabilities_to_result_document(meta, rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True,
|
||||
convert_capabilities_to_result_document(meta, rules, capabilities),
|
||||
cls=CapaJsonObjectEncoder,
|
||||
sort_keys=True,
|
||||
)
|
||||
|
||||
@@ -109,7 +109,12 @@ def render_attack(doc, ostream):
|
||||
inner_rows.append("%s::%s %s" % (rutils.bold(technique), subtechnique, id))
|
||||
else:
|
||||
raise RuntimeError("unexpected ATT&CK spec format")
|
||||
rows.append((rutils.bold(tactic.upper()), "\n".join(inner_rows),))
|
||||
rows.append(
|
||||
(
|
||||
rutils.bold(tactic.upper()),
|
||||
"\n".join(inner_rows),
|
||||
)
|
||||
)
|
||||
|
||||
if rows:
|
||||
ostream.write(
|
||||
|
||||
@@ -262,7 +262,7 @@ def parse_description(s, value_type, description=None):
|
||||
raise InvalidRule(
|
||||
"unexpected bytes value: byte sequences must be no larger than %s bytes" % MAX_BYTES_FEATURE_SIZE
|
||||
)
|
||||
elif value_type in {"number", "offset"}:
|
||||
elif value_type in ("number", "offset") or value_type.startswith(("number/", "offset/")):
|
||||
try:
|
||||
value = parse_int(value)
|
||||
except ValueError:
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
import sys
|
||||
import logging
|
||||
|
||||
try:
|
||||
from functools import lru_cache
|
||||
except ImportError:
|
||||
from backports.functools_lru_cache import lru_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotPackedError(ValueError):
|
||||
def __init__(self):
|
||||
super(NotPackedError, self).__init__("not packed")
|
||||
|
||||
|
||||
def can_unpack():
|
||||
# the unpacking backend is based on Speakeasy, which supports python 3.6+
|
||||
return sys.version_info >= (3, 6)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_unpackers():
|
||||
# break import loop
|
||||
import capa.unpack.aspack
|
||||
|
||||
return {p.name: p for p in [capa.unpack.aspack.AspackUnpacker]}
|
||||
|
||||
|
||||
def detect_packer(buf):
|
||||
for unpacker in get_unpackers().values():
|
||||
if unpacker.is_packed(buf):
|
||||
return unpacker.name
|
||||
|
||||
raise NotPackedError()
|
||||
|
||||
|
||||
def is_packed(buf):
|
||||
try:
|
||||
detect_packer(buf)
|
||||
return True
|
||||
except NotPackedError:
|
||||
return False
|
||||
|
||||
|
||||
def unpack_pe(packer, buf):
|
||||
return get_unpackers()[packer].unpack_pe(buf)
|
||||
@@ -1,459 +0,0 @@
|
||||
import io
|
||||
import struct
|
||||
import logging
|
||||
import contextlib
|
||||
import collections
|
||||
|
||||
import pefile
|
||||
import speakeasy
|
||||
import speakeasy.common as se_common
|
||||
import speakeasy.profiler
|
||||
import speakeasy.windows.objman
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def pefile_get_section_by_name(pe, section_name):
|
||||
for section in pe.sections:
|
||||
try:
|
||||
if section.Name.partition(b"\x00")[0].decode("ascii") == section_name:
|
||||
return section
|
||||
except:
|
||||
continue
|
||||
raise ValueError("section not found")
|
||||
|
||||
|
||||
def prepare_emu_context(se, module):
|
||||
"""
|
||||
prepare an Speakeasy instance for emulating the given module, without running it.
|
||||
|
||||
this is useful when planning to manually control the emulator,
|
||||
such as via `Speakeasy.emu.emu_eng.start(...)`.
|
||||
typically, Speakeasy expects to do "Run based" analysis,
|
||||
which doesn't give us too much control.
|
||||
|
||||
much of this was derived from win32::Win32Emulator::run_module.
|
||||
hopefully this can eventually be merged into Speakeasy.
|
||||
|
||||
args:
|
||||
se (speakeasy.Speakeasy): the instance to prepare
|
||||
module (speakeasy.Module): the module that will be emulated
|
||||
"""
|
||||
se._init_hooks()
|
||||
|
||||
main_exe = None
|
||||
if not module.is_exe():
|
||||
container = se.emu.init_container_process()
|
||||
if container:
|
||||
se.emu.processes.append(container)
|
||||
se.emu.curr_process = container
|
||||
else:
|
||||
main_exe = module
|
||||
|
||||
if main_exe:
|
||||
se.emu.user_modules = [main_exe] + se.emu.user_modules
|
||||
|
||||
# Create an empty process object for the module if none is supplied
|
||||
if len(se.emu.processes) == 0:
|
||||
p = speakeasy.windows.objman.Process(se.emu, path=module.get_emu_path(), base=module.base, pe=module)
|
||||
se.emu.curr_process = p
|
||||
|
||||
t = speakeasy.windows.objman.Thread(se.emu, stack_base=se.emu.stack_base, stack_commit=module.stack_commit)
|
||||
|
||||
se.emu.om.objects.update({t.address: t})
|
||||
se.emu.curr_process.threads.append(t)
|
||||
se.emu.curr_thread = t
|
||||
|
||||
peb = se.emu.alloc_peb(se.emu.curr_process)
|
||||
se.emu.init_teb(t, peb)
|
||||
|
||||
|
||||
INSN_PUSHA = 0x60
|
||||
INSN_POPA = 0x61
|
||||
|
||||
|
||||
class AspackUnpacker(speakeasy.Speakeasy):
|
||||
name = "aspack"
|
||||
|
||||
def __init__(self, buf, debug=False):
|
||||
super(AspackUnpacker, self).__init__(debug=debug)
|
||||
self.module = self.load_module(data=buf)
|
||||
prepare_emu_context(self, self.module)
|
||||
|
||||
@staticmethod
|
||||
def detect_aspack(buf):
|
||||
"""
|
||||
return True if the given buffer contains an ASPack'd PE file.
|
||||
we detect aspack by looking at the section names for .aspack.
|
||||
the unpacking routine contains further validation and will raise an exception if necessary.
|
||||
|
||||
args:
|
||||
buf (bytes): the contents of a PE file.
|
||||
|
||||
returns: bool
|
||||
"""
|
||||
try:
|
||||
pe = pefile.PE(data=buf, fast_load=True)
|
||||
except:
|
||||
return False
|
||||
|
||||
try:
|
||||
pefile_get_section_by_name(pe, ".aspack")
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def unpack_pe(cls, buf):
|
||||
"""
|
||||
unpack the given buffer that contains an ASPack'd PE file.
|
||||
return the contents of a reconstructed PE file.
|
||||
|
||||
args:
|
||||
buf (bytes): the contents of an ASPack'd PE file.
|
||||
|
||||
returns: bytes
|
||||
"""
|
||||
unpacker = cls(buf)
|
||||
return unpacker.unpack()
|
||||
|
||||
def stepi(self):
|
||||
self.emu.emu_eng.start(self.emu.get_pc(), count=1)
|
||||
|
||||
def remove_hook(self, hook_type, hook_handle):
|
||||
# TODO: this should be part of speakeasy
|
||||
self.emu.hooks[hook_type].remove(hook_handle)
|
||||
self.emu.emu_eng.hook_remove(hook_handle.handle)
|
||||
|
||||
def remove_mem_read_hook(self, hook_handle):
|
||||
# TODO: this should be part of speakeasy
|
||||
self.remove_hook(se_common.HOOK_MEM_READ, hook_handle)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def mem_read_hook(self, hook):
|
||||
"""
|
||||
context manager for temporarily installing a hook on the emulator.
|
||||
|
||||
example:
|
||||
|
||||
with self.mem_read_hook(lambda emu, access, addr, size, ctx: emu.stop()):
|
||||
self.emu.emu_eng.start(0x401000)
|
||||
|
||||
args:
|
||||
hook (speakeasy.common.MemReadHook): the hook to install
|
||||
"""
|
||||
handle = self.add_mem_read_hook(hook)
|
||||
# if this fails, then there's still an unfixed bug in Speakeasy
|
||||
assert handle.handle != 0
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self.remove_mem_read_hook(handle)
|
||||
|
||||
def remove_code_hook(self, hook_handle):
|
||||
# TODO: this should be part of speakeasy
|
||||
self.remove_hook(se_common.HOOK_CODE, hook_handle)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def code_hook(self, hook):
|
||||
"""
|
||||
context manager for temporarily installing a hook on the emulator.
|
||||
|
||||
example:
|
||||
|
||||
with self.code_hook(lambda emu, addr, size, ctx: emu.stop()):
|
||||
self.emu.emu_eng.start(0x401000)
|
||||
|
||||
args:
|
||||
hook (speakeasy.common.CodeHook): the hook to install
|
||||
"""
|
||||
handle = self.add_code_hook(hook)
|
||||
assert handle.handle != 0
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self.remove_code_hook(handle)
|
||||
|
||||
def read_ptr(self, va):
|
||||
endian = "little"
|
||||
val = self.mem_read(va, self.emu.ptr_size)
|
||||
return int.from_bytes(val, endian)
|
||||
|
||||
def dump(self):
|
||||
"""
|
||||
emulate the loaded module, pausing after an appropriate section hop.
|
||||
then, dump and return the module's memory and OEP.
|
||||
|
||||
this routine is specific to aspack. it makes the following assumptions:
|
||||
- aspack starts with a PUSHA to save off the CPU context
|
||||
- aspeck then runs its unpacking stub
|
||||
- aspeck executes POPA to restore the CPU context
|
||||
- aspack section hops to the OEP
|
||||
|
||||
we'll emulate in a few phases:
|
||||
1. single step over PUSHA at the entrypoint
|
||||
2. extract the address of the saved CPU context
|
||||
3. emulate until the saved CPU context is read
|
||||
4. assert this is a POPA instruction
|
||||
5. emulate until a section hop
|
||||
6. profit!
|
||||
|
||||
return the module's memory segment and the OEP.
|
||||
|
||||
returns: Tuple[byte, int]
|
||||
"""
|
||||
|
||||
# prime the emulator.
|
||||
# this is derived from winemu::WindowsEmulator::start()
|
||||
self.emu.curr_run = speakeasy.profiler.Run()
|
||||
self.emu.curr_mod = self.module
|
||||
self.emu.set_hooks()
|
||||
self.emu._set_emu_hooks()
|
||||
|
||||
# 0. sanity checking: assert entrypoint is a PUSHA instruction
|
||||
entrypoint = self.module.base + self.module.ep
|
||||
opcode = self.emu.mem_read(entrypoint, 1)[0]
|
||||
if opcode != INSN_PUSHA:
|
||||
raise ValueError("not packed with supported ASPack")
|
||||
|
||||
# 1. single step over PUSHA
|
||||
self.emu.set_pc(entrypoint)
|
||||
self.stepi()
|
||||
|
||||
# 2. extract address of saved CPU context
|
||||
saved_cpu_context = self.emu.get_stack_ptr()
|
||||
|
||||
# 3. emulate until saved CPU context is accessed
|
||||
def until_read(target):
|
||||
"""return a mem_read hook that stops the emulator when an address is read."""
|
||||
|
||||
def inner(emu, _access, addr, _size, _value, _ctx):
|
||||
if addr == target:
|
||||
emu.stop()
|
||||
return True
|
||||
|
||||
return inner
|
||||
|
||||
with self.mem_read_hook(until_read(saved_cpu_context)):
|
||||
self.emu.emu_eng.start(self.emu.get_pc())
|
||||
|
||||
# 4. assert this is a POPA instruction
|
||||
opcode = self.emu.mem_read(self.emu.get_pc(), 1)[0]
|
||||
if opcode != INSN_POPA:
|
||||
raise ValueError("not packed with supported ASPack")
|
||||
logger.debug("POPA: 0x%x", self.emu.get_pc())
|
||||
|
||||
# 5. emulate until a section hop
|
||||
aspack_section = self.module.get_section_by_name(".aspack")
|
||||
start = self.module.base + aspack_section.VirtualAddress
|
||||
end = start + aspack_section.Misc_VirtualSize
|
||||
|
||||
def until_section_hop(start, end):
|
||||
def inner(emu, addr, _size, _ctx):
|
||||
if addr < start or addr >= end:
|
||||
emu.stop()
|
||||
return True
|
||||
|
||||
return inner
|
||||
|
||||
with self.code_hook(until_section_hop(start, end)):
|
||||
self.emu.emu_eng.start(self.emu.get_pc())
|
||||
|
||||
# 6. dump and return
|
||||
oep = self.emu.get_pc()
|
||||
logger.debug("OEP: 0x%x", oep)
|
||||
|
||||
mm = self.get_address_map(self.module.base)
|
||||
buf = self.mem_read(mm.base, mm.size)
|
||||
|
||||
return buf, oep
|
||||
|
||||
def fixup(self, buf, oep):
|
||||
"""
|
||||
fixup a PE image that's been dumped from memory after unpacking aspack.
|
||||
|
||||
there are two big fixes that need to happen:
|
||||
1. update the section pointers and sizes
|
||||
2. rebuild the import table
|
||||
|
||||
for (1) updating the section pointers, we'll just update the
|
||||
physical pointers to match the virtual pointers, since this is a loaded image.
|
||||
|
||||
for (2) rebuilding the import table, we'll:
|
||||
(a) inspect the emulation results for resolved imports, which tells us dll/symbol names
|
||||
(b) scan the dumped image for the unpacked import thunks (Import Address Table/Thunk Table)
|
||||
(c) match the import thunks with resolved imports
|
||||
(d) build the import table structures
|
||||
(e) write the reconstructed table into the .aspack section
|
||||
|
||||
since the .aspack section contains the unpacking stub, which is no longer used,
|
||||
then we'll write the reconstructed IAT there. hopefully its big enough.
|
||||
"""
|
||||
pe = pefile.PE(data=buf)
|
||||
|
||||
pe.OPTIONAL_HEADER.AddressOfEntryPoint = oep - self.module.base
|
||||
|
||||
# 1. update section pointers and sizes.
|
||||
for section in pe.sections:
|
||||
section.PointerToRawData = section.VirtualAddress
|
||||
section.SizeOfRawData = section.Misc_VirtualSize
|
||||
|
||||
# 2. rebuild the import table
|
||||
|
||||
# place the reconstructed import table in the .aspack section (unpacking stub)
|
||||
reconstruction_target = pefile_get_section_by_name(pe, ".aspack").VirtualAddress
|
||||
|
||||
# mapping from import pointer to (dll name, symbol name).
|
||||
# the import pointer is generated by speakeasy and is not mapped.
|
||||
# it often looks something like 0xfeedf008.
|
||||
# as we encounter pointers with values like this, we can resolve the symbol.
|
||||
imports = {}
|
||||
|
||||
# 2a. find resolved imports
|
||||
for addr, (dll, sym) in self.module.import_table.items():
|
||||
# these are items in the original import table.
|
||||
logger.debug(f"found static import {dll}.{sym}")
|
||||
imports[addr] = (dll, sym)
|
||||
for (addr, dll, sym) in self.emu.dyn_imps:
|
||||
# these are imports that have been resolved at runtime by the unpacking stub.
|
||||
logger.debug(f"found dynamic import {dll}.{sym}")
|
||||
imports[addr] = (dll, sym)
|
||||
|
||||
# 2b. find the existing thunk tables
|
||||
# these are pointer-aligned tables of import pointers.
|
||||
# in my test sample, its found at the start of the first section.
|
||||
|
||||
# ordered list of tuples (VA, import pointer)
|
||||
# look up the symbol using the import pointer and the `imports` mapping.
|
||||
thunks = []
|
||||
|
||||
# scan from the start of the first section
|
||||
# until we reach values that don't look like thunk tables.
|
||||
for va in range(pe.sections[0].VirtualAddress + self.module.base, 0xFFFFFFFFFFFFFFFF, self.emu.ptr_size):
|
||||
ptr = self.read_ptr(va)
|
||||
if ptr == 0:
|
||||
# probably padding/terminating entry
|
||||
continue
|
||||
|
||||
if ptr in imports:
|
||||
thunks.append((va, ptr,))
|
||||
logger.debug(f"found import thunk at {va:08x} to {ptr:08x} for {imports[ptr][0]}\t{imports[ptr][1]}")
|
||||
continue
|
||||
|
||||
# otherwise, at the end of the thunk tables
|
||||
break
|
||||
|
||||
# collect the thunk entries into contiguous tables, grouped by dll name.
|
||||
#
|
||||
# list of thunk tuples that are contiguous and have the same dll name:
|
||||
# (VA, import pointer, dll name, symbol name)
|
||||
curr_idt_table = []
|
||||
# list of list of thunk tuples, like above
|
||||
idt_tables = []
|
||||
for thunk in thunks:
|
||||
va, imp = thunk
|
||||
dll, sym = imports[imp]
|
||||
|
||||
if not curr_idt_table:
|
||||
curr_idt_table.append((va, imp, dll, sym))
|
||||
elif curr_idt_table[0][2] == dll:
|
||||
curr_idt_table.append((va, imp, dll, sym))
|
||||
else:
|
||||
idt_tables.append(curr_idt_table)
|
||||
curr_idt_table = [(va, imp, dll, sym)]
|
||||
idt_tables.append(curr_idt_table)
|
||||
|
||||
# 2d. build the import table structures
|
||||
|
||||
# mapping from the data identifier to its RVA (which will be found within the reconstruction blob)
|
||||
locations = {}
|
||||
# the raw bytes of the reconstructed import structures.
|
||||
# it will have the following layout:
|
||||
# 1. DLL name strings and Hint/Name table entries
|
||||
# 2. Import Lookup Tables (points into (1))
|
||||
# 3. Import Directory Tables (points into (1), (2), and original Thunk Tables)
|
||||
reconstruction = io.BytesIO()
|
||||
|
||||
# list of dll names
|
||||
dlls = list(sorted(set(map(lambda pair: pair[0], imports.values()))))
|
||||
# mapping from dll name to list of symbols
|
||||
symbols = collections.defaultdict(set)
|
||||
for dll, sym in imports.values():
|
||||
symbols[dll].add(sym)
|
||||
|
||||
# emit strings into the reconstruction blob
|
||||
for dll in dlls:
|
||||
locations[("dll", dll)] = reconstruction_target + reconstruction.tell()
|
||||
reconstruction.write(dll.encode("ascii") + b"\x00")
|
||||
if reconstruction.tell() % 2 == 1:
|
||||
# padding
|
||||
reconstruction.write(b"\x00")
|
||||
|
||||
for sym in sorted(symbols[dll]):
|
||||
locations[("hint", dll, sym)] = reconstruction_target + reconstruction.tell()
|
||||
# export name pointer table hint == 0
|
||||
reconstruction.write(b"\x00\x00")
|
||||
# name
|
||||
reconstruction.write(sym.encode("ascii") + b"\x00")
|
||||
if reconstruction.tell() % 2 == 1:
|
||||
# padding
|
||||
reconstruction.write(b"\x00")
|
||||
|
||||
# emit Import Lookup Tables for each recovered thunk table
|
||||
ptr_format = "<I" if self.emu.ptr_size == 4 else "<Q"
|
||||
for i, idt_entry in enumerate(idt_tables):
|
||||
locations[("import lookup table", i)] = reconstruction_target + reconstruction.tell()
|
||||
for (va, imp, dll, sym) in idt_entry:
|
||||
reconstruction.write(struct.pack(ptr_format, locations[("hint", dll, sym)]))
|
||||
reconstruction.write(b"\x00" * 8)
|
||||
|
||||
# emit Import Descriptor Tables for each recovered thunk table
|
||||
IDT_ENTRY_SIZE = 0x20
|
||||
for i, idt_entry in enumerate(idt_tables):
|
||||
va, _, dll, _ = idt_entry[0]
|
||||
rva = va - self.module.base
|
||||
locations[("import descriptor table", i)] = reconstruction_target + reconstruction.tell()
|
||||
|
||||
# import lookup table rva
|
||||
reconstruction.write(struct.pack("<I", locations[("import lookup table", i)]))
|
||||
# date stamp
|
||||
reconstruction.write(struct.pack("<I", 0x0))
|
||||
# forwarder chain
|
||||
reconstruction.write(struct.pack("<I", 0x0))
|
||||
# name rva
|
||||
reconstruction.write(struct.pack("<I", locations[("dll", dll)]))
|
||||
# import address table rva
|
||||
reconstruction.write(struct.pack("<I", rva))
|
||||
# empty last entry
|
||||
reconstruction.write(b"\x00" * IDT_ENTRY_SIZE)
|
||||
|
||||
# if the reconstructed import structures are larger than the unpacking stub...
|
||||
# i'm not sure what we'll do. probably need to add a section.
|
||||
assert len(reconstruction.getvalue()) <= pefile_get_section_by_name(pe, ".aspack").Misc_VirtualSize
|
||||
|
||||
pe.set_bytes_at_rva(reconstruction_target, reconstruction.getvalue())
|
||||
pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].VirtualAddress = locations[("import descriptor table", 0)]
|
||||
pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].Size = IDT_ENTRY_SIZE * len(idt_tables)
|
||||
|
||||
return pe.write()
|
||||
|
||||
def unpack(self):
|
||||
buf, oep = self.dump()
|
||||
buf = self.fixup(buf, oep)
|
||||
return buf
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
input = sys.argv[1]
|
||||
output = sys.argv[1]
|
||||
|
||||
with open(sys.argv[1], "rb") as f:
|
||||
buf = f.read()
|
||||
|
||||
with open(sys.argv[2], "wb") as f:
|
||||
f.write(AspackUnpacker.unpack_pe(buf))
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.1.0"
|
||||
__version__ = "1.2.0"
|
||||
|
||||
@@ -34,7 +34,6 @@ We like to use capa to help find the most interesting parts of a program, such a
|
||||
To install the plugin, you'll need to be running IDA Pro 7.4 or 7.5 with either Python 2 or Python 3.
|
||||
Next make sure pip commands are run using the Python install that is configured for your IDA install:
|
||||
|
||||
1. Only if running Python 2.7, run command `$ pip install https://github.com/williballenthin/vivisect/zipball/master`
|
||||
2. Run `$ pip install .` from capa root directory
|
||||
3. Open IDA and navigate to `File > Script file…` or `Alt+F7`
|
||||
4. Navigate to `<capa_install_dir>\capa\ida\` and choose `ida_capa_explorer.py`
|
||||
1. Run `$ pip install .` from capa root directory
|
||||
2. Open IDA and navigate to `File > Script file…` or `Alt+F7`
|
||||
3. Navigate to `<capa_install_dir>\capa\ida\` and choose `ida_capa_explorer.py`
|
||||
|
||||
2
rules
2
rules
Submodule rules updated: 7ae5ae215f...c8d8c05f59
@@ -13,8 +13,7 @@ It will mark up functions with their capa matches, like:
|
||||
UninstallService proc near
|
||||
...
|
||||
|
||||
To use, invoke from the Binary Ninja Tools menu, or from the
|
||||
command-palette.
|
||||
To use, invoke from the Binary Ninja Tools menu, or from the command-palette.
|
||||
|
||||
Adapted for Binary Ninja by @psifertex
|
||||
|
||||
@@ -31,7 +30,7 @@ from binaryninja import *
|
||||
|
||||
def append_func_cmt(bv, va, cmt):
|
||||
"""
|
||||
add the given comment to the given function,
|
||||
add the given comment to the given function,
|
||||
if it doesn't already exist.
|
||||
"""
|
||||
func = bv.get_function_at(va)
|
||||
|
||||
@@ -41,7 +41,7 @@ logger = logging.getLogger("capa")
|
||||
|
||||
def append_func_cmt(va, cmt, repeatable=False):
|
||||
"""
|
||||
add the given comment to the given function,
|
||||
add the given comment to the given function,
|
||||
if it doesn't already exist.
|
||||
"""
|
||||
func = ida_funcs.get_func(va)
|
||||
|
||||
@@ -399,7 +399,11 @@ def lint_rule(ctx, rule):
|
||||
print("")
|
||||
print(
|
||||
"%s%s %s"
|
||||
% (" (nursery) " if is_nursery_rule(rule) else "", rule.name, ("(%s)" % category) if category else "",)
|
||||
% (
|
||||
" (nursery) " if is_nursery_rule(rule) else "",
|
||||
rule.name,
|
||||
("(%s)" % category) if category else "",
|
||||
)
|
||||
)
|
||||
|
||||
level = "WARN" if is_nursery_rule(rule) else "FAIL"
|
||||
@@ -407,7 +411,12 @@ def lint_rule(ctx, rule):
|
||||
for violation in violations:
|
||||
print(
|
||||
"%s %s: %s: %s"
|
||||
% (" " if is_nursery_rule(rule) else "", level, violation.name, violation.recommendation,)
|
||||
% (
|
||||
" " if is_nursery_rule(rule) else "",
|
||||
level,
|
||||
violation.name,
|
||||
violation.recommendation,
|
||||
)
|
||||
)
|
||||
|
||||
elif len(violations) == 0 and is_nursery_rule(rule):
|
||||
@@ -487,7 +496,9 @@ def main(argv=None):
|
||||
parser.add_argument("rules", type=str, help="Path to rules")
|
||||
parser.add_argument("--samples", type=str, default=samples_path, help="Path to samples")
|
||||
parser.add_argument(
|
||||
"--thorough", action="store_true", help="Enable thorough linting - takes more time, but does a better job",
|
||||
"--thorough",
|
||||
action="store_true",
|
||||
help="Enable thorough linting - takes more time, but does a better job",
|
||||
)
|
||||
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging")
|
||||
parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
|
||||
|
||||
@@ -71,22 +71,22 @@ logger = logging.getLogger("capa.show-capabilities-by-function")
|
||||
|
||||
def render_matches_by_function(doc):
|
||||
"""
|
||||
like:
|
||||
like:
|
||||
|
||||
function at 0x1000321a with 33 features:
|
||||
- get hostname
|
||||
- initialize Winsock library
|
||||
function at 0x10003286 with 63 features:
|
||||
- create thread
|
||||
- terminate thread
|
||||
function at 0x10003415 with 116 features:
|
||||
- write file
|
||||
- send data
|
||||
- link function at runtime
|
||||
- create HTTP request
|
||||
- get common file path
|
||||
- send HTTP request
|
||||
- connect to HTTP server
|
||||
function at 0x1000321a with 33 features:
|
||||
- get hostname
|
||||
- initialize Winsock library
|
||||
function at 0x10003286 with 63 features:
|
||||
- create thread
|
||||
- terminate thread
|
||||
function at 0x10003415 with 116 features:
|
||||
- write file
|
||||
- send data
|
||||
- link function at runtime
|
||||
- create HTTP request
|
||||
- get common file path
|
||||
- send HTTP request
|
||||
- connect to HTTP server
|
||||
"""
|
||||
ostream = rutils.StringIO()
|
||||
|
||||
|
||||
13
setup.py
13
setup.py
@@ -17,11 +17,10 @@ requirements = ["six", "tqdm", "pyyaml", "tabulate", "colorama", "termcolor", "r
|
||||
if sys.version_info >= (3, 0):
|
||||
# py3
|
||||
requirements.append("networkx")
|
||||
requirements.append("pylancelot~=0.3.6")
|
||||
else:
|
||||
# py2
|
||||
requirements.append("enum34")
|
||||
requirements.append("vivisect @ https://github.com/williballenthin/vivisect/tarball/v0.0.20200804#egg=vivisect")
|
||||
requirements.append("enum34==1.1.6") # v1.1.6 is needed by halo 0.0.30 / spinners 0.0.24
|
||||
requirements.append("vivisect==0.1.0rc3")
|
||||
requirements.append("viv-utils")
|
||||
requirements.append("networkx==2.2") # v2.2 is last version supported by Python 2.7
|
||||
requirements.append("backports.functools-lru-cache")
|
||||
@@ -43,7 +42,11 @@ setuptools.setup(
|
||||
url="https://www.github.com/fireeye/capa",
|
||||
packages=setuptools.find_packages(exclude=["tests"]),
|
||||
package_dir={"capa": "capa"},
|
||||
entry_points={"console_scripts": ["capa=capa.main:main",]},
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"capa=capa.main:main",
|
||||
]
|
||||
},
|
||||
include_package_data=True,
|
||||
install_requires=requirements,
|
||||
extras_require={
|
||||
@@ -55,7 +58,7 @@ setuptools.setup(
|
||||
"pycodestyle",
|
||||
"black ; python_version>'3.0'",
|
||||
"isort",
|
||||
],
|
||||
]
|
||||
},
|
||||
zip_safe=False,
|
||||
keywords="capa",
|
||||
|
||||
Submodule tests/data updated: aeb505b914...768cda2a09
@@ -80,16 +80,6 @@ def get_viv_extractor(path):
|
||||
return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_lancelot_extractor(path):
|
||||
import capa.features.extractors.lancelot
|
||||
|
||||
with open(path, "rb") as f:
|
||||
buf = f.read()
|
||||
|
||||
return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def extract_file_features(extractor):
|
||||
features = collections.defaultdict(set)
|
||||
@@ -150,8 +140,6 @@ def get_data_path_by_name(name):
|
||||
return os.path.join(CD, "data", "bfb9b5391a13d0afd787e87ab90f14f5.dll_")
|
||||
elif name.startswith("c9188"):
|
||||
return os.path.join(CD, "data", "c91887d861d9bd4a5872249b641bc9f9.exe_")
|
||||
elif name == "aspack":
|
||||
return os.path.join(CD, "data", "2055994ff75b4309eee3a49c5749d306")
|
||||
else:
|
||||
raise ValueError("unexpected sample fixture")
|
||||
|
||||
@@ -352,10 +340,20 @@ FEATURE_PRESENCE_TESTS = [
|
||||
("mimikatz", "function=0x4556E5", capa.features.insn.API("advapi32.LsaQueryInformationPolicy"), True),
|
||||
("mimikatz", "function=0x4556E5", capa.features.insn.API("LsaQueryInformationPolicy"), True),
|
||||
# insn/api: x64
|
||||
("kernel32-64", "function=0x180001010", capa.features.insn.API("RtlVirtualUnwind"), True,),
|
||||
(
|
||||
"kernel32-64",
|
||||
"function=0x180001010",
|
||||
capa.features.insn.API("RtlVirtualUnwind"),
|
||||
True,
|
||||
),
|
||||
("kernel32-64", "function=0x180001010", capa.features.insn.API("RtlVirtualUnwind"), True),
|
||||
# insn/api: x64 thunk
|
||||
("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True,),
|
||||
(
|
||||
"kernel32-64",
|
||||
"function=0x1800202B0",
|
||||
capa.features.insn.API("RtlCaptureContext"),
|
||||
True,
|
||||
),
|
||||
("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True),
|
||||
# insn/api: resolve indirect calls
|
||||
("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CreatePipe"), True),
|
||||
@@ -439,7 +437,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
|
||||
|
||||
def get_extractor(path):
|
||||
if sys.version_info >= (3, 0):
|
||||
extractor = get_lancelot_extractor(path)
|
||||
raise RuntimeError("no supported py3 backends yet")
|
||||
else:
|
||||
extractor = get_viv_extractor(path)
|
||||
|
||||
@@ -506,8 +504,3 @@ def z499c2_extractor():
|
||||
@pytest.fixture
|
||||
def al_khaser_x86_extractor():
|
||||
return get_extractor(get_data_path_by_name("al-khaser x86"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def aspack_extractor():
|
||||
return get_extractor(get_data_path_by_name("aspack"))
|
||||
|
||||
@@ -59,7 +59,13 @@ def test_some():
|
||||
)
|
||||
assert (
|
||||
Some(2, [Number(1), Number(2), Number(3)]).evaluate(
|
||||
{Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}, Number(4): {1},}
|
||||
{
|
||||
Number(0): {1},
|
||||
Number(1): {1},
|
||||
Number(2): {1},
|
||||
Number(3): {1},
|
||||
Number(4): {1},
|
||||
}
|
||||
)
|
||||
== True
|
||||
)
|
||||
@@ -258,7 +264,9 @@ def test_match_matched_rules():
|
||||
]
|
||||
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.insn.Number(100): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.insn.Number(100): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule1") in features
|
||||
assert capa.features.MatchedRule("test rule2") in features
|
||||
@@ -266,7 +274,9 @@ def test_match_matched_rules():
|
||||
# the ordering of the rules must not matter,
|
||||
# the engine should match rules in an appropriate order.
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(reversed(rules)), {capa.features.insn.Number(100): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(reversed(rules)),
|
||||
{capa.features.insn.Number(100): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule1") in features
|
||||
assert capa.features.MatchedRule("test rule2") in features
|
||||
@@ -312,22 +322,30 @@ def test_regex():
|
||||
),
|
||||
]
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.insn.Number(100): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.insn.Number(100): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule") not in features
|
||||
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.String("aaaa"): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String("aaaa"): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule") not in features
|
||||
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.String("aBBBBa"): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String("aBBBBa"): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule") not in features
|
||||
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.String("abbbba"): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String("abbbba"): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule") in features
|
||||
assert capa.features.MatchedRule("rule with implied wildcards") in features
|
||||
@@ -350,7 +368,9 @@ def test_regex_ignorecase():
|
||||
),
|
||||
]
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.String("aBBBBa"): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String("aBBBBa"): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule") in features
|
||||
|
||||
@@ -429,7 +449,9 @@ def test_match_namespace():
|
||||
]
|
||||
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.insn.API("CreateFile"): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.insn.API("CreateFile"): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert "CreateFile API" in matches
|
||||
assert "file-create" in matches
|
||||
@@ -439,7 +461,9 @@ def test_match_namespace():
|
||||
assert capa.features.MatchedRule("file/create/CreateFile") in features
|
||||
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.insn.API("WriteFile"): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.insn.API("WriteFile"): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert "WriteFile API" in matches
|
||||
assert "file-create" not in matches
|
||||
|
||||
@@ -21,13 +21,19 @@ import capa.features.extractors
|
||||
EXTRACTOR = capa.features.extractors.NullFeatureExtractor(
|
||||
{
|
||||
"base address": 0x401000,
|
||||
"file features": [(0x402345, capa.features.Characteristic("embedded pe")),],
|
||||
"file features": [
|
||||
(0x402345, capa.features.Characteristic("embedded pe")),
|
||||
],
|
||||
"functions": {
|
||||
0x401000: {
|
||||
"features": [(0x401000, capa.features.Characteristic("indirect call")),],
|
||||
"features": [
|
||||
(0x401000, capa.features.Characteristic("indirect call")),
|
||||
],
|
||||
"basic blocks": {
|
||||
0x401000: {
|
||||
"features": [(0x401000, capa.features.Characteristic("tight loop")),],
|
||||
"features": [
|
||||
(0x401000, capa.features.Characteristic("tight loop")),
|
||||
],
|
||||
"instructions": {
|
||||
0x401000: {
|
||||
"features": [
|
||||
@@ -35,7 +41,11 @@ EXTRACTOR = capa.features.extractors.NullFeatureExtractor(
|
||||
(0x401000, capa.features.Characteristic("nzxor")),
|
||||
],
|
||||
},
|
||||
0x401002: {"features": [(0x401002, capa.features.insn.Mnemonic("mov")),],},
|
||||
0x401002: {
|
||||
"features": [
|
||||
(0x401002, capa.features.insn.Mnemonic("mov")),
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1,104 +1,104 @@
|
||||
# run this script from within IDA with ./tests/data/mimikatz.exe open
|
||||
import sys
|
||||
import logging
|
||||
import os.path
|
||||
import binascii
|
||||
import traceback
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
sys.path.append(os.path.dirname(__file__))
|
||||
from fixtures import *
|
||||
finally:
|
||||
sys.path.pop()
|
||||
|
||||
|
||||
logger = logging.getLogger("test_ida_features")
|
||||
|
||||
|
||||
def check_input_file(wanted):
|
||||
import idautils
|
||||
|
||||
# some versions (7.4) of IDA return a truncated version of the MD5.
|
||||
# https://github.com/idapython/bin/issues/11
|
||||
try:
|
||||
found = idautils.GetInputFileMD5()[:31].decode("ascii").lower()
|
||||
except UnicodeDecodeError:
|
||||
# in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
|
||||
# rather than the hex digest
|
||||
found = binascii.hexlify(idautils.GetInputFileMD5()[:15]).decode("ascii").lower()
|
||||
|
||||
if not wanted.startswith(found):
|
||||
raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted))
|
||||
|
||||
|
||||
def get_ida_extractor(_path):
|
||||
check_input_file("5f66b82558ca92e54e77f216ef4c066c")
|
||||
|
||||
# have to import import this inline so pytest doesn't bail outside of IDA
|
||||
import capa.features.extractors.ida
|
||||
|
||||
return capa.features.extractors.ida.IdaFeatureExtractor()
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
|
||||
def test_ida_features():
|
||||
for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS:
|
||||
id = make_test_id((sample, scope, feature, expected))
|
||||
|
||||
try:
|
||||
check_input_file(get_sample_md5_by_name(sample))
|
||||
except RuntimeError:
|
||||
print("SKIP %s" % (id))
|
||||
continue
|
||||
|
||||
scope = resolve_scope(scope)
|
||||
sample = resolve_sample(sample)
|
||||
|
||||
try:
|
||||
do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
|
||||
except Exception as e:
|
||||
print("FAIL %s" % (id))
|
||||
traceback.print_exc()
|
||||
else:
|
||||
print("OK %s" % (id))
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
|
||||
def test_ida_feature_counts():
|
||||
for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
|
||||
id = make_test_id((sample, scope, feature, expected))
|
||||
|
||||
try:
|
||||
check_input_file(get_sample_md5_by_name(sample))
|
||||
except RuntimeError:
|
||||
print("SKIP %s" % (id))
|
||||
continue
|
||||
|
||||
scope = resolve_scope(scope)
|
||||
sample = resolve_sample(sample)
|
||||
|
||||
try:
|
||||
do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
|
||||
except Exception as e:
|
||||
print("FAIL %s" % (id))
|
||||
traceback.print_exc()
|
||||
else:
|
||||
print("OK %s" % (id))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("-" * 80)
|
||||
|
||||
# invoke all functions in this module that start with `test_`
|
||||
for name in dir(sys.modules[__name__]):
|
||||
if not name.startswith("test_"):
|
||||
continue
|
||||
|
||||
test = getattr(sys.modules[__name__], name)
|
||||
logger.debug("invoking test: %s", name)
|
||||
sys.stderr.flush()
|
||||
test()
|
||||
|
||||
print("DONE")
|
||||
# run this script from within IDA with ./tests/data/mimikatz.exe open
|
||||
import sys
|
||||
import logging
|
||||
import os.path
|
||||
import binascii
|
||||
import traceback
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
sys.path.append(os.path.dirname(__file__))
|
||||
from fixtures import *
|
||||
finally:
|
||||
sys.path.pop()
|
||||
|
||||
|
||||
logger = logging.getLogger("test_ida_features")
|
||||
|
||||
|
||||
def check_input_file(wanted):
|
||||
import idautils
|
||||
|
||||
# some versions (7.4) of IDA return a truncated version of the MD5.
|
||||
# https://github.com/idapython/bin/issues/11
|
||||
try:
|
||||
found = idautils.GetInputFileMD5()[:31].decode("ascii").lower()
|
||||
except UnicodeDecodeError:
|
||||
# in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
|
||||
# rather than the hex digest
|
||||
found = binascii.hexlify(idautils.GetInputFileMD5()[:15]).decode("ascii").lower()
|
||||
|
||||
if not wanted.startswith(found):
|
||||
raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted))
|
||||
|
||||
|
||||
def get_ida_extractor(_path):
|
||||
check_input_file("5f66b82558ca92e54e77f216ef4c066c")
|
||||
|
||||
# have to import import this inline so pytest doesn't bail outside of IDA
|
||||
import capa.features.extractors.ida
|
||||
|
||||
return capa.features.extractors.ida.IdaFeatureExtractor()
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
|
||||
def test_ida_features():
|
||||
for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS:
|
||||
id = make_test_id((sample, scope, feature, expected))
|
||||
|
||||
try:
|
||||
check_input_file(get_sample_md5_by_name(sample))
|
||||
except RuntimeError:
|
||||
print("SKIP %s" % (id))
|
||||
continue
|
||||
|
||||
scope = resolve_scope(scope)
|
||||
sample = resolve_sample(sample)
|
||||
|
||||
try:
|
||||
do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
|
||||
except Exception as e:
|
||||
print("FAIL %s" % (id))
|
||||
traceback.print_exc()
|
||||
else:
|
||||
print("OK %s" % (id))
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
|
||||
def test_ida_feature_counts():
|
||||
for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
|
||||
id = make_test_id((sample, scope, feature, expected))
|
||||
|
||||
try:
|
||||
check_input_file(get_sample_md5_by_name(sample))
|
||||
except RuntimeError:
|
||||
print("SKIP %s" % (id))
|
||||
continue
|
||||
|
||||
scope = resolve_scope(scope)
|
||||
sample = resolve_sample(sample)
|
||||
|
||||
try:
|
||||
do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
|
||||
except Exception as e:
|
||||
print("FAIL %s" % (id))
|
||||
traceback.print_exc()
|
||||
else:
|
||||
print("OK %s" % (id))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("-" * 80)
|
||||
|
||||
# invoke all functions in this module that start with `test_`
|
||||
for name in dir(sys.modules[__name__]):
|
||||
if not name.startswith("test_"):
|
||||
continue
|
||||
|
||||
test = getattr(sys.modules[__name__], name)
|
||||
logger.debug("invoking test: %s", name)
|
||||
sys.stderr.flush()
|
||||
test()
|
||||
|
||||
print("DONE")
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
|
||||
from fixtures import *
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected", FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"],
|
||||
)
|
||||
def test_lancelot_features(sample, scope, feature, expected):
|
||||
with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"):
|
||||
do_test_feature_presence(get_lancelot_extractor, sample, scope, feature, expected)
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected", FEATURE_COUNT_TESTS, indirect=["sample", "scope"],
|
||||
)
|
||||
def test_lancelot_feature_counts(sample, scope, feature, expected):
|
||||
with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"):
|
||||
do_test_feature_count(get_lancelot_extractor, sample, scope, feature, expected)
|
||||
@@ -44,10 +44,20 @@ def test_main_single_rule(z9324d_extractor, tmpdir):
|
||||
path = z9324d_extractor.path
|
||||
rule_file = tmpdir.mkdir("capa").join("rule.yml")
|
||||
rule_file.write(RULE_CONTENT)
|
||||
assert capa.main.main([path, "-v", "-r", rule_file.strpath,]) == 0
|
||||
assert (
|
||||
capa.main.main(
|
||||
[
|
||||
path,
|
||||
"-v",
|
||||
"-r",
|
||||
rule_file.strpath,
|
||||
]
|
||||
)
|
||||
== 0
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="lancelot doesn't support shellcode workspaces")
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_main_shellcode(z499c2_extractor):
|
||||
path = z499c2_extractor.path
|
||||
assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0
|
||||
|
||||
@@ -483,6 +483,21 @@ def test_number_arch():
|
||||
assert r.evaluate({Number(2, arch=ARCH_X64): {1}}) == False
|
||||
|
||||
|
||||
def test_number_arch_symbol():
|
||||
r = capa.rules.Rule.from_yaml(
|
||||
textwrap.dedent(
|
||||
"""
|
||||
rule:
|
||||
meta:
|
||||
name: test rule
|
||||
features:
|
||||
- number/x32: 2 = some constant
|
||||
"""
|
||||
)
|
||||
)
|
||||
assert r.evaluate({Number(2, arch=ARCH_X32, description="some constant"): {1}}) == True
|
||||
|
||||
|
||||
def test_offset_symbol():
|
||||
rule = textwrap.dedent(
|
||||
"""
|
||||
@@ -546,6 +561,21 @@ def test_offset_arch():
|
||||
assert r.evaluate({Offset(2, arch=ARCH_X64): {1}}) == False
|
||||
|
||||
|
||||
def test_offset_arch_symbol():
|
||||
r = capa.rules.Rule.from_yaml(
|
||||
textwrap.dedent(
|
||||
"""
|
||||
rule:
|
||||
meta:
|
||||
name: test rule
|
||||
features:
|
||||
- offset/x32: 2 = some constant
|
||||
"""
|
||||
)
|
||||
)
|
||||
assert r.evaluate({Offset(2, arch=ARCH_X32, description="some constant"): {1}}) == True
|
||||
|
||||
|
||||
def test_invalid_offset():
|
||||
with pytest.raises(capa.rules.InvalidRule):
|
||||
r = capa.rules.Rule.from_yaml(
|
||||
@@ -650,12 +680,16 @@ def test_regex_values_always_string():
|
||||
),
|
||||
]
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.String("123"): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String("123"): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule") in features
|
||||
|
||||
features, matches = capa.engine.match(
|
||||
capa.engine.topologically_order_rules(rules), {capa.features.String("0x123"): {1}}, 0x0,
|
||||
capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String("0x123"): {1}},
|
||||
0x0,
|
||||
)
|
||||
assert capa.features.MatchedRule("test rule") in features
|
||||
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import sys
|
||||
|
||||
import pefile
|
||||
import pytest
|
||||
from fixtures import *
|
||||
|
||||
import capa.unpack
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
|
||||
def test_aspack_is_packed(aspack_extractor):
|
||||
path = aspack_extractor.path
|
||||
|
||||
with open(path, "rb") as f:
|
||||
buf = f.read()
|
||||
|
||||
assert capa.unpack.is_packed(buf) is True
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
|
||||
def test_aspack_detect(aspack_extractor):
|
||||
path = aspack_extractor.path
|
||||
|
||||
with open(path, "rb") as f:
|
||||
buf = f.read()
|
||||
|
||||
assert capa.unpack.detect_packer(buf) == "aspack"
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
|
||||
def test_aspack_unpack(aspack_extractor):
|
||||
with open(aspack_extractor.path, "rb") as f:
|
||||
buf = f.read()
|
||||
|
||||
unpacked = capa.unpack.unpack_pe("aspack", buf)
|
||||
|
||||
pe = pefile.PE(data=unpacked)
|
||||
assert pe.OPTIONAL_HEADER.ImageBase == 0x4AD00000
|
||||
assert pe.OPTIONAL_HEADER.AddressOfEntryPoint == 0x1A610
|
||||
assert b"This program cannot be run in DOS mode" in unpacked
|
||||
assert "(C) Copyright 1985-2000 Microsoft Corp.".encode("utf-16le") in unpacked
|
||||
assert "CMD.EXE has halted. %0".encode("utf-16le") in unpacked
|
||||
|
||||
dlls = set([])
|
||||
syms = set([])
|
||||
for entry in pe.DIRECTORY_ENTRY_IMPORT:
|
||||
dlls.add(entry.dll.decode("ascii").lower().partition(".")[0])
|
||||
for imp in entry.imports:
|
||||
syms.add(imp.name.decode("ascii"))
|
||||
|
||||
assert dlls == {"advapi32", "kernel32", "msvcrt", "user32"}
|
||||
assert "RegQueryValueExW" in syms
|
||||
assert "WriteConsoleW" in syms
|
||||
assert "realloc" in syms
|
||||
assert "GetProcessWindowStation" in syms
|
||||
@@ -11,7 +11,9 @@ from fixtures import *
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected", FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"],
|
||||
"sample,scope,feature,expected",
|
||||
FEATURE_PRESENCE_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_viv_features(sample, scope, feature, expected):
|
||||
with xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2"):
|
||||
@@ -19,7 +21,9 @@ def test_viv_features(sample, scope, feature, expected):
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected", FEATURE_COUNT_TESTS, indirect=["sample", "scope"],
|
||||
"sample,scope,feature,expected",
|
||||
FEATURE_COUNT_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_viv_feature_counts(sample, scope, feature, expected):
|
||||
with xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2"):
|
||||
|
||||
Reference in New Issue
Block a user