Compare commits

..

36 Commits

Author SHA1 Message Date
Capa Bot
ed0f4f994c Sync capa rules submodule 2020-08-31 20:19:29 +00:00
Capa Bot
f9eed2d5b2 Sync capa rules submodule 2020-08-31 19:19:49 +00:00
Willi Ballenthin
a801a681b8 Merge pull request #266 from fireeye/release-v1.2.0
release v1.2.0
2020-08-31 10:29:38 -06:00
mike-hunhoff
c25632b12c Merge pull request #264 from winniepe/master 2020-08-31 09:22:34 -06:00
Capa Bot
8e6974b10f Sync capa rules submodule 2020-08-31 13:51:49 +00:00
Capa Bot
7616603b11 Sync capa rules submodule 2020-08-31 13:02:37 +00:00
winniepe
7c27af8868 Restore default expansion after unselecting 'Limit results to current function' checkbox. 2020-08-30 16:48:51 +00:00
winniepe
19e5e9b766 Expand one layer by default to make IDA navigation easier. 2020-08-30 16:27:48 +00:00
William Ballenthin
adeee3e834 changelog: don't forget to reference @edeca! 2020-08-29 22:53:51 -06:00
William Ballenthin
c2997c8033 changelog: add entry from #264 2020-08-29 22:32:24 -06:00
William Ballenthin
28b463f145 changelog: add entries for v1.2.0 2020-08-29 22:26:40 -06:00
William Ballenthin
cc59f5b91e setup: bump version to v1.2.0 2020-08-29 21:54:16 -06:00
William Ballenthin
06ac49e629 submodule: rules, data update 2020-08-29 21:51:40 -06:00
Capa Bot
6c07617082 Sync capa rules submodule 2020-08-29 00:11:38 +00:00
Capa Bot
13390918a1 Sync capa rules submodule 2020-08-28 20:09:50 +00:00
Capa Bot
0f44ec0dd8 Sync capa-testfiles submodule 2020-08-28 19:59:22 +00:00
mike-hunhoff
c49199138e Merge pull request #261 from fireeye/explorer_include_block_scope_limit_by_func 2020-08-28 10:46:40 -06:00
Michael Hunhoff
3f88bb8500 adding code to include basic block scope when limiting results by a function 2020-08-28 10:30:09 -06:00
Willi Ballenthin
b2b9f15bc1 Merge pull request #260 from fireeye/explorer_plugin_display_statement_description
explorer: display statement descriptions
2020-08-27 17:16:38 -06:00
Michael Hunhoff
d2cd224fb3 adding code to display statement description in explorer plugin UI 2020-08-27 14:49:49 -06:00
Capa Bot
aac13164a5 Sync capa rules submodule 2020-08-27 20:40:06 +00:00
Capa Bot
f2fff02b49 Sync capa rules submodule 2020-08-27 20:39:21 +00:00
Willi Ballenthin
662a7eaae6 Merge pull request #259 from recvfrom/master
Fix #255: Use relative paths for the git submodule
2020-08-27 14:20:10 -06:00
Willi Ballenthin
f6ba63083b Merge pull request #258 from recvfrom/fix-256
Fix 256: Pin enum34 version to 1.1.6 for python2.7
2020-08-27 14:19:43 -06:00
Andrew
49774110cc Fix #255: Use relative paths for the git submodule
Fixes #255

This enables both HTTPS and SSH to be used to checkout
the project, per https://stackoverflow.com/a/44630028/9457431
2020-08-27 15:25:14 -04:00
Andrew
c7840e0769 Fix 256: Pin enum34 version to 1.1.6 for python2.7
Fixes #256 - capa requires halo==0.0.30, which has a dependency on
spinners>=0.0.24. spinners 0.0.24 has a dependency on enum34==1.1.6,
but 1.1.10 gets installed and used on my machine without the version
being pinned to 1.1.6. This issue occurs when using python 2.7.
2020-08-27 14:59:58 -04:00
mike-hunhoff
d2155eb3a1 Merge pull request #257 from fireeye/fix-237 2020-08-27 12:39:20 -06:00
Michael Hunhoff
3772c5c0bc add additional nzxor stack cookie check for IDA extractor 2020-08-27 12:32:44 -06:00
Capa Bot
d47d149196 Sync capa rules submodule 2020-08-27 16:08:48 +00:00
Capa Bot
528645c0d2 Sync capa rules submodule 2020-08-27 13:53:01 +00:00
Willi Ballenthin
7464a62943 Merge pull request #253 from fireeye/black-reformat
Black reformat
2020-08-27 07:50:46 -06:00
Moritz Raabe
34e7991081 black 20.8b1 updates 2020-08-27 11:26:28 +02:00
Moritz Raabe
3e20f0fc71 dos2unix 2020-08-27 11:25:43 +02:00
Capa Bot
cb9bd2eab7 Sync capa-testfiles submodule 2020-08-27 08:40:12 +00:00
Willi Ballenthin
9d102843ac Merge pull request #251 from fireeye/bugfix-249-arch-description
bugfix 249
2020-08-26 17:18:34 -06:00
Michael Hunhoff
dc8870861b fixes 249 2020-08-26 16:31:07 -06:00
50 changed files with 993 additions and 2314 deletions

View File

@@ -62,8 +62,7 @@ jobs:
with:
python-version: ${{ matrix.python }}
- name: Install capa
# TODO: remove `pefile` when we bump lancelot >= 0.3.7
run: pip install -e .[dev] pefile
run: pip install -e .[dev]
- name: Run tests
run: pytest tests/

4
.gitmodules vendored
View File

@@ -1,6 +1,6 @@
[submodule "rules"]
path = rules
url = git@github.com:fireeye/capa-rules.git
url = ../capa-rules.git
[submodule "tests/data"]
path = tests/data
url = git@github.com:fireeye/capa-testfiles.git
url = ../capa-testfiles.git

View File

@@ -1,5 +1,103 @@
# Change Log
## v1.2.0 (2020-08-31)
This release brings UI enhancements, especially for the IDA Pro plugin,
investment towards py3 support,
fixes some bugs identified by the community,
and 46 (!) new rules.
We received contributions from ten reverse engineers, including five new ones:
- @agithubuserlol
- @recvfrom
- @D4nch3n
- @edeca
- @winniepe
Download a standalone binary below and checkout the readme [here on GitHub](https://github.com/fireeye/capa/).
Report issues on our [issue tracker](https://github.com/fireeye/capa/issues)
and contribute new rules at [capa-rules](https://github.com/fireeye/capa-rules/).
### New features
- ida plugin: display arch flavors @mike-hunhoff
- ida plugin: display block descriptions @mike-hunhoff
- ida backend: extract features from nested pointers @mike-hunhoff
- main: show more progress output @williballenthin
- core: pin dependency versions #258 @recvfrom
### New rules
- bypass UAC via AppInfo ALPC @agithubuserlol
- bypass UAC via token manipulation @agithubuserlol
- check for sandbox and av modules @re-fox
- check for sandbox username @re-fox
- check if process is running under wine @re-fox
- validate credit card number using luhn algorithm @re-fox
- validate credit card number using luhn algorithm with no lookup table @re-fox
- hash data using FNV @edeca @mr-tz
- link many functions at runtime @mr-tz
- reference public RSA key @mr-tz
- packed with ASPack @williballenthin
- delete internet cache @mike-hunhoff
- enumerate internet cache @mike-hunhoff
- send ICMP echo request @mike-hunhoff
- check for debugger via API @mike-hunhoff
- check for hardware breakpoints @mike-hunhoff
- check for kernel debugger via shared user data structure @mike-hunhoff
- check for protected handle exception @mike-hunhoff
- check for software breakpoints @mike-hunhoff
- check for trap flag exception @mike-hunhoff
- check for unexpected memory writes @mike-hunhoff
- check process job object @mike-hunhoff
- reference anti-VM strings targeting Parallels @mike-hunhoff
- reference anti-VM strings targeting Qemu @mike-hunhoff
- reference anti-VM strings targeting VirtualBox @mike-hunhoff
- reference anti-VM strings targeting VirtualPC @mike-hunhoff
- reference anti-VM strings targeting VMWare @mike-hunhoff
- reference anti-VM strings targeting Xen @mike-hunhoff
- reference analysis tools strings @mike-hunhoff
- reference WMI statements @mike-hunhoff
- get number of processor cores @mike-hunhoff
- get number of processors @mike-hunhoff
- enumerate disk properties @mike-hunhoff
- get disk size @mike-hunhoff
- get process heap flags @mike-hunhoff
- get process heap force flags @mike-hunhoff
- get Explorer PID @mike-hunhoff
- delay execution @mike-hunhoff
- check for process debug object @mike-hunhoff
- check license value @mike-hunhoff
- check ProcessDebugFlags @mike-hunhoff
- check ProcessDebugPort @mike-hunhoff
- check SystemKernelDebuggerInformation @mike-hunhoff
- check thread yield allowed @mike-hunhoff
- enumerate system firmware tables @mike-hunhoff
- get system firmware table @mike-hunhoff
- hide thread from debugger @mike-hunhoff
### Bug fixes
- ida backend: extract unmapped immediate number features @mike-hunhoff
- ida backend: fix stack cookie check #257 @mike-hunhoff
- viv backend: better extract gs segment access @williballenthin
- core: enable counting of string features #241 @D4nch3n @williballenthin
- core: enable descriptions on feature with arch flavors @mike-hunhoff
- core: update git links for non-SSH access #259 @recvfrom
### Changes
- ida plugin: better default display showing first level nesting @winniepe
- remove unused `characteristic(switch)` feature @ana06
- prepare testing infrastructure for multiple backends/py3 @williballenthin
- ci: zip build artifacts @ana06
- ci: build all supported python versions @ana06
- code style and formatting @mr-tz
### Raw diffs
- [capa v1.1.0...v1.2.0](https://github.com/fireeye/capa/compare/v1.1.0...v1.2.0)
- [capa-rules v1.1.0...v1.2.0](https://github.com/fireeye/capa-rules/compare/v1.1.0...v1.2.0)
## v1.1.0 (2020-08-05)
This release brings new rule format updates, such as adding `offset/x32` and negative offsets,

View File

@@ -1,7 +1,7 @@
![capa](.github/logo.png)
[![CI status](https://github.com/fireeye/capa/workflows/CI/badge.svg)](https://github.com/fireeye/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
[![Number of rules](https://img.shields.io/badge/rules-303-blue.svg)](https://github.com/fireeye/capa-rules)
[![Number of rules](https://img.shields.io/badge/rules-341-blue.svg)](https://github.com/fireeye/capa-rules)
[![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE.txt)
capa detects capabilities in executable files.

View File

@@ -75,7 +75,7 @@ class IdaFeatureExtractor(FeatureExtractor):
yield feature, ea
def get_basic_blocks(self, f):
for bb in idaapi.FlowChart(f, flags=idaapi.FC_PREDS):
for bb in capa.features.extractors.ida.helpers.get_function_blocks(f):
yield add_ea_int_cast(bb)
def extract_basic_block_features(self, f, bb):

View File

@@ -20,10 +20,10 @@ from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
def get_printable_len(op):
""" Return string length if all operand bytes are ascii or utf16-le printable
"""Return string length if all operand bytes are ascii or utf16-le printable
args:
op (IDA op_t)
args:
op (IDA op_t)
"""
op_val = capa.features.extractors.ida.helpers.mask_op_val(op)
@@ -62,10 +62,10 @@ def get_printable_len(op):
def is_mov_imm_to_stack(insn):
""" verify instruction moves immediate onto stack
"""verify instruction moves immediate onto stack
args:
insn (IDA insn_t)
args:
insn (IDA insn_t)
"""
if insn.Op2.type != idaapi.o_imm:
return False
@@ -80,13 +80,13 @@ def is_mov_imm_to_stack(insn):
def bb_contains_stackstring(f, bb):
""" check basic block for stackstring indicators
"""check basic block for stackstring indicators
true if basic block contains enough moves of constant bytes to the stack
true if basic block contains enough moves of constant bytes to the stack
args:
f (IDA func_t)
bb (IDA BasicBlock)
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
count = 0
for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
@@ -98,33 +98,33 @@ def bb_contains_stackstring(f, bb):
def extract_bb_stackstring(f, bb):
""" extract stackstring indicators from basic block
"""extract stackstring indicators from basic block
args:
f (IDA func_t)
bb (IDA BasicBlock)
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
if bb_contains_stackstring(f, bb):
yield Characteristic("stack string"), bb.start_ea
def extract_bb_tight_loop(f, bb):
""" extract tight loop indicators from a basic block
"""extract tight loop indicators from a basic block
args:
f (IDA func_t)
bb (IDA BasicBlock)
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
if capa.features.extractors.ida.helpers.is_basic_block_tight_loop(bb):
yield Characteristic("tight loop"), bb.start_ea
def extract_features(f, bb):
""" extract basic block features
"""extract basic block features
args:
f (IDA func_t)
bb (IDA BasicBlock)
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
for bb_handler in BASIC_BLOCK_HANDLERS:
for (feature, ea) in bb_handler(f, bb):

View File

@@ -20,13 +20,13 @@ from capa.features.file import Export, Import, Section
def check_segment_for_pe(seg):
""" check segment for embedded PE
"""check segment for embedded PE
adapted for IDA from:
https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
adapted for IDA from:
https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
args:
seg (IDA segment_t)
args:
seg (IDA segment_t)
"""
seg_max = seg.end_ea
mz_xor = [
@@ -67,11 +67,11 @@ def check_segment_for_pe(seg):
def extract_file_embedded_pe():
""" extract embedded PE features
"""extract embedded PE features
IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
"""
for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
for (ea, _) in check_segment_for_pe(seg):
@@ -85,15 +85,15 @@ def extract_file_export_names():
def extract_file_import_names():
""" extract function imports
"""extract function imports
1. imports by ordinal:
- modulename.#ordinal
1. imports by ordinal:
- modulename.#ordinal
2. imports by name, results in two features to support importname-only
matching:
- modulename.importname
- importname
2. imports by name, results in two features to support importname-only
matching:
- modulename.importname
- importname
"""
for (ea, info) in capa.features.extractors.ida.helpers.get_file_imports().items():
if info[1]:
@@ -104,22 +104,22 @@ def extract_file_import_names():
def extract_file_section_names():
""" extract section names
"""extract section names
IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
"""
for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
yield Section(idaapi.get_segm_name(seg)), seg.start_ea
def extract_file_strings():
""" extract ASCII and UTF-16 LE strings
"""extract ASCII and UTF-16 LE strings
IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
"""
for seg in capa.features.extractors.ida.helpers.get_segments():
seg_buff = capa.features.extractors.ida.helpers.get_segment_buffer(seg)

View File

@@ -15,20 +15,20 @@ from capa.features.extractors import loops
def extract_function_calls_to(f):
""" extract callers to a function
"""extract callers to a function
args:
f (IDA func_t)
args:
f (IDA func_t)
"""
for ea in idautils.CodeRefsTo(f.start_ea, True):
yield Characteristic("calls to"), ea
def extract_function_loop(f):
""" extract loop indicators from a function
"""extract loop indicators from a function
args:
f (IDA func_t)
args:
f (IDA func_t)
"""
edges = []
@@ -42,20 +42,20 @@ def extract_function_loop(f):
def extract_recursive_call(f):
""" extract recursive function call
"""extract recursive function call
args:
f (IDA func_t)
args:
f (IDA func_t)
"""
if capa.features.extractors.ida.helpers.is_function_recursive(f):
yield Characteristic("recursive call"), f.start_ea
def extract_features(f):
""" extract function features
"""extract function features
arg:
f (IDA func_t)
arg:
f (IDA func_t)
"""
for func_handler in FUNCTION_HANDLERS:
for (feature, ea) in func_handler(f):

View File

@@ -15,12 +15,12 @@ import idautils
def find_byte_sequence(start, end, seq):
""" find byte sequence
"""find byte sequence
args:
start: min virtual address
end: max virtual address
seq: bytes to search e.g. b'\x01\x03'
args:
start: min virtual address
end: max virtual address
seq: bytes to search e.g. b'\x01\x03'
"""
if sys.version_info[0] >= 3:
return idaapi.find_binary(start, end, " ".join(["%02x" % b for b in seq]), 0, idaapi.SEARCH_DOWN)
@@ -29,14 +29,14 @@ def find_byte_sequence(start, end, seq):
def get_functions(start=None, end=None, skip_thunks=False, skip_libs=False):
""" get functions, range optional
"""get functions, range optional
args:
start: min virtual address
end: max virtual address
args:
start: min virtual address
end: max virtual address
ret:
yield func_t*
ret:
yield func_t*
"""
for ea in idautils.Functions(start=start, end=end):
f = idaapi.get_func(ea)
@@ -45,10 +45,10 @@ def get_functions(start=None, end=None, skip_thunks=False, skip_libs=False):
def get_segments(skip_header_segments=False):
""" get list of segments (sections) in the binary image
"""get list of segments (sections) in the binary image
args:
skip_header_segments: IDA may load header segments - skip if set
args:
skip_header_segments: IDA may load header segments - skip if set
"""
for n in range(idaapi.get_segm_qty()):
seg = idaapi.getnseg(n)
@@ -57,9 +57,9 @@ def get_segments(skip_header_segments=False):
def get_segment_buffer(seg):
""" return bytes stored in a given segment
"""return bytes stored in a given segment
decrease buffer size until IDA is able to read bytes from the segment
decrease buffer size until IDA is able to read bytes from the segment
"""
buff = b""
sz = seg.end_ea - seg.start_ea
@@ -97,13 +97,13 @@ def get_file_imports():
def get_instructions_in_range(start, end):
""" yield instructions in range
"""yield instructions in range
args:
start: virtual address (inclusive)
end: virtual address (exclusive)
yield:
(insn_t*)
args:
start: virtual address (inclusive)
end: virtual address (exclusive)
yield:
(insn_t*)
"""
for head in idautils.Heads(start, end):
insn = idautils.DecodeInstruction(head)
@@ -183,10 +183,10 @@ def find_string_at(ea, min=4):
def get_op_phrase_info(op):
""" parse phrase features from operand
"""parse phrase features from operand
Pretty much dup of sark's implementation:
https://github.com/tmr232/Sark/blob/master/sark/code/instruction.py#L28-L73
Pretty much dup of sark's implementation:
https://github.com/tmr232/Sark/blob/master/sark/code/instruction.py#L28-L73
"""
if op.type not in (idaapi.o_phrase, idaapi.o_displ):
return {}
@@ -269,15 +269,15 @@ def is_op_stack_var(ea, index):
def mask_op_val(op):
""" mask value by data type
"""mask value by data type
necessary due to a bug in AMD64
necessary due to a bug in AMD64
Example:
.rsrc:0054C12C mov [ebp+var_4], 0FFFFFFFFh
Example:
.rsrc:0054C12C mov [ebp+var_4], 0FFFFFFFFh
insn.Op2.dtype == idaapi.dt_dword
insn.Op2.value == 0xffffffffffffffff
insn.Op2.dtype == idaapi.dt_dword
insn.Op2.value == 0xffffffffffffffff
"""
masks = {
idaapi.dt_byte: 0xFF,
@@ -289,10 +289,10 @@ def mask_op_val(op):
def is_function_recursive(f):
""" check if function is recursive
"""check if function is recursive
args:
f (IDA func_t)
args:
f (IDA func_t)
"""
for ref in idautils.CodeRefsTo(f.start_ea, True):
if f.contains(ref):
@@ -301,13 +301,13 @@ def is_function_recursive(f):
def is_basic_block_tight_loop(bb):
""" check basic block loops to self
"""check basic block loops to self
true if last instruction in basic block branches to basic block start
true if last instruction in basic block branches to basic block start
args:
f (IDA func_t)
bb (IDA BasicBlock)
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
bb_end = idc.prev_head(bb.end_ea)
if bb.start_ea < bb_end:
@@ -341,3 +341,21 @@ def find_data_reference_from_insn(insn, max_depth=10):
ea = data_refs[0]
return ea
def get_function_blocks(f):
"""yield basic blocks contained in specified function
args:
f (IDA func_t)
yield:
block (IDA BasicBlock)
"""
# leverage idaapi.FC_NOEXT flag to ignore useless external blocks referenced by the function
for block in idaapi.FlowChart(f, flags=(idaapi.FC_PREDS | idaapi.FC_NOEXT)):
yield block
def is_basic_block_return(bb):
""" check if basic block is return block """
return bb.type == idaapi.fcb_ret

View File

@@ -15,6 +15,10 @@ import capa.features.extractors.ida.helpers
from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic
from capa.features.insn import Number, Offset, Mnemonic
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40
def get_arch(ctx):
"""
@@ -62,15 +66,15 @@ def check_for_api_call(ctx, insn):
def extract_insn_api_features(f, bb, insn):
""" parse instruction API features
"""parse instruction API features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
example:
call dword [0x00473038]
example:
call dword [0x00473038]
"""
for api in check_for_api_call(f.ctx, insn):
for (feature, ea) in capa.features.extractors.helpers.generate_api_features(api, insn.ea):
@@ -78,15 +82,15 @@ def extract_insn_api_features(f, bb, insn):
def extract_insn_number_features(f, bb, insn):
""" parse instruction number features
"""parse instruction number features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
example:
push 3136B0h ; dwControlCode
example:
push 3136B0h ; dwControlCode
"""
if idaapi.is_ret_insn(insn):
# skip things like:
@@ -109,15 +113,15 @@ def extract_insn_number_features(f, bb, insn):
def extract_insn_bytes_features(f, bb, insn):
""" parse referenced byte sequences
"""parse referenced byte sequences
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
example:
push offset iid_004118d4_IShellLinkA ; riid
example:
push offset iid_004118d4_IShellLinkA ; riid
"""
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
if ref != insn.ea:
@@ -127,15 +131,15 @@ def extract_insn_bytes_features(f, bb, insn):
def extract_insn_string_features(f, bb, insn):
""" parse instruction string features
"""parse instruction string features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
example:
push offset aAcr ; "ACR > "
example:
push offset aAcr ; "ACR > "
"""
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
if ref != insn.ea:
@@ -145,15 +149,15 @@ def extract_insn_string_features(f, bb, insn):
def extract_insn_offset_features(f, bb, insn):
""" parse instruction structure offset features
"""parse instruction structure offset features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
example:
.text:0040112F cmp [esi+4], ebx
example:
.text:0040112F cmp [esi+4], ebx
"""
for op in capa.features.extractors.ida.helpers.get_insn_ops(insn, target_ops=(idaapi.o_phrase, idaapi.o_displ)):
if capa.features.extractors.ida.helpers.is_op_stack_var(insn.ea, op.n):
@@ -175,11 +179,11 @@ def extract_insn_offset_features(f, bb, insn):
def contains_stack_cookie_keywords(s):
""" check if string contains stack cookie keywords
"""check if string contains stack cookie keywords
Examples:
xor ecx, ebp ; StackCookie
mov eax, ___security_cookie
Examples:
xor ecx, ebp ; StackCookie
mov eax, ___security_cookie
"""
if not s:
return False
@@ -190,30 +194,30 @@ def contains_stack_cookie_keywords(s):
def bb_stack_cookie_registers(bb):
""" scan basic block for stack cookie operations
"""scan basic block for stack cookie operations
yield registers ids that may have been used for stack cookie operations
yield registers ids that may have been used for stack cookie operations
assume instruction that sets stack cookie and nzxor exist in same block
and stack cookie register is not modified prior to nzxor
assume instruction that sets stack cookie and nzxor exist in same block
and stack cookie register is not modified prior to nzxor
Example:
.text:004062DA mov eax, ___security_cookie <-- stack cookie
.text:004062DF mov ecx, eax
.text:004062E1 mov ebx, [esi]
.text:004062E3 and ecx, 1Fh
.text:004062E6 mov edi, [esi+4]
.text:004062E9 xor ebx, eax
.text:004062EB mov esi, [esi+8]
.text:004062EE xor edi, eax <-- ignore
.text:004062F0 xor esi, eax <-- ignore
.text:004062F2 ror edi, cl
.text:004062F4 ror esi, cl
.text:004062F6 ror ebx, cl
.text:004062F8 cmp edi, esi
.text:004062FA jnz loc_40639D
Example:
.text:004062DA mov eax, ___security_cookie <-- stack cookie
.text:004062DF mov ecx, eax
.text:004062E1 mov ebx, [esi]
.text:004062E3 and ecx, 1Fh
.text:004062E6 mov edi, [esi+4]
.text:004062E9 xor ebx, eax
.text:004062EB mov esi, [esi+8]
.text:004062EE xor edi, eax <-- ignore
.text:004062F0 xor esi, eax <-- ignore
.text:004062F2 ror edi, cl
.text:004062F4 ror esi, cl
.text:004062F6 ror ebx, cl
.text:004062F8 cmp edi, esi
.text:004062FA jnz loc_40639D
TODO: this is expensive, but necessary?...
TODO: this is expensive, but necessary?...
"""
for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
if contains_stack_cookie_keywords(idc.GetDisasm(insn.ea)):
@@ -223,12 +227,37 @@ def bb_stack_cookie_registers(bb):
yield op.reg
def is_nzxor_stack_cookie_delta(f, bb, insn):
""" check if nzxor exists within stack cookie delta """
# security cookie check should use SP or BP
if not capa.features.extractors.ida.helpers.is_frame_register(insn.Op2.reg):
return False
f_bbs = tuple(capa.features.extractors.ida.helpers.get_function_blocks(f))
# expect security cookie init in first basic block within first bytes (instructions)
if capa.features.extractors.ida.helpers.is_basic_block_equal(bb, f_bbs[0]) and insn.ea < (
bb.start_ea + SECURITY_COOKIE_BYTES_DELTA
):
return True
# ... or within last bytes (instructions) before a return
if capa.features.extractors.ida.helpers.is_basic_block_return(bb) and insn.ea > (
bb.start_ea + capa.features.extractors.ida.helpers.basic_block_size(bb) - SECURITY_COOKIE_BYTES_DELTA
):
return True
return False
def is_nzxor_stack_cookie(f, bb, insn):
""" check if nzxor is related to stack cookie """
if contains_stack_cookie_keywords(idaapi.get_cmt(insn.ea, False)):
# Example:
# xor ecx, ebp ; StackCookie
return True
if is_nzxor_stack_cookie_delta(f, bb, insn):
return True
stack_cookie_regs = tuple(bb_stack_cookie_registers(bb))
if any(op_reg in stack_cookie_regs for op_reg in (insn.Op1.reg, insn.Op2.reg)):
# Example:
@@ -239,14 +268,14 @@ def is_nzxor_stack_cookie(f, bb, insn):
def extract_insn_nzxor_characteristic_features(f, bb, insn):
""" parse instruction non-zeroing XOR instruction
"""parse instruction non-zeroing XOR instruction
ignore expected non-zeroing XORs, e.g. security cookies
ignore expected non-zeroing XORs, e.g. security cookies
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
if insn.itype != idaapi.NN_xor:
return
@@ -258,23 +287,23 @@ def extract_insn_nzxor_characteristic_features(f, bb, insn):
def extract_insn_mnemonic_features(f, bb, insn):
""" parse instruction mnemonic features
"""parse instruction mnemonic features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
yield Mnemonic(insn.get_canon_mnem()), insn.ea
def extract_insn_peb_access_characteristic_features(f, bb, insn):
""" parse instruction peb access
"""parse instruction peb access
fs:[0x30] on x86, gs:[0x60] on x64
fs:[0x30] on x86, gs:[0x60] on x64
TODO:
IDA should be able to do this..
TODO:
IDA should be able to do this..
"""
if insn.itype not in (idaapi.NN_push, idaapi.NN_mov):
return
@@ -291,10 +320,10 @@ def extract_insn_peb_access_characteristic_features(f, bb, insn):
def extract_insn_segment_access_features(f, bb, insn):
""" parse instruction fs or gs access
"""parse instruction fs or gs access
TODO:
IDA should be able to do this...
TODO:
IDA should be able to do this...
"""
if all(map(lambda op: op.type != idaapi.o_mem, insn.ops)):
# try to optimize for only memory references
@@ -312,12 +341,12 @@ def extract_insn_segment_access_features(f, bb, insn):
def extract_insn_cross_section_cflow(f, bb, insn):
""" inspect the instruction for a CALL or JMP that crosses section boundaries
"""inspect the instruction for a CALL or JMP that crosses section boundaries
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
for ref in idautils.CodeRefsFrom(insn.ea, False):
if ref in get_imports(f.ctx).keys():
@@ -332,14 +361,14 @@ def extract_insn_cross_section_cflow(f, bb, insn):
def extract_function_calls_from(f, bb, insn):
""" extract functions calls from features
"""extract functions calls from features
most relevant at the function scope, however, its most efficient to extract at the instruction scope
most relevant at the function scope, however, its most efficient to extract at the instruction scope
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
if idaapi.is_call_insn(insn):
for ref in idautils.CodeRefsFrom(insn.ea, False):
@@ -347,28 +376,28 @@ def extract_function_calls_from(f, bb, insn):
def extract_function_indirect_call_characteristic_features(f, bb, insn):
""" extract indirect function calls (e.g., call eax or call dword ptr [edx+4])
does not include calls like => call ds:dword_ABD4974
"""extract indirect function calls (e.g., call eax or call dword ptr [edx+4])
does not include calls like => call ds:dword_ABD4974
most relevant at the function or basic block scope;
however, its most efficient to extract at the instruction scope
most relevant at the function or basic block scope;
however, its most efficient to extract at the instruction scope
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
if idaapi.is_call_insn(insn) and idc.get_operand_type(insn.ea, 0) in (idc.o_reg, idc.o_phrase, idc.o_displ):
yield Characteristic("indirect call"), insn.ea
def extract_features(f, bb, insn):
""" extract instruction features
"""extract instruction features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
for inst_handler in INSTRUCTION_HANDLERS:
for (feature, ea) in inst_handler(f, bb, insn):

View File

@@ -1,92 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import lancelot
import capa.features.extractors
import capa.features.extractors.lancelot.file
import capa.features.extractors.lancelot.insn
import capa.features.extractors.lancelot.function
import capa.features.extractors.lancelot.basicblock
__all__ = ["file", "function", "basicblock", "insn"]
logger = logging.getLogger(__name__)
class BB(object):
"""extend the lancelot.BasicBlock with an __int__ method to access the address"""
def __init__(self, ws, bb):
super(BB, self).__init__()
self.ws = ws
self.address = bb.address
self.length = bb.length
self.predecessors = bb.predecessors
self.successors = bb.successors
def __int__(self):
return self.address
@property
def instructions(self):
va = self.address
while va < self.address + self.length:
try:
insn = self.ws.read_insn(va)
except ValueError:
logger.warning("failed to read instruction at 0x%x", va)
return
yield insn
va += insn.length
class LancelotFeatureExtractor(capa.features.extractors.FeatureExtractor):
def __init__(self, buf):
super(LancelotFeatureExtractor, self).__init__()
self.buf = buf
self.ws = lancelot.from_bytes(buf)
self.ctx = {}
def get_base_address(self):
return self.ws.base_address
def extract_file_features(self):
for feature, va in capa.features.extractors.lancelot.file.extract_file_features(self.buf):
yield feature, va
def get_functions(self):
for va in self.ws.get_functions():
# this is just the address of the function
yield va
def extract_function_features(self, f):
for feature, va in capa.features.extractors.lancelot.function.extract_function_features(self.ws, f):
yield feature, va
def get_basic_blocks(self, f):
try:
cfg = self.ws.build_cfg(f)
except:
logger.warning("failed to build CFG for 0x%x", f)
return
else:
for bb in cfg.basic_blocks.values():
yield BB(self.ws, bb)
def extract_basic_block_features(self, f, bb):
for feature, va in capa.features.extractors.lancelot.basicblock.extract_basic_block_features(self.ws, bb):
yield feature, va
def get_instructions(self, f, bb):
return bb.instructions
def extract_insn_features(self, f, bb, insn):
for feature, va in capa.features.extractors.lancelot.insn.extract_insn_features(self, f, bb, insn):
yield feature, va

View File

@@ -1,120 +0,0 @@
import string
import struct
import logging
from lancelot import (
FLOW_VA,
OPERAND_SIZE,
OPERAND_TYPE,
MEMORY_OPERAND_BASE,
OPERAND_TYPE_MEMORY,
OPERAND_TYPE_IMMEDIATE,
IMMEDIATE_OPERAND_VALUE,
)
from capa.features import Characteristic
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
logger = logging.getLogger(__name__)
def extract_bb_tight_loop(ws, bb):
""" check basic block for tight loop indicators """
if bb.address in map(lambda flow: flow[FLOW_VA], bb.successors):
yield Characteristic("tight loop"), bb.address
def is_mov_imm_to_stack(insn):
if not insn.mnemonic.startswith("mov"):
return False
try:
dst, src = insn.operands
except ValueError:
# not two operands
return False
if src[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
return False
if src[IMMEDIATE_OPERAND_VALUE] < 0:
return False
if dst[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
return False
if dst[MEMORY_OPERAND_BASE] not in ("ebp", "rbp", "esp", "rsp"):
return False
return True
def is_printable_ascii(chars):
return all(c < 127 and chr(c) in string.printable for c in chars)
def is_printable_utf16le(chars):
if all(c == b"\x00" for c in chars[1::2]):
return is_printable_ascii(chars[::2])
def get_printable_len(operand):
"""
Return string length if all operand bytes are ascii or utf16-le printable
"""
operand_size = operand[OPERAND_SIZE]
if operand_size == 8:
chars = struct.pack("<B", operand[IMMEDIATE_OPERAND_VALUE])
elif operand_size == 16:
chars = struct.pack("<H", operand[IMMEDIATE_OPERAND_VALUE])
elif operand_size == 32:
chars = struct.pack("<I", operand[IMMEDIATE_OPERAND_VALUE])
elif operand_size == 64:
chars = struct.pack("<Q", operand[IMMEDIATE_OPERAND_VALUE])
else:
raise ValueError("unexpected operand size: " + str(operand_size))
if is_printable_ascii(chars):
return operand_size / 8
if is_printable_utf16le(chars):
return operand_size / 16
return 0
def _bb_has_stackstring(ws, bb):
"""
extract potential stackstring creation, using the following heuristics:
- basic block contains enough moves of constant bytes to the stack
"""
count = 0
for insn in bb.instructions:
if is_mov_imm_to_stack(insn):
# add number of operand bytes
src = insn.operands[1]
count += get_printable_len(src)
if count > MIN_STACKSTRING_LEN:
return True
return False
def extract_stackstring(ws, bb):
""" check basic block for stackstring indicators """
if _bb_has_stackstring(ws, bb):
yield Characteristic("stack string"), bb.address
def extract_basic_block_features(ws, bb):
yield BasicBlock(), bb.address
for bb_handler in BASIC_BLOCK_HANDLERS:
for feature, va in bb_handler(ws, bb):
yield feature, va
BASIC_BLOCK_HANDLERS = (
extract_bb_tight_loop,
extract_stackstring,
)

View File

@@ -1,81 +0,0 @@
import pefile
import capa.features.extractors.strings
from capa.features import String, Characteristic
from capa.features.file import Export, Import, Section
def extract_file_embedded_pe(buf, pe):
buf = buf[2:]
total_offset = 2
while True:
try:
offset = buf.index(b"MZ")
except ValueError:
return
else:
rest = buf[offset:]
total_offset += offset
try:
_ = pefile.PE(data=rest)
except:
pass
else:
yield Characteristic("embedded pe"), total_offset
buf = rest[2:]
total_offset += 2
def extract_file_export_names(buf, pe):
if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
return
base_address = pe.OPTIONAL_HEADER.ImageBase
for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
yield Export(exp.name.decode("ascii")), base_address + exp.address
def extract_file_import_names(buf, pe):
base_address = pe.OPTIONAL_HEADER.ImageBase
for entry in pe.DIRECTORY_ENTRY_IMPORT:
libname = entry.dll.decode("ascii").lower().partition(".")[0]
for imp in entry.imports:
if imp.ordinal:
yield Import("%s.#%s" % (libname, imp.ordinal)), imp.address
else:
impname = imp.name.decode("ascii")
yield Import("%s.%s" % (libname, impname)), imp.address
yield Import("%s" % (impname)), imp.address
def extract_file_section_names(buf, pe):
base_address = pe.OPTIONAL_HEADER.ImageBase
for section in pe.sections:
yield Section(section.Name.partition(b"\x00")[0].decode("ascii")), base_address + section.VirtualAddress
def extract_file_strings(buf, pe):
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
yield String(s.s), s.offset
for s in capa.features.extractors.strings.extract_unicode_strings(buf):
yield String(s.s), s.offset
def extract_file_features(buf):
pe = pefile.PE(data=buf)
for file_handler in FILE_HANDLERS:
for feature, va in file_handler(buf, pe):
yield feature, va
FILE_HANDLERS = (
extract_file_embedded_pe,
extract_file_export_names,
extract_file_import_names,
extract_file_section_names,
extract_file_strings,
)

View File

@@ -1,64 +0,0 @@
import logging
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
from lancelot import (
FLOW_VA,
FLOW_TYPE,
FLOW_TYPE_CONDITIONAL_JUMP,
FLOW_TYPE_CONDITIONAL_MOVE,
FLOW_TYPE_UNCONDITIONAL_JUMP,
)
from capa.features import Characteristic
from capa.features.extractors import loops
logger = logging.getLogger(__name__)
@lru_cache
def get_call_graph(ws):
return ws.build_call_graph()
def extract_function_calls_to(ws, f):
cg = get_call_graph(ws)
for caller in cg.calls_to.get(f, []):
yield Characteristic("calls to"), caller
def extract_function_loop(ws, f):
edges = []
for bb in ws.build_cfg(f).basic_blocks.values():
for flow in bb.successors:
if flow[FLOW_TYPE] in (
FLOW_TYPE_UNCONDITIONAL_JUMP,
FLOW_TYPE_CONDITIONAL_JUMP,
FLOW_TYPE_CONDITIONAL_MOVE,
):
edges.append((bb.address, flow[FLOW_VA]))
continue
if edges and loops.has_loop(edges):
yield Characteristic("loop"), f
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)
_not_implemented = set([])
def extract_function_features(ws, f):
for func_handler in FUNCTION_HANDLERS:
try:
for feature, va in func_handler(ws, f):
yield feature, va
except NotImplementedError:
if func_handler.__name__ not in _not_implemented:
logger.warning("not implemented: %s", func_handler.__name__)
_not_implemented.add(func_handler.__name__)

View File

@@ -1,33 +0,0 @@
from lancelot import (
OPERAND_TYPE,
MEMORY_OPERAND_BASE,
MEMORY_OPERAND_DISP,
OPERAND_TYPE_MEMORY,
OPERAND_TYPE_IMMEDIATE,
IMMEDIATE_OPERAND_VALUE,
IMMEDIATE_OPERAND_IS_RELATIVE,
)
def get_operand_target(insn, op):
if op[OPERAND_TYPE] == OPERAND_TYPE_MEMORY:
# call direct, x64
# rip relative
# kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0
if op[MEMORY_OPERAND_BASE] == "rip":
return op[MEMORY_OPERAND_DISP] + insn.address + insn.length
# call direct, x32
# mimikatz:0x403BD3 call ds:CryptAcquireContextW
elif op[MEMORY_OPERAND_BASE] == None:
return op[MEMORY_OPERAND_DISP]
# call via thunk
# mimikatz:0x455A41 call LsaQueryInformationPolicy
elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE and op[IMMEDIATE_OPERAND_IS_RELATIVE]:
return op[IMMEDIATE_OPERAND_VALUE] + insn.address + insn.length
elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE:
return op[IMMEDIATE_OPERAND_VALUE]
raise ValueError("memory operand has no target")

View File

@@ -1,149 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import collections
from lancelot import (
FLOW_VA,
OPERAND_TYPE,
PERMISSION_READ,
MEMORY_OPERAND_BASE,
MEMORY_OPERAND_DISP,
OPERAND_TYPE_MEMORY,
MEMORY_OPERAND_INDEX,
OPERAND_TYPE_REGISTER,
MEMORY_OPERAND_SEGMENT,
OPERAND_TYPE_IMMEDIATE,
IMMEDIATE_OPERAND_VALUE,
REGISTER_OPERAND_REGISTER,
IMMEDIATE_OPERAND_IS_RELATIVE,
)
from capa.features.extractors.lancelot.helpers import get_operand_target
DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor")
class NotFoundError(Exception):
pass
def read_instructions(ws, bb):
va = bb.address
while va < bb.address + bb.length:
try:
insn = ws.read_insn(va)
except ValueError:
return
yield insn
va += insn.length
def build_instruction_predecessors(ws, cfg):
preds = collections.defaultdict(set)
for bb in cfg.basic_blocks.values():
insns = list(read_instructions(ws, bb))
for i, insn in enumerate(insns):
if i == 0:
for pred in bb.predecessors:
pred_bb = cfg.basic_blocks[pred[FLOW_VA]]
preds[insn.address].add(list(read_instructions(ws, pred_bb))[-1].address)
else:
preds[insn.address].add(insns[i - 1].address)
return preds
def find_definition(ws, f, insn):
"""
scan backwards from the given address looking for assignments to the given register.
if a constant, return that value.
args:
ws (lancelot.PE)
f (int): the function start address
insn (lancelot.Instruction): call instruction to resolve
returns:
(va: int, value?: int|None): the address of the assignment and the value, if a constant.
raises:
NotFoundError: when the definition cannot be found.
"""
assert insn.mnemonic == "call"
op0 = insn.operands[0]
assert op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
reg = op0[REGISTER_OPERAND_REGISTER]
cfg = ws.build_cfg(f)
preds = build_instruction_predecessors(ws, cfg)
q = collections.deque()
seen = set([])
q.extend(preds[insn.address])
while q:
cur = q.popleft()
# skip if we've already processed this location
if cur in seen:
continue
seen.add(cur)
insn = ws.read_insn(cur)
operands = insn.operands
if len(operands) == 0:
q.extend(preds[cur])
continue
op0 = operands[0]
if not (
op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
and op0[REGISTER_OPERAND_REGISTER] == reg
and insn.mnemonic in DESTRUCTIVE_MNEMONICS
):
q.extend(preds[cur])
continue
# if we reach here, the instruction is destructive to our target register.
# we currently only support extracting the constant from something like: `mov $reg, IAT`
# so, any other pattern results in an unknown value, represented by None.
# this is a good place to extend in the future, if we need more robust support.
if insn.mnemonic != "mov":
return (cur, None)
else:
op1 = operands[1]
try:
target = get_operand_target(insn, op1)
except ValueError:
return (cur, None)
else:
return (cur, target)
raise NotFoundError()
def is_indirect_call(insn):
return insn.mnemonic == "call" and insn.operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
def resolve_indirect_call(ws, f, insn):
"""
inspect the given indirect call instruction and attempt to resolve the target address.
args:
ws (lancelot.PE): the analysis workspace
f (int): the address of the function to analyze
insn (lancelot.Instruction): the instruction at which to start analysis
returns:
(va: int, value?: int|None): the address of the assignment and the value, if a constant.
raises:
NotFoundError: when the definition cannot be found.
"""
assert is_indirect_call(insn)
return find_definition(ws, f, insn)

View File

@@ -1,487 +0,0 @@
import logging
import itertools
import pefile
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
from lancelot import (
OPERAND_TYPE,
PERMISSION_READ,
MEMORY_OPERAND_BASE,
MEMORY_OPERAND_DISP,
OPERAND_TYPE_MEMORY,
MEMORY_OPERAND_INDEX,
OPERAND_TYPE_REGISTER,
MEMORY_OPERAND_SEGMENT,
OPERAND_TYPE_IMMEDIATE,
IMMEDIATE_OPERAND_VALUE,
REGISTER_OPERAND_REGISTER,
IMMEDIATE_OPERAND_IS_RELATIVE,
)
import capa.features.extractors.helpers
from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic
from capa.features.insn import Number, Offset, Mnemonic
from capa.features.extractors.lancelot.helpers import get_operand_target
from capa.features.extractors.lancelot.function import get_call_graph
from capa.features.extractors.lancelot.indirect_calls import NotFoundError, resolve_indirect_call
logger = logging.getLogger(__name__)
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40
def get_arch(ws):
if ws.arch == "x32":
return ARCH_X32
elif ws.arch == "x64":
return ARCH_X64
else:
raise ValueError("unexpected architecture")
@lru_cache
def get_pefile(xtor):
return pefile.PE(data=xtor.buf)
@lru_cache
def get_imports(xtor):
pe = get_pefile(xtor)
imports = {}
for entry in pe.DIRECTORY_ENTRY_IMPORT:
libname = entry.dll.decode("ascii").lower().partition(".")[0]
for imp in entry.imports:
if imp.ordinal:
imports[imp.address] = "%s.#%s" % (libname, imp.ordinal)
else:
impname = imp.name.decode("ascii")
imports[imp.address] = "%s.%s" % (libname, impname)
return imports
@lru_cache
def get_thunks(xtor):
thunks = {}
for va in xtor.ws.get_functions():
try:
insn = xtor.ws.read_insn(va)
except ValueError:
continue
if insn.mnemonic != "jmp":
continue
op0 = insn.operands[0]
try:
target = get_operand_target(insn, op0)
except ValueError:
continue
imports = get_imports(xtor)
if target not in imports:
continue
thunks[va] = imports[target]
return thunks
def extract_insn_api_features(xtor, f, bb, insn):
"""parse API features from the given instruction."""
if insn.mnemonic != "call":
return
op0 = insn.operands[0]
if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
try:
(_, target) = resolve_indirect_call(xtor.ws, f, insn)
except NotFoundError:
return
if target is None:
return
else:
try:
target = get_operand_target(insn, op0)
except ValueError:
return
imports = get_imports(xtor)
if target in imports:
for feature, va in capa.features.extractors.helpers.generate_api_features(imports[target], insn.address):
yield feature, va
return
thunks = get_thunks(xtor)
if target in thunks:
for feature, va in capa.features.extractors.helpers.generate_api_features(thunks[target], insn.address):
yield feature, va
def extract_insn_mnemonic_features(xtor, f, bb, insn):
"""parse mnemonic features from the given instruction."""
yield Mnemonic(insn.mnemonic), insn.address
def extract_insn_number_features(xtor, f, bb, insn):
"""parse number features from the given instruction."""
operands = insn.operands
for operand in operands:
if operand[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
continue
v = operand[IMMEDIATE_OPERAND_VALUE]
if xtor.ws.probe(v) & PERMISSION_READ:
# v is a valid address
# therefore, assume its not also a constant.
continue
if (
insn.mnemonic == "add"
and operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
and operands[0][REGISTER_OPERAND_REGISTER] == "esp"
):
# skip things like:
#
# .text:00401140 call sub_407E2B
# .text:00401145 add esp, 0Ch
return
yield Number(v), insn.address
yield Number(v, arch=get_arch(xtor.ws)), insn.address
def extract_insn_offset_features(xtor, f, bb, insn):
"""parse structure offset features from the given instruction."""
operands = insn.operands
for operand in operands:
if operand[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
continue
if operand[MEMORY_OPERAND_BASE] in ("esp", "ebp", "rbp"):
continue
# lancelot provides `None` when the displacement is not present.
v = operand[MEMORY_OPERAND_DISP] or 0
yield Offset(v), insn.address
yield Offset(v, arch=get_arch(xtor.ws)), insn.address
def derefs(xtor, p):
"""
recursively follow the given pointer, yielding the valid memory addresses along the way.
useful when you may have a pointer to string, or pointer to pointer to string, etc.
this is a "do what i mean" type of helper function.
"""
depth = 0
while True:
if not xtor.ws.probe(p) & PERMISSION_READ:
return
yield p
next = xtor.ws.read_pointer(p)
# sanity: pointer points to self
if next == p:
return
# sanity: avoid chains of pointers that are unreasonably deep
depth += 1
if depth > 10:
return
p = next
def read_bytes(xtor, va):
"""
read up to MAX_BYTES_FEATURE_SIZE from the given address.
raises:
ValueError: if the given address is not valid.
"""
start = va
end = va + MAX_BYTES_FEATURE_SIZE
pe = get_pefile(xtor)
for section in pe.sections:
section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize
if section_start <= start < section_end:
end = min(end, section_end)
return xtor.ws.read_bytes(start, end - start)
raise ValueError("invalid address")
# these are mnemonics that may flow (jump) elsewhere
FLOW_MNEMONICS = set(
[
"call",
"jb",
"jbe",
"jcxz",
"jecxz",
"jknzd",
"jkzd",
"jl",
"jle",
"jmp",
"jnb",
"jnbe",
"jnl",
"jnle",
"jno",
"jnp",
"jns",
"jnz",
"jo",
"jp",
"jrcxz",
"js",
"jz",
]
)
def extract_insn_bytes_features(xtor, f, bb, insn):
"""
parse byte sequence features from the given instruction.
"""
if insn.mnemonic in FLOW_MNEMONICS:
return
for operand in insn.operands:
try:
target = get_operand_target(insn, operand)
except ValueError:
continue
for ptr in derefs(xtor, target):
try:
buf = read_bytes(xtor, ptr)
except ValueError:
continue
if capa.features.extractors.helpers.all_zeros(buf):
continue
yield Bytes(buf), insn.address
def first(s):
"""enumerate the first element in the sequence"""
for i in s:
yield i
break
def extract_insn_string_features(xtor, f, bb, insn):
"""parse string features from the given instruction."""
for bytez, va in extract_insn_bytes_features(xtor, f, bb, insn):
buf = bytez.value
for s in itertools.chain(
first(capa.features.extractors.strings.extract_ascii_strings(buf)),
first(capa.features.extractors.strings.extract_unicode_strings(buf)),
):
if s.offset == 0:
yield String(s.s), va
def is_security_cookie(xtor, f, bb, insn):
"""
check if an instruction is related to security cookie checks
"""
op1 = insn.operands[1]
if op1[OPERAND_TYPE] == OPERAND_TYPE_REGISTER and op1[REGISTER_OPERAND_REGISTER] not in (
"esp",
"ebp",
"rbp",
"rsp",
):
return False
# expect security cookie init in first basic block within first bytes (instructions)
if f == bb.address and insn.address < (bb.address + SECURITY_COOKIE_BYTES_DELTA):
return True
# ... or within last bytes (instructions) before a return
insns = list(xtor.get_instructions(f, bb))
if insns[-1].mnemonic in ("ret", "retn") and insn.address > (bb.address + bb.length - SECURITY_COOKIE_BYTES_DELTA):
return True
return False
def extract_insn_nzxor_characteristic_features(xtor, f, bb, insn):
"""
parse non-zeroing XOR instruction from the given instruction.
ignore expected non-zeroing XORs, e.g. security cookies.
"""
if insn.mnemonic != "xor":
return
operands = insn.operands
if operands[0] == operands[1]:
return
if is_security_cookie(xtor, f, bb, insn):
return
yield Characteristic("nzxor"), insn.address
def extract_insn_peb_access_characteristic_features(xtor, f, bb, insn):
"""
parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64
"""
for operand in insn.operands:
if (
operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
and operand[MEMORY_OPERAND_SEGMENT] == "gs"
and operand[MEMORY_OPERAND_DISP] == 0x60
):
yield Characteristic("peb access"), insn.address
if (
operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
and operand[MEMORY_OPERAND_SEGMENT] == "fs"
and operand[MEMORY_OPERAND_DISP] == 0x30
):
yield Characteristic("peb access"), insn.address
def extract_insn_segment_access_features(xtor, f, bb, insn):
""" parse the instruction for access to fs or gs """
for operand in insn.operands:
if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "gs":
yield Characteristic("gs access"), insn.address
if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "fs":
yield Characteristic("fs access"), insn.address
def get_section(xtor, va):
pe = get_pefile(xtor)
for i, section in enumerate(pe.sections):
section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize
if section_start <= va < section_end:
return i
raise ValueError("invalid address")
def extract_insn_cross_section_cflow(xtor, f, bb, insn):
"""
inspect the instruction for a CALL or JMP that crosses section boundaries.
"""
if insn.mnemonic not in FLOW_MNEMONICS:
return
try:
target = get_operand_target(insn, insn.operands[0])
except ValueError:
return
if target in get_imports(xtor):
return
try:
if get_section(xtor, insn.address) != get_section(xtor, target):
yield Characteristic("cross section flow"), insn.address
except ValueError:
return
def extract_function_calls_from(xtor, f, bb, insn):
cg = get_call_graph(xtor.ws)
for callee in cg.calls_from.get(insn.address, []):
yield Characteristic("calls from"), callee
if callee == f:
yield Characteristic("recursive call"), insn.address
# lancelot doesn't count API calls when constructing the call graph
# so we still have to scan for calls to an import
if insn.mnemonic != "call":
return
try:
target = get_operand_target(insn, insn.operands[0])
except ValueError:
return
imports = get_imports(xtor)
if target in imports:
yield Characteristic("calls from"), target
# this is a feature that's most relevant at the function or basic block scope,
# however, its most efficient to extract at the instruction scope.
def extract_function_indirect_call_characteristic_features(xtor, f, bb, insn):
"""
extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4])
does not include calls like => call ds:dword_ABD4974
"""
if insn.mnemonic != "call":
return
op0 = insn.operands[0]
if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
yield Characteristic("indirect call"), insn.address
elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and op0[MEMORY_OPERAND_BASE] is not None:
yield Characteristic("indirect call"), insn.address
elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and op0[MEMORY_OPERAND_INDEX] is not None:
yield Characteristic("indirect call"), insn.address
_not_implemented = set([])
def extract_insn_features(xtor, f, bb, insn):
for insn_handler in INSTRUCTION_HANDLERS:
try:
for feature, va in insn_handler(xtor, f, bb, insn):
yield feature, va
except NotImplementedError:
if insn_handler.__name__ not in _not_implemented:
logger.warning("not implemented: %s", insn_handler.__name__)
_not_implemented.add(insn_handler.__name__)
INSTRUCTION_HANDLERS = (
extract_insn_api_features,
extract_insn_number_features,
extract_insn_string_features,
extract_insn_bytes_features,
extract_insn_offset_features,
extract_insn_nzxor_characteristic_features,
extract_insn_mnemonic_features,
extract_insn_peb_access_characteristic_features,
extract_insn_cross_section_cflow,
extract_insn_segment_access_features,
extract_function_calls_from,
extract_function_indirect_call_characteristic_features,
)

View File

@@ -11,14 +11,14 @@ from networkx.algorithms.components import strongly_connected_components
def has_loop(edges, threshold=2):
""" check if a list of edges representing a directed graph contains a loop
"""check if a list of edges representing a directed graph contains a loop
args:
edges: list of edge sets representing a directed graph i.e. [(1, 2), (2, 1)]
threshold: min number of nodes contained in loop
args:
edges: list of edge sets representing a directed graph i.e. [(1, 2), (2, 1)]
threshold: min number of nodes contained in loop
returns:
bool
returns:
bool
"""
g = nx.DiGraph()
g.add_edges_from(edges)

View File

@@ -8,7 +8,11 @@
import types
import file
import insn
import function
import viv_utils
import basicblock
import capa.features.extractors
import capa.features.extractors.viv.file

View File

@@ -84,7 +84,16 @@ def dumps(extractor):
returns:
str: the serialized features.
"""
ret = {"version": 1, "functions": {}, "scopes": {"file": [], "function": [], "basic block": [], "instruction": [],}}
ret = {
"version": 1,
"functions": {},
"scopes": {
"file": [],
"function": [],
"basic block": [],
"instruction": [],
},
}
for feature, va in extractor.extract_file_features():
ret["scopes"]["file"].append(serialize_feature(feature) + (hex(va), ()))
@@ -99,7 +108,16 @@ def dumps(extractor):
ret["functions"][hex(f)][hex(bb)] = []
for feature, va in extractor.extract_basic_block_features(f, bb):
ret["scopes"]["basic block"].append(serialize_feature(feature) + (hex(va), (hex(f), hex(bb),)))
ret["scopes"]["basic block"].append(
serialize_feature(feature)
+ (
hex(va),
(
hex(f),
hex(bb),
),
)
)
for insnva, insn in sorted(
[(insn.__int__(), insn) for insn in extractor.get_instructions(f, bb)], key=lambda p: p[0]
@@ -108,7 +126,15 @@ def dumps(extractor):
for feature, va in extractor.extract_insn_features(f, bb, insn):
ret["scopes"]["instruction"].append(
serialize_feature(feature) + (hex(va), (hex(f), hex(bb), hex(insnva),))
serialize_feature(feature)
+ (
hex(va),
(
hex(f),
hex(bb),
hex(insnva),
),
)
)
return json.dumps(ret)

View File

@@ -24,10 +24,7 @@ class Number(Feature):
super(Number, self).__init__(value, arch=arch, description=description)
def get_value_str(self):
if self.value < 0:
return "-0x%X" % (-self.value)
else:
return "0x%X" % self.value
return "0x%X" % self.value
class Offset(Feature):

View File

@@ -17,9 +17,9 @@ import capa.ida.helpers
def info_to_name(display):
""" extract root value from display name
"""extract root value from display name
e.g. function(my_function) => my_function
e.g. function(my_function) => my_function
"""
try:
return display.split("(")[1].rstrip(")")
@@ -68,16 +68,16 @@ class CapaExplorerDataItem(object):
return self._checked
def appendChild(self, item):
""" add child item
"""add child item
@param item: CapaExplorerDataItem*
@param item: CapaExplorerDataItem*
"""
self.children.append(item)
def child(self, row):
""" get child row
"""get child row
@param row: TODO
@param row: TODO
"""
return self.children[row]

View File

@@ -65,11 +65,11 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
self.endResetModel()
def columnCount(self, model_index):
""" get the number of columns for the children of the given parent
"""get the number of columns for the children of the given parent
@param model_index: QModelIndex*
@param model_index: QModelIndex*
@retval column count
@retval column count
"""
if model_index.isValid():
return model_index.internalPointer().columnCount()
@@ -77,12 +77,12 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return self.root_node.columnCount()
def data(self, model_index, role):
""" get data stored under the given role for the item referred to by the index
"""get data stored under the given role for the item referred to by the index
@param model_index: QModelIndex*
@param role: QtCore.Qt.*
@param model_index: QModelIndex*
@param role: QtCore.Qt.*
@retval data to be displayed
@retval data to be displayed
"""
if not model_index.isValid():
return None
@@ -151,11 +151,11 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return None
def flags(self, model_index):
""" get item flags for given index
"""get item flags for given index
@param model_index: QModelIndex*
@param model_index: QModelIndex*
@retval QtCore.Qt.ItemFlags
@retval QtCore.Qt.ItemFlags
"""
if not model_index.isValid():
return QtCore.Qt.NoItemFlags
@@ -163,13 +163,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return model_index.internalPointer().flags
def headerData(self, section, orientation, role):
""" get data for the given role and section in the header with the specified orientation
"""get data for the given role and section in the header with the specified orientation
@param section: int
@param orientation: QtCore.Qt.Orientation
@param role: QtCore.Qt.DisplayRole
@param section: int
@param orientation: QtCore.Qt.Orientation
@param role: QtCore.Qt.DisplayRole
@retval header data list()
@retval header data list()
"""
if orientation == QtCore.Qt.Horizontal and role == QtCore.Qt.DisplayRole:
return self.root_node.data(section)
@@ -177,13 +177,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return None
def index(self, row, column, parent):
""" get index of the item in the model specified by the given row, column and parent index
"""get index of the item in the model specified by the given row, column and parent index
@param row: int
@param column: int
@param parent: QModelIndex*
@param row: int
@param column: int
@param parent: QModelIndex*
@retval QModelIndex*
@retval QModelIndex*
"""
if not self.hasIndex(row, column, parent):
return QtCore.QModelIndex()
@@ -201,13 +201,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return QtCore.QModelIndex()
def parent(self, model_index):
""" get parent of the model item with the given index
"""get parent of the model item with the given index
if the item has no parent, an invalid QModelIndex* is returned
if the item has no parent, an invalid QModelIndex* is returned
@param model_index: QModelIndex*
@param model_index: QModelIndex*
@retval QModelIndex*
@retval QModelIndex*
"""
if not model_index.isValid():
return QtCore.QModelIndex()
@@ -221,12 +221,12 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return self.createIndex(parent.row(), 0, parent)
def iterateChildrenIndexFromRootIndex(self, model_index, ignore_root=True):
""" depth-first traversal of child nodes
"""depth-first traversal of child nodes
@param model_index: QModelIndex*
@param ignore_root: if set, do not return root index
@param model_index: QModelIndex*
@param ignore_root: if set, do not return root index
@retval yield QModelIndex*
@retval yield QModelIndex*
"""
visited = set()
stack = deque((model_index,))
@@ -248,10 +248,10 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
stack.append(child_index.child(idx, 0))
def reset_ida_highlighting(self, item, checked):
""" reset IDA highlight for an item
"""reset IDA highlight for an item
@param item: capa explorer item
@param checked: indicates item is or not checked
@param item: capa explorer item
@param checked: indicates item is or not checked
"""
if not isinstance(
item, (CapaExplorerStringViewItem, CapaExplorerInstructionViewItem, CapaExplorerByteViewItem)
@@ -275,13 +275,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
idc.set_color(item.location, idc.CIC_ITEM, item.ida_highlight)
def setData(self, model_index, value, role):
""" set the role data for the item at index to value
"""set the role data for the item at index to value
@param model_index: QModelIndex*
@param value: QVariant*
@param role: QtCore.Qt.EditRole
@param model_index: QModelIndex*
@param value: QVariant*
@param role: QtCore.Qt.EditRole
@retval True/False
@retval True/False
"""
if not model_index.isValid():
return False
@@ -316,14 +316,14 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return False
def rowCount(self, model_index):
""" get the number of rows under the given parent
"""get the number of rows under the given parent
when the parent is valid it means that is returning the number of
children of parent
when the parent is valid it means that is returning the number of
children of parent
@param model_index: QModelIndex*
@param model_index: QModelIndex*
@retval row count
@retval row count
"""
if model_index.column() > 0:
return 0
@@ -336,24 +336,30 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return item.childCount()
def render_capa_doc_statement_node(self, parent, statement, locations, doc):
""" render capa statement read from doc
"""render capa statement read from doc
@param parent: parent to which new child is assigned
@param statement: statement read from doc
@param locations: locations of children (applies to range only?)
@param doc: capa result doc
@param parent: parent to which new child is assigned
@param statement: statement read from doc
@param locations: locations of children (applies to range only?)
@param doc: capa result doc
"statement": {
"type": "or"
},
"statement": {
"type": "or"
},
"""
if statement["type"] in ("and", "or", "optional"):
return CapaExplorerDefaultItem(parent, statement["type"])
display = statement["type"]
if statement.get("description"):
display += " (%s)" % statement["description"]
return CapaExplorerDefaultItem(parent, display)
elif statement["type"] == "not":
# TODO: do we display 'not'
pass
elif statement["type"] == "some":
return CapaExplorerDefaultItem(parent, str(statement["count"]) + " or more")
display = "%d or more" % statement["count"]
if statement.get("description"):
display += " (%s)" % statement["description"]
return CapaExplorerDefaultItem(parent, display)
elif statement["type"] == "range":
# `range` is a weird node, its almost a hybrid of statement + feature.
# it is a specific feature repeated multiple times.
@@ -370,6 +376,9 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
else:
display += "between %d and %d" % (statement["min"], statement["max"])
if statement.get("description"):
display += " (%s)" % statement["description"]
parent2 = CapaExplorerFeatureItem(parent, display=display)
for location in locations:
@@ -378,33 +387,36 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return parent2
elif statement["type"] == "subscope":
return CapaExplorerSubscopeItem(parent, statement[statement["type"]])
display = statement[statement["type"]]
if statement.get("description"):
display += " (%s)" % statement["description"]
return CapaExplorerSubscopeItem(parent, display)
else:
raise RuntimeError("unexpected match statement type: " + str(statement))
def render_capa_doc_match(self, parent, match, doc):
""" render capa match read from doc
"""render capa match read from doc
@param parent: parent node to which new child is assigned
@param match: match read from doc
@param doc: capa result doc
@param parent: parent node to which new child is assigned
@param match: match read from doc
@param doc: capa result doc
"matches": {
"0": {
"children": [],
"locations": [
4317184
],
"node": {
"feature": {
"section": ".rsrc",
"type": "section"
},
"type": "feature"
"matches": {
"0": {
"children": [],
"locations": [
4317184
],
"node": {
"feature": {
"section": ".rsrc",
"type": "section"
},
"success": true
}
},
"type": "feature"
},
"success": true
}
},
"""
if not match["success"]:
# TODO: display failed branches at some point? Help with debugging rules?
@@ -431,9 +443,9 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
self.render_capa_doc_match(parent2, child, doc)
def render_capa_doc(self, doc):
""" render capa features specified in doc
"""render capa features specified in doc
@param doc: capa result doc
@param doc: capa result doc
"""
# inform model that changes are about to occur
self.beginResetModel()
@@ -457,18 +469,18 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
self.endResetModel()
def capa_doc_feature_to_display(self, feature):
""" convert capa doc feature type string to display string for ui
"""convert capa doc feature type string to display string for ui
@param feature: capa feature read from doc
@param feature: capa feature read from doc
Example:
"feature": {
"bytes": "01 14 02 00 00 00 00 00 C0 00 00 00 00 00 00 46",
"description": "CLSID_ShellLink",
"type": "bytes"
}
Example:
"feature": {
"bytes": "01 14 02 00 00 00 00 00 C0 00 00 00 00 00 00 46",
"description": "CLSID_ShellLink",
"type": "bytes"
}
bytes(01 14 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_ShellLink)
bytes(01 14 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_ShellLink)
"""
if feature[feature["type"]]:
if feature.get("description", ""):
@@ -479,25 +491,31 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return "%s" % feature["type"]
def render_capa_doc_feature_node(self, parent, feature, locations, doc):
""" process capa doc feature node
"""process capa doc feature node
@param parent: parent node to which child is assigned
@param feature: capa doc feature node
@param locations: locations identified for feature
@param doc: capa doc
@param parent: parent node to which child is assigned
@param feature: capa doc feature node
@param locations: locations identified for feature
@param doc: capa doc
Example:
"feature": {
"description": "FILE_WRITE_DATA",
"number": "0x2",
"type": "number"
}
Example:
"feature": {
"description": "FILE_WRITE_DATA",
"number": "0x2",
"type": "number"
}
"""
display = self.capa_doc_feature_to_display(feature)
if len(locations) == 1:
# only one location for feature so no need to nest children
parent2 = self.render_capa_doc_feature(parent, feature, next(iter(locations)), doc, display=display,)
parent2 = self.render_capa_doc_feature(
parent,
feature,
next(iter(locations)),
doc,
display=display,
)
else:
# feature has multiple children, nest under one parent feature node
parent2 = CapaExplorerFeatureItem(parent, display)
@@ -508,20 +526,20 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
return parent2
def render_capa_doc_feature(self, parent, feature, location, doc, display="-"):
""" render capa feature read from doc
"""render capa feature read from doc
@param parent: parent node to which new child is assigned
@param feature: feature read from doc
@param doc: capa feature doc
@param location: address of feature
@param display: text to display in plugin ui
@param parent: parent node to which new child is assigned
@param feature: feature read from doc
@param doc: capa feature doc
@param location: address of feature
@param display: text to display in plugin ui
Example:
"feature": {
"description": "FILE_WRITE_DATA",
"number": "0x2",
"type": "number"
}
Example:
"feature": {
"description": "FILE_WRITE_DATA",
"number": "0x2",
"type": "number"
}
"""
# special handling for characteristic pending type
if feature["type"] == "characteristic":
@@ -575,10 +593,10 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
raise RuntimeError("unexpected feature type: " + str(feature["type"]))
def update_function_name(self, old_name, new_name):
""" update all instances of old function name with new function name
"""update all instances of old function name with new function name
@param old_name: previous function name
@param new_name: new function name
@param old_name: previous function name
@param new_name: new function name
"""
# create empty root index for search
root_index = self.index(0, 0, QtCore.QModelIndex())

View File

@@ -16,13 +16,16 @@ class CapaExplorerSortFilterProxyModel(QtCore.QSortFilterProxyModel):
""" """
super(CapaExplorerSortFilterProxyModel, self).__init__(parent)
self.min_ea = None
self.max_ea = None
def lessThan(self, left, right):
""" true if the value of the left item is less than value of right item
"""true if the value of the left item is less than value of right item
@param left: QModelIndex*
@param right: QModelIndex*
@param left: QModelIndex*
@param right: QModelIndex*
@retval True/False
@retval True/False
"""
ldata = left.internalPointer().data(left.column())
rdata = right.internalPointer().data(right.column())
@@ -40,13 +43,13 @@ class CapaExplorerSortFilterProxyModel(QtCore.QSortFilterProxyModel):
return ldata.lower() < rdata.lower()
def filterAcceptsRow(self, row, parent):
""" true if the item in the row indicated by the given row and parent
should be included in the model; otherwise returns false
"""true if the item in the row indicated by the given row and parent
should be included in the model; otherwise returns false
@param row: int
@param parent: QModelIndex*
@param row: int
@param parent: QModelIndex*
@retval True/False
@retval True/False
"""
if self.filter_accepts_row_self(row, parent):
return True
@@ -62,15 +65,6 @@ class CapaExplorerSortFilterProxyModel(QtCore.QSortFilterProxyModel):
return False
def add_single_string_filter(self, column, string):
""" add fixed string filter
@param column: key column
@param string: string to sort
"""
self.setFilterKeyColumn(column)
self.setFilterFixedString(string)
def index_has_accepted_children(self, row, parent):
""" """
model_index = self.sourceModel().index(row, 0, parent)
@@ -86,4 +80,33 @@ class CapaExplorerSortFilterProxyModel(QtCore.QSortFilterProxyModel):
def filter_accepts_row_self(self, row, parent):
""" """
return super(CapaExplorerSortFilterProxyModel, self).filterAcceptsRow(row, parent)
# filter not set
if self.min_ea is None and self.max_ea is None:
return True
index = self.sourceModel().index(row, 0, parent)
data = index.internalPointer().data(CapaExplorerDataModel.COLUMN_INDEX_VIRTUAL_ADDRESS)
if not data:
return False
ea = int(data, 16)
if self.min_ea <= ea and ea < self.max_ea:
return True
return False
def add_address_range_filter(self, min_ea, max_ea):
""" """
self.min_ea = min_ea
self.max_ea = max_ea
self.setFilterKeyColumn(CapaExplorerDataModel.COLUMN_INDEX_VIRTUAL_ADDRESS)
self.invalidateFilter()
def reset_address_range_filter(self):
""" """
self.min_ea = None
self.max_ea = None
self.invalidateFilter()

View File

@@ -15,13 +15,13 @@ from capa.ida.explorer.model import CapaExplorerDataModel
class CapaExplorerQtreeView(QtWidgets.QTreeView):
""" capa explorer QTreeView implementation
"""capa explorer QTreeView implementation
view controls UI action responses and displays data from
CapaExplorerDataModel
view controls UI action responses and displays data from
CapaExplorerDataModel
view does not modify CapaExplorerDataModel directly - data
modifications should be implemented in CapaExplorerDataModel
view does not modify CapaExplorerDataModel directly - data
modifications should be implemented in CapaExplorerDataModel
"""
def __init__(self, model, parent=None):
@@ -54,12 +54,12 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
self.setStyleSheet("QTreeView::item {padding-right: 15 px;padding-bottom: 2 px;}")
def reset(self):
""" reset user interface changes
"""reset user interface changes
called when view should reset any user interface changes
made since the last reset e.g. IDA window highlighting
called when view should reset any user interface changes
made since the last reset e.g. IDA window highlighting
"""
self.collapseAll()
self.expandToDepth(0)
self.resize_columns_to_content()
def resize_columns_to_content(self):
@@ -67,31 +67,31 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
self.header().resizeSections(QtWidgets.QHeaderView.ResizeToContents)
def map_index_to_source_item(self, model_index):
""" map proxy model index to source model item
"""map proxy model index to source model item
@param model_index: QModelIndex*
@param model_index: QModelIndex*
@retval QObject*
@retval QObject*
"""
return self.model.mapToSource(model_index).internalPointer()
def send_data_to_clipboard(self, data):
""" copy data to the clipboard
"""copy data to the clipboard
@param data: data to be copied
@param data: data to be copied
"""
clip = QtWidgets.QApplication.clipboard()
clip.clear(mode=clip.Clipboard)
clip.setText(data, mode=clip.Clipboard)
def new_action(self, display, data, slot):
""" create action for context menu
"""create action for context menu
@param display: text displayed to user in context menu
@param data: data passed to slot
@param slot: slot to connect
@param display: text displayed to user in context menu
@param data: data passed to slot
@param slot: slot to connect
@retval QAction*
@retval QAction*
"""
action = QtWidgets.QAction(display, self.parent)
action.setData(data)
@@ -100,11 +100,11 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
return action
def load_default_context_menu_actions(self, data):
""" yield actions specific to function custom context menu
"""yield actions specific to function custom context menu
@param data: tuple
@param data: tuple
@yield QAction*
@yield QAction*
"""
default_actions = (
("Copy column", data, self.slot_copy_column),
@@ -116,11 +116,11 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
yield self.new_action(*action)
def load_function_context_menu_actions(self, data):
""" yield actions specific to function custom context menu
"""yield actions specific to function custom context menu
@param data: tuple
@param data: tuple
@yield QAction*
@yield QAction*
"""
function_actions = (("Rename function", data, self.slot_rename_function),)
@@ -133,15 +133,15 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
yield action
def load_default_context_menu(self, pos, item, model_index):
""" create default custom context menu
"""create default custom context menu
creates custom context menu containing default actions
creates custom context menu containing default actions
@param pos: TODO
@param item: TODO
@param model_index: TODO
@param pos: TODO
@param item: TODO
@param model_index: TODO
@retval QMenu*
@retval QMenu*
"""
menu = QtWidgets.QMenu()
@@ -151,16 +151,16 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
return menu
def load_function_item_context_menu(self, pos, item, model_index):
""" create function custom context menu
"""create function custom context menu
creates custom context menu containing actions specific to functions
and the default actions
creates custom context menu containing actions specific to functions
and the default actions
@param pos: TODO
@param item: TODO
@param model_index: TODO
@param pos: TODO
@param item: TODO
@param model_index: TODO
@retval QMenu*
@retval QMenu*
"""
menu = QtWidgets.QMenu()
@@ -170,43 +170,43 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
return menu
def show_custom_context_menu(self, menu, pos):
""" display custom context menu in view
"""display custom context menu in view
@param menu: TODO
@param pos: TODO
@param menu: TODO
@param pos: TODO
"""
if menu:
menu.exec_(self.viewport().mapToGlobal(pos))
def slot_copy_column(self, action):
""" slot connected to custom context menu
"""slot connected to custom context menu
allows user to select a column and copy the data
to clipboard
allows user to select a column and copy the data
to clipboard
@param action: QAction*
@param action: QAction*
"""
_, item, model_index = action.data()
self.send_data_to_clipboard(item.data(model_index.column()))
def slot_copy_row(self, action):
""" slot connected to custom context menu
"""slot connected to custom context menu
allows user to select a row and copy the space-delimited
data to clipboard
allows user to select a row and copy the space-delimited
data to clipboard
@param action: QAction*
@param action: QAction*
"""
_, item, _ = action.data()
self.send_data_to_clipboard(str(item))
def slot_rename_function(self, action):
""" slot connected to custom context menu
"""slot connected to custom context menu
allows user to select a edit a function name and push
changes to IDA
allows user to select a edit a function name and push
changes to IDA
@param action: QAction*
@param action: QAction*
"""
_, item, model_index = action.data()
@@ -216,12 +216,12 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
item.setIsEditable(False)
def slot_custom_context_menu_requested(self, pos):
""" slot connected to custom context menu request
"""slot connected to custom context menu request
displays custom context menu to user containing action
relevant to the data item selected
displays custom context menu to user containing action
relevant to the data item selected
@param pos: TODO
@param pos: TODO
"""
model_index = self.indexAt(pos)
@@ -243,9 +243,9 @@ class CapaExplorerQtreeView(QtWidgets.QTreeView):
self.show_custom_context_menu(menu, pos)
def slot_double_click(self, model_index):
""" slot connected to double click event
"""slot connected to double click event
@param model_index: QModelIndex*
@param model_index: QModelIndex*
"""
if not model_index.isValid():
return

View File

@@ -102,6 +102,9 @@ def collect_metadata():
"sha256": sha256,
"path": idaapi.get_input_file_path(),
},
"analysis": {"format": idaapi.get_file_type_name(), "extractor": "ida",},
"analysis": {
"format": idaapi.get_file_type_name(),
"extractor": "ida",
},
"version": capa.version.__version__,
}

View File

@@ -30,10 +30,10 @@ logger = logging.getLogger("capa")
class CapaExplorerIdaHooks(idaapi.UI_Hooks):
def __init__(self, screen_ea_changed_hook, action_hooks):
""" facilitate IDA UI hooks
"""facilitate IDA UI hooks
@param screen_ea_changed_hook: function hook for IDA screen ea changed
@param action_hooks: dict of IDA action handles
@param screen_ea_changed_hook: function hook for IDA screen ea changed
@param action_hooks: dict of IDA action handles
"""
super(CapaExplorerIdaHooks, self).__init__()
@@ -43,11 +43,11 @@ class CapaExplorerIdaHooks(idaapi.UI_Hooks):
self.process_action_meta = {}
def preprocess_action(self, name):
""" called prior to action completed
"""called prior to action completed
@param name: name of action defined by idagui.cfg
@param name: name of action defined by idagui.cfg
@retval must be 0
@retval must be 0
"""
self.process_action_handle = self.process_action_hooks.get(name, None)
@@ -66,10 +66,10 @@ class CapaExplorerIdaHooks(idaapi.UI_Hooks):
self.reset()
def screen_ea_changed(self, curr_ea, prev_ea):
""" called after screen location is changed
"""called after screen location is changed
@param curr_ea: current location
@param prev_ea: prev location
@param curr_ea: current location
@param prev_ea: prev location
"""
self.screen_ea_changed_hook(idaapi.get_current_widget(), curr_ea, prev_ea)
@@ -300,13 +300,13 @@ class CapaExplorerForm(idaapi.PluginForm):
self.ida_hooks.unhook()
def ida_hook_rename(self, meta, post=False):
""" hook for IDA rename action
"""hook for IDA rename action
called twice, once before action and once after
action completes
called twice, once before action and once after
action completes
@param meta: metadata cache
@param post: indicates pre or post action
@param meta: metadata cache
@param post: indicates pre or post action
"""
location = idaapi.get_screen_ea()
if not location or not capa.ida.helpers.is_func_start(location):
@@ -322,37 +322,27 @@ class CapaExplorerForm(idaapi.PluginForm):
meta["prev_name"] = curr_name
def ida_hook_screen_ea_changed(self, widget, new_ea, old_ea):
""" hook for IDA screen ea changed
"""hook for IDA screen ea changed
@param widget: IDA widget type
@param new_ea: destination ea
@param old_ea: source ea
"""
this hook is currently only relevant for limiting results displayed in the UI
@param widget: IDA widget type
@param new_ea: destination ea
@param old_ea: source ea
"""
if not self.view_limit_results_by_function.isChecked():
# ignore if checkbox not selected
# ignore if limit checkbox not selected
return
if idaapi.get_widget_type(widget) != idaapi.BWN_DISASM:
# ignore views other than asm
# ignore views not the assembly view
return
# attempt to map virtual addresses to function start addresses
new_func_start = capa.ida.helpers.get_func_start_ea(new_ea)
old_func_start = capa.ida.helpers.get_func_start_ea(old_ea)
if new_func_start and new_func_start == old_func_start:
# navigated within the same function - do nothing
if idaapi.get_func(new_ea) == idaapi.get_func(old_ea):
# user navigated same function - ignore
return
if new_func_start:
# navigated to new function - filter for function start virtual address
match = capa.ida.explorer.item.location_to_hex(new_func_start)
else:
# navigated to virtual address not in valid function - clear filter
match = ""
# filter on virtual address to avoid updating filter string if function name is changed
self.model_proxy.add_single_string_filter(CapaExplorerDataModel.COLUMN_INDEX_VIRTUAL_ADDRESS, match)
self.limit_results_to_function(idaapi.get_func(new_ea))
self.view_tree.resize_columns_to_content()
def load_capa_results(self):
@@ -508,9 +498,9 @@ class CapaExplorerForm(idaapi.PluginForm):
idaapi.info("%s reload completed." % PLUGIN_NAME)
def reset(self):
""" reset UI elements
"""reset UI elements
e.g. checkboxes and IDA highlighting
e.g. checkboxes and IDA highlighting
"""
self.ida_reset()
@@ -518,31 +508,39 @@ class CapaExplorerForm(idaapi.PluginForm):
idaapi.info("%s reset completed." % PLUGIN_NAME)
def slot_menu_bar_hovered(self, action):
""" display menu action tooltip
"""display menu action tooltip
@param action: QtWidgets.QAction*
@param action: QtWidgets.QAction*
@reference: https://stackoverflow.com/questions/21725119/why-wont-qtooltips-appear-on-qactions-within-a-qmenu
@reference: https://stackoverflow.com/questions/21725119/why-wont-qtooltips-appear-on-qactions-within-a-qmenu
"""
QtWidgets.QToolTip.showText(
QtGui.QCursor.pos(), action.toolTip(), self.view_menu_bar, self.view_menu_bar.actionGeometry(action)
)
def slot_checkbox_limit_by_changed(self):
""" slot activated if checkbox clicked
"""slot activated if checkbox clicked
if checked, configure function filter if screen location is located
in function, otherwise clear filter
if checked, configure function filter if screen location is located
in function, otherwise clear filter
"""
match = ""
if self.view_limit_results_by_function.isChecked():
location = capa.ida.helpers.get_func_start_ea(idaapi.get_screen_ea())
if location:
match = capa.ida.explorer.item.location_to_hex(location)
self.limit_results_to_function(idaapi.get_func(idaapi.get_screen_ea()))
else:
self.model_proxy.reset_address_range_filter()
self.model_proxy.add_single_string_filter(CapaExplorerDataModel.COLUMN_INDEX_VIRTUAL_ADDRESS, match)
self.view_tree.reset()
self.view_tree.resize_columns_to_content()
def limit_results_to_function(self, f):
"""add filter to limit results to current function
@param f: (IDA func_t)
"""
if f:
self.model_proxy.add_address_range_filter(f.start_ea, f.end_ea)
else:
# if function not exists don't display any results (address should not be -1)
self.model_proxy.add_address_range_filter(-1, -1)
def main():

View File

@@ -32,7 +32,7 @@ import capa.features.extractors
from capa.helpers import oint, get_file_taste
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SUPPORTED_FILE_MAGIC = set([b"MZ"])
SUPPORTED_FILE_MAGIC = set(["MZ"])
logger = logging.getLogger("capa")
@@ -105,7 +105,12 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
all_function_matches = collections.defaultdict(list)
all_bb_matches = collections.defaultdict(list)
meta = {"feature_counts": {"file": 0, "functions": {},}}
meta = {
"feature_counts": {
"file": 0,
"functions": {},
}
}
for f in tqdm.tqdm(list(extractor.get_functions()), disable=disable_progress, desc="matching", unit=" functions"):
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
@@ -290,24 +295,7 @@ class UnsupportedRuntimeError(RuntimeError):
def get_extractor_py3(path, format, disable_progress=False):
try:
import lancelot
import capa.features.extractors.lancelot
except ImportError:
logger.warning("lancelot not installed")
raise UnsupportedRuntimeError()
if format not in ("pe", "auto"):
raise UnsupportedFormatError(format)
if not is_supported_file_type(path):
raise UnsupportedFormatError()
with open(path, "rb") as f:
buf = f.read()
return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)
raise UnsupportedRuntimeError()
def get_extractor(path, format, disable_progress=False):

View File

@@ -16,15 +16,15 @@ import capa.engine
def convert_statement_to_result_document(statement):
"""
"statement": {
"type": "or"
},
"statement": {
"type": "or"
},
"statement": {
"max": 9223372036854775808,
"min": 2,
"type": "range"
},
"statement": {
"max": 9223372036854775808,
"min": 2,
"type": "range"
},
"""
statement_type = statement.name.lower()
result = {"type": statement_type}
@@ -47,28 +47,28 @@ def convert_statement_to_result_document(statement):
def convert_feature_to_result_document(feature):
"""
"feature": {
"number": 6,
"type": "number"
},
"feature": {
"number": 6,
"type": "number"
},
"feature": {
"api": "ws2_32.WSASocket",
"type": "api"
},
"feature": {
"api": "ws2_32.WSASocket",
"type": "api"
},
"feature": {
"match": "create TCP socket",
"type": "match"
},
"feature": {
"match": "create TCP socket",
"type": "match"
},
"feature": {
"characteristic": [
"loop",
true
],
"type": "characteristic"
},
"feature": {
"characteristic": [
"loop",
true
],
"type": "characteristic"
},
"""
result = {"type": feature.name, feature.name: feature.get_value_str()}
if feature.description:
@@ -80,15 +80,15 @@ def convert_feature_to_result_document(feature):
def convert_node_to_result_document(node):
"""
"node": {
"type": "statement",
"statement": { ... }
},
"node": {
"type": "statement",
"statement": { ... }
},
"node": {
"type": "feature",
"feature": { ... }
},
"node": {
"type": "feature",
"feature": { ... }
},
"""
if isinstance(node, capa.engine.Statement):
@@ -152,7 +152,10 @@ def convert_match_to_result_document(rules, capabilities, result):
scope = rule.meta["scope"]
doc["node"] = {
"type": "statement",
"statement": {"type": "subscope", "subscope": scope,},
"statement": {
"type": "subscope",
"subscope": scope,
},
}
for location in doc["locations"]:
@@ -257,5 +260,7 @@ class CapaJsonObjectEncoder(json.JSONEncoder):
def render_json(meta, rules, capabilities):
return json.dumps(
convert_capabilities_to_result_document(meta, rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True,
convert_capabilities_to_result_document(meta, rules, capabilities),
cls=CapaJsonObjectEncoder,
sort_keys=True,
)

View File

@@ -109,7 +109,12 @@ def render_attack(doc, ostream):
inner_rows.append("%s::%s %s" % (rutils.bold(technique), subtechnique, id))
else:
raise RuntimeError("unexpected ATT&CK spec format")
rows.append((rutils.bold(tactic.upper()), "\n".join(inner_rows),))
rows.append(
(
rutils.bold(tactic.upper()),
"\n".join(inner_rows),
)
)
if rows:
ostream.write(

View File

@@ -262,7 +262,7 @@ def parse_description(s, value_type, description=None):
raise InvalidRule(
"unexpected bytes value: byte sequences must be no larger than %s bytes" % MAX_BYTES_FEATURE_SIZE
)
elif value_type in {"number", "offset"}:
elif value_type in ("number", "offset") or value_type.startswith(("number/", "offset/")):
try:
value = parse_int(value)
except ValueError:

View File

@@ -1,47 +0,0 @@
import sys
import logging
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
logger = logging.getLogger(__name__)
class NotPackedError(ValueError):
def __init__(self):
super(NotPackedError, self).__init__("not packed")
def can_unpack():
# the unpacking backend is based on Speakeasy, which supports python 3.6+
return sys.version_info >= (3, 6)
@lru_cache
def get_unpackers():
# break import loop
import capa.unpack.aspack
return {p.name: p for p in [capa.unpack.aspack.AspackUnpacker]}
def detect_packer(buf):
for unpacker in get_unpackers().values():
if unpacker.is_packed(buf):
return unpacker.name
raise NotPackedError()
def is_packed(buf):
try:
detect_packer(buf)
return True
except NotPackedError:
return False
def unpack_pe(packer, buf):
return get_unpackers()[packer].unpack_pe(buf)

View File

@@ -1,459 +0,0 @@
import io
import struct
import logging
import contextlib
import collections
import pefile
import speakeasy
import speakeasy.common as se_common
import speakeasy.profiler
import speakeasy.windows.objman
logger = logging.getLogger(__name__)
def pefile_get_section_by_name(pe, section_name):
for section in pe.sections:
try:
if section.Name.partition(b"\x00")[0].decode("ascii") == section_name:
return section
except:
continue
raise ValueError("section not found")
def prepare_emu_context(se, module):
"""
prepare an Speakeasy instance for emulating the given module, without running it.
this is useful when planning to manually control the emulator,
such as via `Speakeasy.emu.emu_eng.start(...)`.
typically, Speakeasy expects to do "Run based" analysis,
which doesn't give us too much control.
much of this was derived from win32::Win32Emulator::run_module.
hopefully this can eventually be merged into Speakeasy.
args:
se (speakeasy.Speakeasy): the instance to prepare
module (speakeasy.Module): the module that will be emulated
"""
se._init_hooks()
main_exe = None
if not module.is_exe():
container = se.emu.init_container_process()
if container:
se.emu.processes.append(container)
se.emu.curr_process = container
else:
main_exe = module
if main_exe:
se.emu.user_modules = [main_exe] + se.emu.user_modules
# Create an empty process object for the module if none is supplied
if len(se.emu.processes) == 0:
p = speakeasy.windows.objman.Process(se.emu, path=module.get_emu_path(), base=module.base, pe=module)
se.emu.curr_process = p
t = speakeasy.windows.objman.Thread(se.emu, stack_base=se.emu.stack_base, stack_commit=module.stack_commit)
se.emu.om.objects.update({t.address: t})
se.emu.curr_process.threads.append(t)
se.emu.curr_thread = t
peb = se.emu.alloc_peb(se.emu.curr_process)
se.emu.init_teb(t, peb)
INSN_PUSHA = 0x60
INSN_POPA = 0x61
class AspackUnpacker(speakeasy.Speakeasy):
name = "aspack"
def __init__(self, buf, debug=False):
super(AspackUnpacker, self).__init__(debug=debug)
self.module = self.load_module(data=buf)
prepare_emu_context(self, self.module)
@staticmethod
def detect_aspack(buf):
"""
return True if the given buffer contains an ASPack'd PE file.
we detect aspack by looking at the section names for .aspack.
the unpacking routine contains further validation and will raise an exception if necessary.
args:
buf (bytes): the contents of a PE file.
returns: bool
"""
try:
pe = pefile.PE(data=buf, fast_load=True)
except:
return False
try:
pefile_get_section_by_name(pe, ".aspack")
except ValueError:
pass
else:
return True
return False
@classmethod
def unpack_pe(cls, buf):
"""
unpack the given buffer that contains an ASPack'd PE file.
return the contents of a reconstructed PE file.
args:
buf (bytes): the contents of an ASPack'd PE file.
returns: bytes
"""
unpacker = cls(buf)
return unpacker.unpack()
def stepi(self):
self.emu.emu_eng.start(self.emu.get_pc(), count=1)
def remove_hook(self, hook_type, hook_handle):
# TODO: this should be part of speakeasy
self.emu.hooks[hook_type].remove(hook_handle)
self.emu.emu_eng.hook_remove(hook_handle.handle)
def remove_mem_read_hook(self, hook_handle):
# TODO: this should be part of speakeasy
self.remove_hook(se_common.HOOK_MEM_READ, hook_handle)
@contextlib.contextmanager
def mem_read_hook(self, hook):
"""
context manager for temporarily installing a hook on the emulator.
example:
with self.mem_read_hook(lambda emu, access, addr, size, ctx: emu.stop()):
self.emu.emu_eng.start(0x401000)
args:
hook (speakeasy.common.MemReadHook): the hook to install
"""
handle = self.add_mem_read_hook(hook)
# if this fails, then there's still an unfixed bug in Speakeasy
assert handle.handle != 0
try:
yield
finally:
self.remove_mem_read_hook(handle)
def remove_code_hook(self, hook_handle):
# TODO: this should be part of speakeasy
self.remove_hook(se_common.HOOK_CODE, hook_handle)
@contextlib.contextmanager
def code_hook(self, hook):
"""
context manager for temporarily installing a hook on the emulator.
example:
with self.code_hook(lambda emu, addr, size, ctx: emu.stop()):
self.emu.emu_eng.start(0x401000)
args:
hook (speakeasy.common.CodeHook): the hook to install
"""
handle = self.add_code_hook(hook)
assert handle.handle != 0
try:
yield
finally:
self.remove_code_hook(handle)
def read_ptr(self, va):
endian = "little"
val = self.mem_read(va, self.emu.ptr_size)
return int.from_bytes(val, endian)
def dump(self):
"""
emulate the loaded module, pausing after an appropriate section hop.
then, dump and return the module's memory and OEP.
this routine is specific to aspack. it makes the following assumptions:
- aspack starts with a PUSHA to save off the CPU context
- aspeck then runs its unpacking stub
- aspeck executes POPA to restore the CPU context
- aspack section hops to the OEP
we'll emulate in a few phases:
1. single step over PUSHA at the entrypoint
2. extract the address of the saved CPU context
3. emulate until the saved CPU context is read
4. assert this is a POPA instruction
5. emulate until a section hop
6. profit!
return the module's memory segment and the OEP.
returns: Tuple[byte, int]
"""
# prime the emulator.
# this is derived from winemu::WindowsEmulator::start()
self.emu.curr_run = speakeasy.profiler.Run()
self.emu.curr_mod = self.module
self.emu.set_hooks()
self.emu._set_emu_hooks()
# 0. sanity checking: assert entrypoint is a PUSHA instruction
entrypoint = self.module.base + self.module.ep
opcode = self.emu.mem_read(entrypoint, 1)[0]
if opcode != INSN_PUSHA:
raise ValueError("not packed with supported ASPack")
# 1. single step over PUSHA
self.emu.set_pc(entrypoint)
self.stepi()
# 2. extract address of saved CPU context
saved_cpu_context = self.emu.get_stack_ptr()
# 3. emulate until saved CPU context is accessed
def until_read(target):
"""return a mem_read hook that stops the emulator when an address is read."""
def inner(emu, _access, addr, _size, _value, _ctx):
if addr == target:
emu.stop()
return True
return inner
with self.mem_read_hook(until_read(saved_cpu_context)):
self.emu.emu_eng.start(self.emu.get_pc())
# 4. assert this is a POPA instruction
opcode = self.emu.mem_read(self.emu.get_pc(), 1)[0]
if opcode != INSN_POPA:
raise ValueError("not packed with supported ASPack")
logger.debug("POPA: 0x%x", self.emu.get_pc())
# 5. emulate until a section hop
aspack_section = self.module.get_section_by_name(".aspack")
start = self.module.base + aspack_section.VirtualAddress
end = start + aspack_section.Misc_VirtualSize
def until_section_hop(start, end):
def inner(emu, addr, _size, _ctx):
if addr < start or addr >= end:
emu.stop()
return True
return inner
with self.code_hook(until_section_hop(start, end)):
self.emu.emu_eng.start(self.emu.get_pc())
# 6. dump and return
oep = self.emu.get_pc()
logger.debug("OEP: 0x%x", oep)
mm = self.get_address_map(self.module.base)
buf = self.mem_read(mm.base, mm.size)
return buf, oep
def fixup(self, buf, oep):
"""
fixup a PE image that's been dumped from memory after unpacking aspack.
there are two big fixes that need to happen:
1. update the section pointers and sizes
2. rebuild the import table
for (1) updating the section pointers, we'll just update the
physical pointers to match the virtual pointers, since this is a loaded image.
for (2) rebuilding the import table, we'll:
(a) inspect the emulation results for resolved imports, which tells us dll/symbol names
(b) scan the dumped image for the unpacked import thunks (Import Address Table/Thunk Table)
(c) match the import thunks with resolved imports
(d) build the import table structures
(e) write the reconstructed table into the .aspack section
since the .aspack section contains the unpacking stub, which is no longer used,
then we'll write the reconstructed IAT there. hopefully its big enough.
"""
pe = pefile.PE(data=buf)
pe.OPTIONAL_HEADER.AddressOfEntryPoint = oep - self.module.base
# 1. update section pointers and sizes.
for section in pe.sections:
section.PointerToRawData = section.VirtualAddress
section.SizeOfRawData = section.Misc_VirtualSize
# 2. rebuild the import table
# place the reconstructed import table in the .aspack section (unpacking stub)
reconstruction_target = pefile_get_section_by_name(pe, ".aspack").VirtualAddress
# mapping from import pointer to (dll name, symbol name).
# the import pointer is generated by speakeasy and is not mapped.
# it often looks something like 0xfeedf008.
# as we encounter pointers with values like this, we can resolve the symbol.
imports = {}
# 2a. find resolved imports
for addr, (dll, sym) in self.module.import_table.items():
# these are items in the original import table.
logger.debug(f"found static import {dll}.{sym}")
imports[addr] = (dll, sym)
for (addr, dll, sym) in self.emu.dyn_imps:
# these are imports that have been resolved at runtime by the unpacking stub.
logger.debug(f"found dynamic import {dll}.{sym}")
imports[addr] = (dll, sym)
# 2b. find the existing thunk tables
# these are pointer-aligned tables of import pointers.
# in my test sample, its found at the start of the first section.
# ordered list of tuples (VA, import pointer)
# look up the symbol using the import pointer and the `imports` mapping.
thunks = []
# scan from the start of the first section
# until we reach values that don't look like thunk tables.
for va in range(pe.sections[0].VirtualAddress + self.module.base, 0xFFFFFFFFFFFFFFFF, self.emu.ptr_size):
ptr = self.read_ptr(va)
if ptr == 0:
# probably padding/terminating entry
continue
if ptr in imports:
thunks.append((va, ptr,))
logger.debug(f"found import thunk at {va:08x} to {ptr:08x} for {imports[ptr][0]}\t{imports[ptr][1]}")
continue
# otherwise, at the end of the thunk tables
break
# collect the thunk entries into contiguous tables, grouped by dll name.
#
# list of thunk tuples that are contiguous and have the same dll name:
# (VA, import pointer, dll name, symbol name)
curr_idt_table = []
# list of list of thunk tuples, like above
idt_tables = []
for thunk in thunks:
va, imp = thunk
dll, sym = imports[imp]
if not curr_idt_table:
curr_idt_table.append((va, imp, dll, sym))
elif curr_idt_table[0][2] == dll:
curr_idt_table.append((va, imp, dll, sym))
else:
idt_tables.append(curr_idt_table)
curr_idt_table = [(va, imp, dll, sym)]
idt_tables.append(curr_idt_table)
# 2d. build the import table structures
# mapping from the data identifier to its RVA (which will be found within the reconstruction blob)
locations = {}
# the raw bytes of the reconstructed import structures.
# it will have the following layout:
# 1. DLL name strings and Hint/Name table entries
# 2. Import Lookup Tables (points into (1))
# 3. Import Directory Tables (points into (1), (2), and original Thunk Tables)
reconstruction = io.BytesIO()
# list of dll names
dlls = list(sorted(set(map(lambda pair: pair[0], imports.values()))))
# mapping from dll name to list of symbols
symbols = collections.defaultdict(set)
for dll, sym in imports.values():
symbols[dll].add(sym)
# emit strings into the reconstruction blob
for dll in dlls:
locations[("dll", dll)] = reconstruction_target + reconstruction.tell()
reconstruction.write(dll.encode("ascii") + b"\x00")
if reconstruction.tell() % 2 == 1:
# padding
reconstruction.write(b"\x00")
for sym in sorted(symbols[dll]):
locations[("hint", dll, sym)] = reconstruction_target + reconstruction.tell()
# export name pointer table hint == 0
reconstruction.write(b"\x00\x00")
# name
reconstruction.write(sym.encode("ascii") + b"\x00")
if reconstruction.tell() % 2 == 1:
# padding
reconstruction.write(b"\x00")
# emit Import Lookup Tables for each recovered thunk table
ptr_format = "<I" if self.emu.ptr_size == 4 else "<Q"
for i, idt_entry in enumerate(idt_tables):
locations[("import lookup table", i)] = reconstruction_target + reconstruction.tell()
for (va, imp, dll, sym) in idt_entry:
reconstruction.write(struct.pack(ptr_format, locations[("hint", dll, sym)]))
reconstruction.write(b"\x00" * 8)
# emit Import Descriptor Tables for each recovered thunk table
IDT_ENTRY_SIZE = 0x20
for i, idt_entry in enumerate(idt_tables):
va, _, dll, _ = idt_entry[0]
rva = va - self.module.base
locations[("import descriptor table", i)] = reconstruction_target + reconstruction.tell()
# import lookup table rva
reconstruction.write(struct.pack("<I", locations[("import lookup table", i)]))
# date stamp
reconstruction.write(struct.pack("<I", 0x0))
# forwarder chain
reconstruction.write(struct.pack("<I", 0x0))
# name rva
reconstruction.write(struct.pack("<I", locations[("dll", dll)]))
# import address table rva
reconstruction.write(struct.pack("<I", rva))
# empty last entry
reconstruction.write(b"\x00" * IDT_ENTRY_SIZE)
# if the reconstructed import structures are larger than the unpacking stub...
# i'm not sure what we'll do. probably need to add a section.
assert len(reconstruction.getvalue()) <= pefile_get_section_by_name(pe, ".aspack").Misc_VirtualSize
pe.set_bytes_at_rva(reconstruction_target, reconstruction.getvalue())
pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].VirtualAddress = locations[("import descriptor table", 0)]
pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].Size = IDT_ENTRY_SIZE * len(idt_tables)
return pe.write()
def unpack(self):
buf, oep = self.dump()
buf = self.fixup(buf, oep)
return buf
if __name__ == "__main__":
import sys
input = sys.argv[1]
output = sys.argv[1]
with open(sys.argv[1], "rb") as f:
buf = f.read()
with open(sys.argv[2], "wb") as f:
f.write(AspackUnpacker.unpack_pe(buf))

View File

@@ -1 +1 @@
__version__ = "1.1.0"
__version__ = "1.2.0"

2
rules

Submodule rules updated: 7ae5ae215f...fab22deeb1

View File

@@ -13,8 +13,7 @@ It will mark up functions with their capa matches, like:
UninstallService proc near
...
To use, invoke from the Binary Ninja Tools menu, or from the
command-palette.
To use, invoke from the Binary Ninja Tools menu, or from the command-palette.
Adapted for Binary Ninja by @psifertex
@@ -31,7 +30,7 @@ from binaryninja import *
def append_func_cmt(bv, va, cmt):
"""
add the given comment to the given function,
add the given comment to the given function,
if it doesn't already exist.
"""
func = bv.get_function_at(va)

View File

@@ -41,7 +41,7 @@ logger = logging.getLogger("capa")
def append_func_cmt(va, cmt, repeatable=False):
"""
add the given comment to the given function,
add the given comment to the given function,
if it doesn't already exist.
"""
func = ida_funcs.get_func(va)

View File

@@ -399,7 +399,11 @@ def lint_rule(ctx, rule):
print("")
print(
"%s%s %s"
% (" (nursery) " if is_nursery_rule(rule) else "", rule.name, ("(%s)" % category) if category else "",)
% (
" (nursery) " if is_nursery_rule(rule) else "",
rule.name,
("(%s)" % category) if category else "",
)
)
level = "WARN" if is_nursery_rule(rule) else "FAIL"
@@ -407,7 +411,12 @@ def lint_rule(ctx, rule):
for violation in violations:
print(
"%s %s: %s: %s"
% (" " if is_nursery_rule(rule) else "", level, violation.name, violation.recommendation,)
% (
" " if is_nursery_rule(rule) else "",
level,
violation.name,
violation.recommendation,
)
)
elif len(violations) == 0 and is_nursery_rule(rule):
@@ -487,7 +496,9 @@ def main(argv=None):
parser.add_argument("rules", type=str, help="Path to rules")
parser.add_argument("--samples", type=str, default=samples_path, help="Path to samples")
parser.add_argument(
"--thorough", action="store_true", help="Enable thorough linting - takes more time, but does a better job",
"--thorough",
action="store_true",
help="Enable thorough linting - takes more time, but does a better job",
)
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging")
parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")

View File

@@ -71,22 +71,22 @@ logger = logging.getLogger("capa.show-capabilities-by-function")
def render_matches_by_function(doc):
"""
like:
like:
function at 0x1000321a with 33 features:
- get hostname
- initialize Winsock library
function at 0x10003286 with 63 features:
- create thread
- terminate thread
function at 0x10003415 with 116 features:
- write file
- send data
- link function at runtime
- create HTTP request
- get common file path
- send HTTP request
- connect to HTTP server
function at 0x1000321a with 33 features:
- get hostname
- initialize Winsock library
function at 0x10003286 with 63 features:
- create thread
- terminate thread
function at 0x10003415 with 116 features:
- write file
- send data
- link function at runtime
- create HTTP request
- get common file path
- send HTTP request
- connect to HTTP server
"""
ostream = rutils.StringIO()

View File

@@ -17,10 +17,9 @@ requirements = ["six", "tqdm", "pyyaml", "tabulate", "colorama", "termcolor", "r
if sys.version_info >= (3, 0):
# py3
requirements.append("networkx")
requirements.append("pylancelot~=0.3.6")
else:
# py2
requirements.append("enum34")
requirements.append("enum34==1.1.6") # v1.1.6 is needed by halo 0.0.30 / spinners 0.0.24
requirements.append("vivisect @ https://github.com/williballenthin/vivisect/tarball/v0.0.20200804#egg=vivisect")
requirements.append("viv-utils")
requirements.append("networkx==2.2") # v2.2 is last version supported by Python 2.7
@@ -43,7 +42,11 @@ setuptools.setup(
url="https://www.github.com/fireeye/capa",
packages=setuptools.find_packages(exclude=["tests"]),
package_dir={"capa": "capa"},
entry_points={"console_scripts": ["capa=capa.main:main",]},
entry_points={
"console_scripts": [
"capa=capa.main:main",
]
},
include_package_data=True,
install_requires=requirements,
extras_require={
@@ -55,7 +58,7 @@ setuptools.setup(
"pycodestyle",
"black ; python_version>'3.0'",
"isort",
],
]
},
zip_safe=False,
keywords="capa",

View File

@@ -80,16 +80,6 @@ def get_viv_extractor(path):
return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
@lru_cache
def get_lancelot_extractor(path):
import capa.features.extractors.lancelot
with open(path, "rb") as f:
buf = f.read()
return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)
@lru_cache()
def extract_file_features(extractor):
features = collections.defaultdict(set)
@@ -150,8 +140,6 @@ def get_data_path_by_name(name):
return os.path.join(CD, "data", "bfb9b5391a13d0afd787e87ab90f14f5.dll_")
elif name.startswith("c9188"):
return os.path.join(CD, "data", "c91887d861d9bd4a5872249b641bc9f9.exe_")
elif name == "aspack":
return os.path.join(CD, "data", "2055994ff75b4309eee3a49c5749d306")
else:
raise ValueError("unexpected sample fixture")
@@ -352,10 +340,20 @@ FEATURE_PRESENCE_TESTS = [
("mimikatz", "function=0x4556E5", capa.features.insn.API("advapi32.LsaQueryInformationPolicy"), True),
("mimikatz", "function=0x4556E5", capa.features.insn.API("LsaQueryInformationPolicy"), True),
# insn/api: x64
("kernel32-64", "function=0x180001010", capa.features.insn.API("RtlVirtualUnwind"), True,),
(
"kernel32-64",
"function=0x180001010",
capa.features.insn.API("RtlVirtualUnwind"),
True,
),
("kernel32-64", "function=0x180001010", capa.features.insn.API("RtlVirtualUnwind"), True),
# insn/api: x64 thunk
("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True,),
(
"kernel32-64",
"function=0x1800202B0",
capa.features.insn.API("RtlCaptureContext"),
True,
),
("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True),
# insn/api: resolve indirect calls
("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CreatePipe"), True),
@@ -439,7 +437,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
def get_extractor(path):
if sys.version_info >= (3, 0):
extractor = get_lancelot_extractor(path)
raise RuntimeError("no supported py3 backends yet")
else:
extractor = get_viv_extractor(path)
@@ -506,8 +504,3 @@ def z499c2_extractor():
@pytest.fixture
def al_khaser_x86_extractor():
return get_extractor(get_data_path_by_name("al-khaser x86"))
@pytest.fixture
def aspack_extractor():
return get_extractor(get_data_path_by_name("aspack"))

View File

@@ -59,7 +59,13 @@ def test_some():
)
assert (
Some(2, [Number(1), Number(2), Number(3)]).evaluate(
{Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}, Number(4): {1},}
{
Number(0): {1},
Number(1): {1},
Number(2): {1},
Number(3): {1},
Number(4): {1},
}
)
== True
)
@@ -258,7 +264,9 @@ def test_match_matched_rules():
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.insn.Number(100): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.insn.Number(100): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule1") in features
assert capa.features.MatchedRule("test rule2") in features
@@ -266,7 +274,9 @@ def test_match_matched_rules():
# the ordering of the rules must not matter,
# the engine should match rules in an appropriate order.
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(reversed(rules)), {capa.features.insn.Number(100): {1}}, 0x0,
capa.engine.topologically_order_rules(reversed(rules)),
{capa.features.insn.Number(100): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule1") in features
assert capa.features.MatchedRule("test rule2") in features
@@ -312,22 +322,30 @@ def test_regex():
),
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.insn.Number(100): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.insn.Number(100): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") not in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.String("aaaa"): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.String("aaaa"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") not in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.String("aBBBBa"): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.String("aBBBBa"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") not in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.String("abbbba"): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.String("abbbba"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features
assert capa.features.MatchedRule("rule with implied wildcards") in features
@@ -350,7 +368,9 @@ def test_regex_ignorecase():
),
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.String("aBBBBa"): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.String("aBBBBa"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features
@@ -429,7 +449,9 @@ def test_match_namespace():
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.insn.API("CreateFile"): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.insn.API("CreateFile"): {1}},
0x0,
)
assert "CreateFile API" in matches
assert "file-create" in matches
@@ -439,7 +461,9 @@ def test_match_namespace():
assert capa.features.MatchedRule("file/create/CreateFile") in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.insn.API("WriteFile"): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.insn.API("WriteFile"): {1}},
0x0,
)
assert "WriteFile API" in matches
assert "file-create" not in matches

View File

@@ -21,13 +21,19 @@ import capa.features.extractors
EXTRACTOR = capa.features.extractors.NullFeatureExtractor(
{
"base address": 0x401000,
"file features": [(0x402345, capa.features.Characteristic("embedded pe")),],
"file features": [
(0x402345, capa.features.Characteristic("embedded pe")),
],
"functions": {
0x401000: {
"features": [(0x401000, capa.features.Characteristic("indirect call")),],
"features": [
(0x401000, capa.features.Characteristic("indirect call")),
],
"basic blocks": {
0x401000: {
"features": [(0x401000, capa.features.Characteristic("tight loop")),],
"features": [
(0x401000, capa.features.Characteristic("tight loop")),
],
"instructions": {
0x401000: {
"features": [
@@ -35,7 +41,11 @@ EXTRACTOR = capa.features.extractors.NullFeatureExtractor(
(0x401000, capa.features.Characteristic("nzxor")),
],
},
0x401002: {"features": [(0x401002, capa.features.insn.Mnemonic("mov")),],},
0x401002: {
"features": [
(0x401002, capa.features.insn.Mnemonic("mov")),
],
},
},
},
},

View File

@@ -1,104 +1,104 @@
# run this script from within IDA with ./tests/data/mimikatz.exe open
import sys
import logging
import os.path
import binascii
import traceback
import pytest
try:
sys.path.append(os.path.dirname(__file__))
from fixtures import *
finally:
sys.path.pop()
logger = logging.getLogger("test_ida_features")
def check_input_file(wanted):
import idautils
# some versions (7.4) of IDA return a truncated version of the MD5.
# https://github.com/idapython/bin/issues/11
try:
found = idautils.GetInputFileMD5()[:31].decode("ascii").lower()
except UnicodeDecodeError:
# in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
# rather than the hex digest
found = binascii.hexlify(idautils.GetInputFileMD5()[:15]).decode("ascii").lower()
if not wanted.startswith(found):
raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted))
def get_ida_extractor(_path):
check_input_file("5f66b82558ca92e54e77f216ef4c066c")
# have to import import this inline so pytest doesn't bail outside of IDA
import capa.features.extractors.ida
return capa.features.extractors.ida.IdaFeatureExtractor()
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_features():
for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS:
id = make_test_id((sample, scope, feature, expected))
try:
check_input_file(get_sample_md5_by_name(sample))
except RuntimeError:
print("SKIP %s" % (id))
continue
scope = resolve_scope(scope)
sample = resolve_sample(sample)
try:
do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
except Exception as e:
print("FAIL %s" % (id))
traceback.print_exc()
else:
print("OK %s" % (id))
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_feature_counts():
for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
id = make_test_id((sample, scope, feature, expected))
try:
check_input_file(get_sample_md5_by_name(sample))
except RuntimeError:
print("SKIP %s" % (id))
continue
scope = resolve_scope(scope)
sample = resolve_sample(sample)
try:
do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
except Exception as e:
print("FAIL %s" % (id))
traceback.print_exc()
else:
print("OK %s" % (id))
if __name__ == "__main__":
print("-" * 80)
# invoke all functions in this module that start with `test_`
for name in dir(sys.modules[__name__]):
if not name.startswith("test_"):
continue
test = getattr(sys.modules[__name__], name)
logger.debug("invoking test: %s", name)
sys.stderr.flush()
test()
print("DONE")
# run this script from within IDA with ./tests/data/mimikatz.exe open
import sys
import logging
import os.path
import binascii
import traceback
import pytest
try:
sys.path.append(os.path.dirname(__file__))
from fixtures import *
finally:
sys.path.pop()
logger = logging.getLogger("test_ida_features")
def check_input_file(wanted):
import idautils
# some versions (7.4) of IDA return a truncated version of the MD5.
# https://github.com/idapython/bin/issues/11
try:
found = idautils.GetInputFileMD5()[:31].decode("ascii").lower()
except UnicodeDecodeError:
# in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
# rather than the hex digest
found = binascii.hexlify(idautils.GetInputFileMD5()[:15]).decode("ascii").lower()
if not wanted.startswith(found):
raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted))
def get_ida_extractor(_path):
check_input_file("5f66b82558ca92e54e77f216ef4c066c")
# have to import import this inline so pytest doesn't bail outside of IDA
import capa.features.extractors.ida
return capa.features.extractors.ida.IdaFeatureExtractor()
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_features():
for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS:
id = make_test_id((sample, scope, feature, expected))
try:
check_input_file(get_sample_md5_by_name(sample))
except RuntimeError:
print("SKIP %s" % (id))
continue
scope = resolve_scope(scope)
sample = resolve_sample(sample)
try:
do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
except Exception as e:
print("FAIL %s" % (id))
traceback.print_exc()
else:
print("OK %s" % (id))
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_feature_counts():
for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
id = make_test_id((sample, scope, feature, expected))
try:
check_input_file(get_sample_md5_by_name(sample))
except RuntimeError:
print("SKIP %s" % (id))
continue
scope = resolve_scope(scope)
sample = resolve_sample(sample)
try:
do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
except Exception as e:
print("FAIL %s" % (id))
traceback.print_exc()
else:
print("OK %s" % (id))
if __name__ == "__main__":
print("-" * 80)
# invoke all functions in this module that start with `test_`
for name in dir(sys.modules[__name__]):
if not name.startswith("test_"):
continue
test = getattr(sys.modules[__name__], name)
logger.debug("invoking test: %s", name)
sys.stderr.flush()
test()
print("DONE")

View File

@@ -1,26 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from fixtures import *
@parametrize(
"sample,scope,feature,expected", FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"],
)
def test_lancelot_features(sample, scope, feature, expected):
with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"):
do_test_feature_presence(get_lancelot_extractor, sample, scope, feature, expected)
@parametrize(
"sample,scope,feature,expected", FEATURE_COUNT_TESTS, indirect=["sample", "scope"],
)
def test_lancelot_feature_counts(sample, scope, feature, expected):
with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"):
do_test_feature_count(get_lancelot_extractor, sample, scope, feature, expected)

View File

@@ -44,10 +44,20 @@ def test_main_single_rule(z9324d_extractor, tmpdir):
path = z9324d_extractor.path
rule_file = tmpdir.mkdir("capa").join("rule.yml")
rule_file.write(RULE_CONTENT)
assert capa.main.main([path, "-v", "-r", rule_file.strpath,]) == 0
assert (
capa.main.main(
[
path,
"-v",
"-r",
rule_file.strpath,
]
)
== 0
)
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="lancelot doesn't support shellcode workspaces")
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
def test_main_shellcode(z499c2_extractor):
path = z499c2_extractor.path
assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0

View File

@@ -483,6 +483,21 @@ def test_number_arch():
assert r.evaluate({Number(2, arch=ARCH_X64): {1}}) == False
def test_number_arch_symbol():
r = capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
features:
- number/x32: 2 = some constant
"""
)
)
assert r.evaluate({Number(2, arch=ARCH_X32, description="some constant"): {1}}) == True
def test_offset_symbol():
rule = textwrap.dedent(
"""
@@ -546,6 +561,21 @@ def test_offset_arch():
assert r.evaluate({Offset(2, arch=ARCH_X64): {1}}) == False
def test_offset_arch_symbol():
r = capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
features:
- offset/x32: 2 = some constant
"""
)
)
assert r.evaluate({Offset(2, arch=ARCH_X32, description="some constant"): {1}}) == True
def test_invalid_offset():
with pytest.raises(capa.rules.InvalidRule):
r = capa.rules.Rule.from_yaml(
@@ -650,12 +680,16 @@ def test_regex_values_always_string():
),
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.String("123"): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.String("123"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules), {capa.features.String("0x123"): {1}}, 0x0,
capa.engine.topologically_order_rules(rules),
{capa.features.String("0x123"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features

View File

@@ -1,62 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import pefile
import pytest
from fixtures import *
import capa.unpack
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
def test_aspack_is_packed(aspack_extractor):
path = aspack_extractor.path
with open(path, "rb") as f:
buf = f.read()
assert capa.unpack.is_packed(buf) is True
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
def test_aspack_detect(aspack_extractor):
path = aspack_extractor.path
with open(path, "rb") as f:
buf = f.read()
assert capa.unpack.detect_packer(buf) == "aspack"
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
def test_aspack_unpack(aspack_extractor):
with open(aspack_extractor.path, "rb") as f:
buf = f.read()
unpacked = capa.unpack.unpack_pe("aspack", buf)
pe = pefile.PE(data=unpacked)
assert pe.OPTIONAL_HEADER.ImageBase == 0x4AD00000
assert pe.OPTIONAL_HEADER.AddressOfEntryPoint == 0x1A610
assert b"This program cannot be run in DOS mode" in unpacked
assert "(C) Copyright 1985-2000 Microsoft Corp.".encode("utf-16le") in unpacked
assert "CMD.EXE has halted. %0".encode("utf-16le") in unpacked
dlls = set([])
syms = set([])
for entry in pe.DIRECTORY_ENTRY_IMPORT:
dlls.add(entry.dll.decode("ascii").lower().partition(".")[0])
for imp in entry.imports:
syms.add(imp.name.decode("ascii"))
assert dlls == {"advapi32", "kernel32", "msvcrt", "user32"}
assert "RegQueryValueExW" in syms
assert "WriteConsoleW" in syms
assert "realloc" in syms
assert "GetProcessWindowStation" in syms

View File

@@ -11,7 +11,9 @@ from fixtures import *
@parametrize(
"sample,scope,feature,expected", FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"],
"sample,scope,feature,expected",
FEATURE_PRESENCE_TESTS,
indirect=["sample", "scope"],
)
def test_viv_features(sample, scope, feature, expected):
with xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2"):
@@ -19,7 +21,9 @@ def test_viv_features(sample, scope, feature, expected):
@parametrize(
"sample,scope,feature,expected", FEATURE_COUNT_TESTS, indirect=["sample", "scope"],
"sample,scope,feature,expected",
FEATURE_COUNT_TESTS,
indirect=["sample", "scope"],
)
def test_viv_feature_counts(sample, scope, feature, expected):
with xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2"):