mirror of
https://github.com/mandiant/capa.git
synced 2025-12-13 08:00:44 -08:00
Compare commits
173 Commits
v1.4.1
...
backend-mi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3831f1c104 | ||
|
|
dc828e82b3 | ||
|
|
2e98ba990c | ||
|
|
d008fef23f | ||
|
|
fe458c387a | ||
|
|
3e52c7de23 | ||
|
|
2d1e7946e3 | ||
|
|
f2fe173ef3 | ||
|
|
b2fc52d390 | ||
|
|
5ba4629c3c | ||
|
|
4fc9c77791 | ||
|
|
31ba9ee1b3 | ||
|
|
b4a808ac76 | ||
|
|
0f030115d1 | ||
|
|
42573d8df2 | ||
|
|
073c2b5754 | ||
|
|
ef41d74b82 | ||
|
|
84b3f38810 | ||
|
|
2288f38a11 | ||
|
|
dbc4e06657 | ||
|
|
2433777a76 | ||
|
|
bb7001f5f2 | ||
|
|
9b5aaa40de | ||
|
|
96d74f48f4 | ||
|
|
f07af25a6a | ||
|
|
14e65c4601 | ||
|
|
b5c2fb0259 | ||
|
|
92d98db7bb | ||
|
|
e6f7ef604a | ||
|
|
0eb8d3e47c | ||
|
|
072e30498b | ||
|
|
d6e73577af | ||
|
|
a81f98be8e | ||
|
|
0980e35c29 | ||
|
|
336c2a3aff | ||
|
|
e3055bc740 | ||
|
|
9406e3dbfb | ||
|
|
5307b7e1b1 | ||
|
|
f18a8f5b31 | ||
|
|
cfe99c4b72 | ||
|
|
0d439c0f55 | ||
|
|
6288a96a8b | ||
|
|
819b6f6ccf | ||
|
|
4bc06aa8cd | ||
|
|
7b64425c24 | ||
|
|
44c9d6a22b | ||
|
|
c750447d62 | ||
|
|
059ec8f3f2 | ||
|
|
2c5508febd | ||
|
|
905fff041b | ||
|
|
20ce29b033 | ||
|
|
4bd93a680e | ||
|
|
c9bf7f424d | ||
|
|
4cde2e1a78 | ||
|
|
48c045d381 | ||
|
|
2b385ead7f | ||
|
|
0fcc9f3df6 | ||
|
|
b251202804 | ||
|
|
6967010281 | ||
|
|
7e0846e66a | ||
|
|
4e3daad96d | ||
|
|
37fb3da5db | ||
|
|
762f48957c | ||
|
|
c1af7b8783 | ||
|
|
f89084677d | ||
|
|
0716084bbb | ||
|
|
a6c946e6c9 | ||
|
|
3f6e088faa | ||
|
|
9abdd5813b | ||
|
|
f33ea36e6f | ||
|
|
8788e0a9c9 | ||
|
|
b1c1cb4b9b | ||
|
|
982d4ac472 | ||
|
|
b7a8d667b9 | ||
|
|
8f8729df05 | ||
|
|
e928d281dd | ||
|
|
625583f5ab | ||
|
|
ab54553dd2 | ||
|
|
47bf7b1325 | ||
|
|
145d75f579 | ||
|
|
01d976d7f7 | ||
|
|
095e3720ab | ||
|
|
d62a37fe1f | ||
|
|
5323f2fc31 | ||
|
|
5539cb0d08 | ||
|
|
76e80106d6 | ||
|
|
9ab7b9a033 | ||
|
|
fe97d6a349 | ||
|
|
2242c2afe8 | ||
|
|
ec25fb5c36 | ||
|
|
ce25f5cadd | ||
|
|
1099f40f19 | ||
|
|
70368b3f1e | ||
|
|
0181ebad45 | ||
|
|
e158e3f13c | ||
|
|
b1bbded23c | ||
|
|
b77d9d3738 | ||
|
|
d0b2421752 | ||
|
|
96b65a7c60 | ||
|
|
177c90093e | ||
|
|
28ee091107 | ||
|
|
64c71d8e6d | ||
|
|
9ce0c94e17 | ||
|
|
08c3372635 | ||
|
|
2fafc70b69 | ||
|
|
0e62ebe3a2 | ||
|
|
1cc4d20b89 | ||
|
|
af4889894a | ||
|
|
429a5e1ea3 | ||
|
|
4ef860eb07 | ||
|
|
b59ebf30c6 | ||
|
|
a1ae8d54a6 | ||
|
|
8155207bea | ||
|
|
337d2cfa6d | ||
|
|
df2229782b | ||
|
|
5920552649 | ||
|
|
b4827fcb00 | ||
|
|
63983ccb65 | ||
|
|
eac7e2b749 | ||
|
|
65a365bca1 | ||
|
|
fecd0e11eb | ||
|
|
51ad526cfc | ||
|
|
10a062017d | ||
|
|
0d351794db | ||
|
|
067e3ffced | ||
|
|
50d55fae56 | ||
|
|
ce63628d3d | ||
|
|
13df7f90f6 | ||
|
|
f5099b873d | ||
|
|
70eb38895d | ||
|
|
7aea9fa1d2 | ||
|
|
5d30be31e0 | ||
|
|
7abe66e3de | ||
|
|
49ef5e5e64 | ||
|
|
c2266bc105 | ||
|
|
a813e219e6 | ||
|
|
1c1fb20546 | ||
|
|
65feb60bb8 | ||
|
|
f7492c7dc7 | ||
|
|
dfc805b89b | ||
|
|
75defc13a0 | ||
|
|
7d4888bb77 | ||
|
|
1a34029171 | ||
|
|
f6ad4652e4 | ||
|
|
1e25604b0b | ||
|
|
3a43ffa641 | ||
|
|
8f6bcf3d98 | ||
|
|
0fd9753681 | ||
|
|
76a04dfe25 | ||
|
|
16317182e3 | ||
|
|
6bcdf64f67 | ||
|
|
d276a07a71 | ||
|
|
f3b59b342a | ||
|
|
4a0f1f22ba | ||
|
|
0c85e7604c | ||
|
|
8f6a46e2d8 | ||
|
|
74b2c18296 | ||
|
|
b12d0b6424 | ||
|
|
60ddf0400e | ||
|
|
669d3484c0 | ||
|
|
5420ad97a3 | ||
|
|
36822926af | ||
|
|
eef8f2e781 | ||
|
|
31ac667623 | ||
|
|
868ceb25bf | ||
|
|
ee3ab94774 | ||
|
|
1c47877a8c | ||
|
|
84698462f3 | ||
|
|
da7dc793e7 | ||
|
|
044ee83fbc | ||
|
|
aea324c4a8 | ||
|
|
4d05b20830 | ||
|
|
276928951c |
6
.github/dependabot.yml
vendored
Normal file
6
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "pip"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -2,7 +2,7 @@ name: build
|
||||
|
||||
on:
|
||||
release:
|
||||
types: [created, edited, published]
|
||||
types: [edited, published]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -45,13 +45,13 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [code_style, rule_linter]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
- python: 2.7
|
||||
- python: 3.6
|
||||
- python: 3.7
|
||||
- python: 3.8
|
||||
- python: '3.9.0-rc.1' # Python latest
|
||||
- python: 3.9.1
|
||||
steps:
|
||||
- name: Checkout capa with submodules
|
||||
uses: actions/checkout@v2
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||

|
||||
|
||||
[](https://github.com/fireeye/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
|
||||
[](https://github.com/fireeye/capa-rules)
|
||||
[](https://github.com/fireeye/capa-rules)
|
||||
[](LICENSE.txt)
|
||||
|
||||
capa detects capabilities in executable files.
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
|
||||
import abc
|
||||
|
||||
from capa.helpers import oint
|
||||
|
||||
|
||||
class FeatureExtractor(object):
|
||||
"""
|
||||
@@ -35,6 +37,12 @@ class FeatureExtractor(object):
|
||||
#
|
||||
super(FeatureExtractor, self).__init__()
|
||||
|
||||
def block_offset(self, bb):
|
||||
return oint(bb)
|
||||
|
||||
def function_offset(self, f):
|
||||
return oint(f)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_base_address(self):
|
||||
"""
|
||||
|
||||
@@ -42,7 +42,9 @@ def is_ordinal(symbol):
|
||||
"""
|
||||
is the given symbol an ordinal that is prefixed by "#"?
|
||||
"""
|
||||
return symbol[0] == "#"
|
||||
if symbol:
|
||||
return symbol[0] == "#"
|
||||
return False
|
||||
|
||||
|
||||
def generate_symbols(dll, symbol):
|
||||
|
||||
@@ -166,6 +166,10 @@ def basic_block_size(bb):
|
||||
|
||||
def read_bytes_at(ea, count):
|
||||
""" """
|
||||
# check if byte has a value, see get_wide_byte doc
|
||||
if not idc.is_loaded(ea):
|
||||
return b""
|
||||
|
||||
segm_end = idc.get_segm_end(ea)
|
||||
if ea + count > segm_end:
|
||||
return idc.get_bytes(ea, segm_end - ea)
|
||||
|
||||
@@ -148,6 +148,9 @@ def extract_insn_bytes_features(f, bb, insn):
|
||||
example:
|
||||
push offset iid_004118d4_IShellLinkA ; riid
|
||||
"""
|
||||
if idaapi.is_call_insn(insn):
|
||||
return
|
||||
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref != insn.ea:
|
||||
extracted_bytes = capa.features.extractors.ida.helpers.read_bytes_at(ref, MAX_BYTES_FEATURE_SIZE)
|
||||
@@ -302,7 +305,7 @@ def extract_insn_nzxor_characteristic_features(f, bb, insn):
|
||||
bb (IDA BasicBlock)
|
||||
insn (IDA insn_t)
|
||||
"""
|
||||
if insn.itype != idaapi.NN_xor:
|
||||
if insn.itype not in (idaapi.NN_xor, idaapi.NN_xorpd, idaapi.NN_xorps, idaapi.NN_pxor):
|
||||
return
|
||||
if capa.features.extractors.ida.helpers.is_operand_equal(insn.Op1, insn.Op2):
|
||||
return
|
||||
|
||||
107
capa/features/extractors/miasm/__init__.py
Normal file
107
capa/features/extractors/miasm/__init__.py
Normal file
@@ -0,0 +1,107 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import miasm.analysis.binary
|
||||
import miasm.analysis.machine
|
||||
from miasm.core.locationdb import LocationDB
|
||||
|
||||
import capa.features.extractors.miasm.file
|
||||
import capa.features.extractors.miasm.insn
|
||||
import capa.features.extractors.miasm.function
|
||||
import capa.features.extractors.miasm.basicblock
|
||||
from capa.features.extractors import FeatureExtractor
|
||||
|
||||
|
||||
class MiasmFeatureExtractor(FeatureExtractor):
    """
    feature extractor that parses and disassembles the input with miasm.

    args:
      buf (bytes): content of the file to analyze.
    """

    def __init__(self, buf):
        super(MiasmFeatureExtractor, self).__init__()
        self.buf = buf
        self.loc_db = LocationDB()
        self.container = miasm.analysis.binary.Container.from_string(buf, self.loc_db)
        self.pe = self.container.executable
        self.machine = miasm.analysis.machine.Machine(self.container.arch)
        self.cfg = self._build_cfg()

    def get_base_address(self):
        # NOTE(review): this is the container's entry point, not its image base - confirm callers expect that
        return self.container.entry_point

    def extract_file_features(self):
        for feature, va in capa.features.extractors.miasm.file.extract_file_features(self):
            yield feature, va

    # TODO: Improve this function (it just considers all loc_keys target of calls a function), port to miasm
    def get_functions(self):
        """
        yield, once each, every loc_key that is the direct target of a call instruction in the CFG
        """
        functions = set()

        for block in self.cfg.blocks:
            for line in block.lines:
                if line.is_subcall() and line.args[0].is_loc():
                    loc_key = line.args[0].loc_key
                    if loc_key not in functions:
                        functions.add(loc_key)
                        yield loc_key

    def extract_function_features(self, loc_key):
        for feature, va in capa.features.extractors.miasm.function.extract_features(self, loc_key):
            yield feature, va

    def block_offset(self, bb):
        # a block is addressed by the offset of its first instruction
        return bb.lines[0].offset

    def function_offset(self, f):
        # `f` is a loc_key; resolve it to its block and use that block's first instruction
        return self.cfg.loc_key_to_block(f).lines[0].offset

    def get_basic_blocks(self, loc_key):
        """
        get the basic blocks of the function represented by loc_key

        the function is disassembled again from its first block, this time without following calls.
        """
        block = self.cfg.loc_key_to_block(loc_key)
        disassembler = self.machine.dis_engine(self.container.bin_stream, loc_db=self.loc_db, follow_call=False)
        cfg = disassembler.dis_multiblock(self.block_offset(block))
        return cfg.blocks

    def extract_basic_block_features(self, _, bb):
        for feature, va in capa.features.extractors.miasm.basicblock.extract_features(bb):
            yield feature, va

    def get_instructions(self, _, bb):
        return bb.lines

    def extract_insn_features(self, f, bb, insn):
        for feature, va in capa.features.extractors.miasm.insn.extract_features(self, f, bb, insn):
            yield feature, va

    def _get_entry_points(self):
        """
        return the set of addresses at which disassembly starts: `get_base_address()` plus every export
        """
        # imported explicitly: this module only imports miasm.analysis.* and miasm.core.locationdb,
        # so the loader submodule is otherwise only reachable if something else imported it first
        import miasm.jitter.loader.pe

        entry_points = {self.get_base_address()}

        for _, va in miasm.jitter.loader.pe.get_export_name_addr_list(self.pe):
            entry_points.add(va)

        return entry_points

    # This is more efficient than using the `blocks` argument in `dis_multiblock`
    # See http://www.williballenthin.com/post/2020-01-12-miasm-part-2
    # TODO: port this efficiency improvement to miasm
    def _build_cfg(self):
        """
        disassemble from every entry point (following calls) and merge the results into one AsmCFG
        """
        # imported explicitly for the same reason as in `_get_entry_points`
        import miasm.core.asmblock

        loc_db = self.container.loc_db
        disassembler = self.machine.dis_engine(self.container.bin_stream, follow_call=True, loc_db=loc_db)
        # shared across all runs so blocks already disassembled are not processed again
        job_done = set()
        cfgs = {}

        for va in self._get_entry_points():
            cfgs[va] = disassembler.dis_multiblock(va, job_done=job_done)

        complete_cfs = miasm.core.asmblock.AsmCFG(loc_db)
        for cfg in cfgs.values():
            complete_cfs.merge(cfg)

        disassembler.apply_splitting(complete_cfs)
        return complete_cfs
|
||||
134
capa/features/extractors/miasm/basicblock.py
Normal file
134
capa/features/extractors/miasm/basicblock.py
Normal file
@@ -0,0 +1,134 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import sys
|
||||
import string
|
||||
import struct
|
||||
|
||||
from capa.features import Characteristic
|
||||
from capa.features.basicblock import BasicBlock
|
||||
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
|
||||
|
||||
|
||||
# TODO: Avoid this duplication (this code is in __init__ as well)
|
||||
def block_offset(bb):
    """ return the offset of the first instruction of the given basic block """
    first_line = bb.lines[0]
    return first_line.offset
|
||||
|
||||
|
||||
def extract_bb_tight_loop(bb):
    """ yield a "tight loop" characteristic when one of the block's outgoing constraints targets the block itself """
    for constraint in bb.bto:
        if constraint.loc_key == bb.loc_key:
            yield Characteristic("tight loop"), block_offset(bb)
            # one feature per block is enough, however many self-edges there are
            break
|
||||
|
||||
|
||||
def is_mov_imm_to_stack(instr):
    """
    Tell whether the instruction is a MOV-family instruction that stores an immediate
    into memory addressed through a stack or frame pointer register.
    """
    if not instr.name.startswith("MOV"):
        return False

    try:
        dst, src = instr.args
    except ValueError:
        # anything other than exactly two operands cannot be `mov dst, imm`
        return False

    if not (src.is_int() and dst.is_mem()):
        return False

    # textual match on the rendered operand, e.g. `@8[ESP + 0x8]`
    rendered = str(dst)
    return any(register in rendered for register in ("EBP", "RBP", "ESP", "RSP"))
|
||||
|
||||
|
||||
def is_printable_ascii(chars):
    """
    Return True if every byte of `chars` is a printable ASCII character.

    args:
      chars (bytes): raw bytes (`str` on Python 2).
    """
    # bytearray iterates as ints on both Python 2 and Python 3, so no version switch is needed
    return all(c < 127 and chr(c) in string.printable for c in bytearray(chars))


def is_printable_utf16le(chars):
    """
    Return True if `chars` looks like printable ASCII text encoded as UTF-16 LE,
    i.e. every second byte is zero and the remaining bytes are printable ASCII.

    args:
      chars (bytes): raw bytes (`str` on Python 2).
    """
    data = bytearray(chars)
    if any(data[1::2]):
        # a non-zero high byte: not ASCII-range UTF-16 LE
        return False
    return is_printable_ascii(data[::2])


# widths (in bytes) that an immediate can be packed into, with the matching struct format
_IMM_PACK_FORMATS = ((1, "<B"), (2, "<H"), (4, "<I"), (8, "<Q"))


def get_printable_len(insn):
    """
    Return string length if all operand bytes are ascii or utf16-le printable

    The immediate is packed little-endian into the smallest of 1/2/4/8 bytes that holds it.
    Zero-padding matters: a UTF-16 LE immediate such as 0x00420041 has only three significant
    bytes but must be examined as the four bytes `A\\x00B\\x00`.

    args:
      insn: instruction with exactly two operands, `dst` (memory) and `src` (immediate).

    returns:
      int: number of characters stored by the instruction, or 0 if they are not printable.

    raises:
      ValueError: if the operands are not (memory, immediate).
    """
    dst, src = insn.args

    if not src.is_int():
        raise ValueError("unexpected operand type")

    if not dst.is_mem():
        raise ValueError("unexpected operand type")

    # depending on what `src.arg` holds, the integer is either the attribute itself or its own `.arg`
    if isinstance(src.arg, int):
        val = src.arg
    else:
        val = src.arg.arg

    size = (val.bit_length() + 7) // 8
    if size == 0:
        return 0

    packable = [(width, fmt) for width, fmt in _IMM_PACK_FORMATS if size <= width]
    if not packable:
        # wider than anything we know how to pack
        return 0
    width, fmt = packable[0]

    chars = struct.pack(fmt, val)

    if is_printable_ascii(chars):
        return size

    if is_printable_utf16le(chars):
        return width // 2

    return 0
|
||||
|
||||
|
||||
def extract_stackstring(bb):
    """
    yield a "stack string" characteristic once the block has moved more than
    MIN_STACKSTRING_LEN printable characters onto the stack
    """
    total = 0
    for insn in bb.lines:
        total += get_printable_len(insn) if is_mov_imm_to_stack(insn) else 0
        if total <= MIN_STACKSTRING_LEN:
            continue
        yield Characteristic("stack string"), block_offset(bb)
        # report the block once, then stop scanning
        return
|
||||
|
||||
|
||||
def extract_features(bb):
    """
    extract all basic block scope features from the given basic block.

    the BasicBlock feature itself is always emitted first, followed by whatever
    each handler in BASIC_BLOCK_HANDLERS reports.

    args:
      bb (miasm.core.asmblock.AsmBlock): the basic block to process.

    yields:
      Feature, VA: each feature and the location at which it was found.
    """
    yield BasicBlock(), block_offset(bb)

    for handler in BASIC_BLOCK_HANDLERS:
        for found, location in handler(bb):
            yield found, location
|
||||
|
||||
|
||||
BASIC_BLOCK_HANDLERS = (
|
||||
extract_bb_tight_loop,
|
||||
extract_stackstring,
|
||||
)
|
||||
102
capa/features/extractors/miasm/file.py
Normal file
102
capa/features/extractors/miasm/file.py
Normal file
@@ -0,0 +1,102 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import re
|
||||
|
||||
import miasm.analysis.binary
|
||||
|
||||
import capa.features.extractors.strings
|
||||
from capa.features import String, Characteristic
|
||||
from capa.features.file import Export, Import, Section
|
||||
|
||||
|
||||
def extract_file_embedded_pe(extractor):
    """
    extract embedded PE features

    every "MZ" marker after the start of the file is handed to miasm; offsets at which
    a PE container can be parsed are reported.

    args:
      extractor (MiasmFeatureExtractor): provides `buf` (file content) and `loc_db`.

    yields:
      Tuple[Feature, int]: the "embedded pe" characteristic and the file offset of the carved PE.
    """
    buf = extractor.buf
    for match in re.finditer(b"MZ", buf):
        offset = match.start()
        if offset == 0:
            # the header of the analyzed file itself is not an *embedded* PE
            continue

        try:
            subcontainer = miasm.analysis.binary.ContainerPE.from_string(buf[offset:], loc_db=extractor.loc_db)
        except Exception:
            # carving is best-effort: most "MZ" byte pairs are not PE headers, and a failed
            # parse only means there is no PE at this offset. it must not abort extraction.
            continue

        if isinstance(subcontainer, miasm.analysis.binary.ContainerPE):
            yield Characteristic("embedded pe"), offset
|
||||
|
||||
|
||||
def extract_file_export_names(extractor):
    """
    extract the names of exported symbols and their addresses

    entries whose symbol is not a string (i.e. exports by ordinal) are skipped.
    """
    exports = miasm.jitter.loader.pe.get_export_name_addr_list(extractor.pe)
    for name, address in exports:
        if not isinstance(name, str):
            # Only use func names and not ordinals
            continue
        yield Export(name), address
|
||||
|
||||
|
||||
def extract_file_import_names(extractor):
    """
    extract imported function names and their addresses
    1. imports by ordinal:
     - modulename.#ordinal
    2. imports by name, results in two features to support importname-only matching:
     - modulename.importname
     - importname

    args:
      extractor (MiasmFeatureExtractor): provides the parsed PE as `pe`.

    yields:
      Tuple[Feature, VA]: an Import feature and the address of its import slot.
    """
    for (dll, symbol), va_set in miasm.jitter.loader.pe.get_import_address_pe(extractor.pe).items():
        # drop a three-letter extension (".dll", ".drv", ...) only when there is one;
        # slicing unconditionally would truncate module names that carry no extension
        dll_name = dll[:-4] if dll[-4:-3] == "." else dll
        for va in va_set:
            if isinstance(symbol, int):
                yield Import("%s.#%s" % (dll_name, symbol)), va
            else:
                yield Import("%s.%s" % (dll_name, symbol)), va
                yield Import(symbol), va
|
||||
|
||||
|
||||
def extract_file_section_names(extractor):
    """
    extract file sections and their addresses

    args:
      extractor (MiasmFeatureExtractor): provides the parsed PE as `pe`.

    yields:
      Tuple[Feature, VA]: a Section feature (name up to the first NUL byte) and the section address.
    """
    for section in extractor.pe.SHList.shlist:
        raw_name = section.name.partition(b"\x00")[0]
        # section names are arbitrary bytes taken from the file; substitute undecodable
        # bytes instead of aborting extraction with UnicodeDecodeError
        name = raw_name.decode("ascii", "replace")
        yield Section(name), section.addr
|
||||
|
||||
|
||||
def extract_file_strings(extractor):
    """
    extract the ASCII strings, then the UTF-16 LE strings, found in the file content
    """
    strings = capa.features.extractors.strings
    for found in strings.extract_ascii_strings(extractor.buf):
        yield String(found.s), found.offset

    for found in strings.extract_unicode_strings(extractor.buf):
        yield String(found.s), found.offset
|
||||
|
||||
|
||||
def extract_file_features(extractor):
    """
    extract all file scope features by running every handler in FILE_HANDLERS

    args:
      extractor (MiasmFeatureExtractor): passed unchanged to each handler; the handlers
        read the file content and parsed binary from it.

    yields:
      Tuple[Feature, VA]: a feature and its location.
    """
    for handler in FILE_HANDLERS:
        for found, location in handler(extractor):
            yield found, location
|
||||
|
||||
|
||||
FILE_HANDLERS = (
|
||||
extract_file_embedded_pe,
|
||||
extract_file_export_names,
|
||||
extract_file_import_names,
|
||||
extract_file_section_names,
|
||||
extract_file_strings,
|
||||
)
|
||||
50
capa/features/extractors/miasm/function.py
Normal file
50
capa/features/extractors/miasm/function.py
Normal file
@@ -0,0 +1,50 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
from capa.features import Characteristic
|
||||
|
||||
|
||||
def extract_function_calls_to(extractor, loc_key):
    """
    yield a "calls to" characteristic for every predecessor block whose call
    instruction directly targets `loc_key`; the location is that call's offset.
    """
    cfg = extractor.cfg
    for pred_key in cfg.predecessors(loc_key):
        call = cfg.loc_key_to_block(pred_key).get_subcall_instr()
        if not call or not call.is_subcall():
            continue

        target = call.args[0]
        if target.is_loc() and target.loc_key == loc_key:
            yield Characteristic("calls to"), call.offset
|
||||
|
||||
|
||||
def extract_function_loop(extractor, loc_key):
    """
    yield a "loop" characteristic if the function's own CFG contains a loop

    the function is disassembled again from its first block without following calls,
    and the feature is located at the function's offset.
    """
    entry_block = extractor.cfg.loc_key_to_block(loc_key)
    engine = extractor.machine.dis_engine(
        extractor.container.bin_stream, loc_db=extractor.loc_db, follow_call=False
    )
    start = extractor.block_offset(entry_block)
    function_cfg = engine.dis_multiblock(start)
    if not function_cfg.has_loop():
        return
    yield Characteristic("loop"), start
|
||||
|
||||
|
||||
def extract_features(extractor, loc_key):
    """
    extract all function scope features by running every handler in FUNCTION_HANDLERS

    args:
      extractor (MiasmFeatureExtractor): passed unchanged to each handler
      loc_key (LocKey): LocKey which represents the beginning of the function

    yields:
      Feature, VA: each feature and the location at which it was found.
    """
    for handler in FUNCTION_HANDLERS:
        for found, location in handler(extractor, loc_key):
            yield found, location
|
||||
|
||||
|
||||
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)
|
||||
126
capa/features/extractors/miasm/insn.py
Normal file
126
capa/features/extractors/miasm/insn.py
Normal file
@@ -0,0 +1,126 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import miasm.expression.expression
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
from capa.features.insn import Mnemonic
|
||||
|
||||
|
||||
# TODO: remove duplication (similar code in file.py)
|
||||
# TODO: this function should be cached
|
||||
def get_imports(pe):
    """
    map import slot addresses to qualified import names

    args:
      pe: parsed PE, as accepted by `miasm.jitter.loader.pe.get_import_address_pe`.

    returns:
      Dict[VA, str]: `modulename.importname`, or `modulename.#ordinal` for imports by ordinal.
    """
    imports = {}
    for (dll, symbol), va_set in miasm.jitter.loader.pe.get_import_address_pe(pe).items():
        # drop a three-letter extension (".dll", ".drv", ...) only when there is one;
        # slicing unconditionally would truncate module names that carry no extension
        dll_name = dll[:-4] if dll[-4:-3] == "." else dll
        if isinstance(symbol, int):
            name = "%s.#%s" % (dll_name, symbol)
        else:
            name = "%s.%s" % (dll_name, symbol)
        for va in va_set:
            imports[va] = name
    return imports
|
||||
|
||||
|
||||
def extract_insn_api_features(extractor, _f, _bb, insn):
    """
    parse API features from the given instruction.

    only calls through a memory operand with a constant address are considered;
    the address is looked up among the import slots of the PE.
    """
    if not insn.is_subcall():
        return

    arg = insn.args[0]
    if not isinstance(arg, miasm.expression.expression.ExprMem):
        return
    if not isinstance(arg.ptr, miasm.expression.expression.ExprInt):
        return

    target = int(arg.ptr)
    imports = get_imports(extractor.pe)
    if target not in imports:
        return

    # the import name is `module.symbol`; split on the last dot
    dll, _, symbol = imports[target].rpartition(".")
    for feature in capa.features.extractors.helpers.generate_symbols(dll, symbol):
        yield feature, insn.offset
|
||||
|
||||
|
||||
def extract_insn_number_features(extractor, f, bb, insn):
    """
    parse number features from the given instruction.

    raises:
      NotImplementedError: always; this is a placeholder and its entry in
        INSTRUCTION_HANDLERS is commented out.
    """
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_insn_string_features(extractor, f, bb, insn):
    """
    parse string features from the given instruction.

    raises:
      NotImplementedError: always; this is a placeholder and its entry in
        INSTRUCTION_HANDLERS is commented out.
    """
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_insn_offset_features(extractor, f, bb, insn):
    """
    parse structure offset features from the given instruction.

    raises:
      NotImplementedError: always; this is a placeholder and its entry in
        INSTRUCTION_HANDLERS is commented out.
    """
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_insn_nzxor_characteristic_features(extractor, f, bb, insn):
    """
    parse non-zeroing XOR instruction from the given instruction.
    ignore expected non-zeroing XORs, e.g. security cookies.

    raises:
      NotImplementedError: always; this is a placeholder and its entry in
        INSTRUCTION_HANDLERS is commented out.
    """
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_insn_mnemonic_features(extractor, f, bb, insn):
    """yield the instruction's mnemonic as a Mnemonic feature located at the instruction's offset."""
    mnemonic = Mnemonic(insn.name)
    yield mnemonic, insn.offset
|
||||
|
||||
|
||||
def extract_insn_peb_access_characteristic_features(extractor, f, bb, insn):
    """
    parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64

    raises:
      NotImplementedError: always; this is a placeholder and its entry in
        INSTRUCTION_HANDLERS is commented out.
    """
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_insn_segment_access_features(extractor, f, bb, insn):
    """
    parse the instruction for access to fs or gs

    raises:
      NotImplementedError: always; this is a placeholder and its entry in
        INSTRUCTION_HANDLERS is commented out.
    """
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_insn_cross_section_cflow(extractor, f, bb, insn):
    """
    inspect the instruction for a CALL or JMP that crosses section boundaries.

    raises:
      NotImplementedError: always; this is a placeholder and its entry in
        INSTRUCTION_HANDLERS is commented out.
    """
    raise NotImplementedError()
|
||||
|
||||
|
||||
# this is a feature that's most relevant at the function scope,
|
||||
# however, it's most efficient to extract at the instruction scope.
|
||||
def extract_function_calls_from(f, bb, insn):
    """
    placeholder for the "calls from" feature.

    raises:
      NotImplementedError: always; its entry in INSTRUCTION_HANDLERS is commented out.
    """
    # NOTE(review): unlike the other handlers in this module this takes no `extractor`
    # argument, while extract_features() calls handlers as (extractor, f, bb, insn) -
    # the signature needs aligning before this handler is enabled.
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_features(extractor, f, bb, insn):
    """
    extract all instruction scope features by running every handler in INSTRUCTION_HANDLERS

    args:
      extractor (MiasmFeatureExtractor)
      f (miasm.expression.expression.LocKey): the function from which to extract features
      bb (miasm.core.asmblock.AsmBlock): the basic block to process.
      insn (Instruction): the instruction to process.

    yields:
      Feature, VA: each feature and the location at which it was found.
    """
    for handler in INSTRUCTION_HANDLERS:
        for found, location in handler(extractor, f, bb, insn):
            yield found, location
|
||||
|
||||
|
||||
INSTRUCTION_HANDLERS = (
|
||||
extract_insn_api_features,
|
||||
# extract_insn_number_features,
|
||||
# extract_insn_string_features,
|
||||
# extract_insn_bytes_features,
|
||||
# extract_insn_offset_features,
|
||||
# extract_insn_nzxor_characteristic_features,
|
||||
extract_insn_mnemonic_features,
|
||||
# extract_insn_peb_access_characteristic_features,
|
||||
# extract_insn_cross_section_cflow,
|
||||
# extract_insn_segment_access_features,
|
||||
# extract_function_calls_from,
|
||||
# extract_function_indirect_call_characteristic_features,
|
||||
)
|
||||
52
capa/features/extractors/smda/__init__.py
Normal file
52
capa/features/extractors/smda/__init__.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import sys
|
||||
import types
|
||||
|
||||
from smda.common.SmdaReport import SmdaReport
|
||||
from smda.common.SmdaInstruction import SmdaInstruction
|
||||
|
||||
import capa.features.extractors.smda.file
|
||||
import capa.features.extractors.smda.insn
|
||||
import capa.features.extractors.smda.function
|
||||
import capa.features.extractors.smda.basicblock
|
||||
from capa.main import UnsupportedRuntimeError
|
||||
from capa.features.extractors import FeatureExtractor
|
||||
|
||||
|
||||
class SmdaFeatureExtractor(FeatureExtractor):
    """
    feature extractor that reads functions, blocks and instructions from an SMDA report.

    args:
      smda_report (SmdaReport): disassembly report produced by SMDA.
      path: path of the analyzed file; handed to the file feature extractor.

    raises:
      UnsupportedRuntimeError: when constructed under Python 2.
    """

    # the signature is deliberately not annotated: annotation syntax does not parse on
    # Python 2, which would make the runtime guard below unreachable
    def __init__(self, smda_report, path):
        super(SmdaFeatureExtractor, self).__init__()
        if sys.version_info < (3, 0):
            raise UnsupportedRuntimeError("SMDA should only be used with Python 3.")
        self.smda_report = smda_report
        self.path = path

    def get_base_address(self):
        return self.smda_report.base_addr

    def extract_file_features(self):
        for feature, va in capa.features.extractors.smda.file.extract_features(self.smda_report, self.path):
            yield feature, va

    def get_functions(self):
        for function in self.smda_report.getFunctions():
            yield function

    def extract_function_features(self, f):
        for feature, va in capa.features.extractors.smda.function.extract_features(f):
            yield feature, va

    def get_basic_blocks(self, f):
        for bb in f.getBlocks():
            yield bb

    def extract_basic_block_features(self, f, bb):
        for feature, va in capa.features.extractors.smda.basicblock.extract_features(f, bb):
            yield feature, va

    def get_instructions(self, f, bb):
        for smda_ins in bb.getInstructions():
            yield smda_ins

    def extract_insn_features(self, f, bb, insn):
        for feature, va in capa.features.extractors.smda.insn.extract_features(f, bb, insn):
            yield feature, va
|
||||
131
capa/features/extractors/smda/basicblock.py
Normal file
131
capa/features/extractors/smda/basicblock.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import sys
|
||||
import string
|
||||
import struct
|
||||
|
||||
from capa.features import Characteristic
|
||||
from capa.features.basicblock import BasicBlock
|
||||
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
|
||||
|
||||
|
||||
def _bb_has_tight_loop(f, bb):
    """
    tell whether the basic block lists its own offset among its block references in `f.blockrefs`
    """
    if bb.offset not in f.blockrefs:
        return False
    return bb.offset in f.blockrefs[bb.offset]


def extract_bb_tight_loop(f, bb):
    """ yield a "tight loop" characteristic for a basic block that branches back to itself """
    if not _bb_has_tight_loop(f, bb):
        return
    yield Characteristic("tight loop"), bb.offset
|
||||
|
||||
|
||||
def _bb_has_stackstring(f, bb):
    """
    heuristic stackstring detection: true once the block has moved more than
    MIN_STACKSTRING_LEN printable constant characters to the stack
    """
    total = 0
    for instr in bb.getInstructions():
        total += get_printable_len(instr.getDetailed()) if is_mov_imm_to_stack(instr) else 0
        if total > MIN_STACKSTRING_LEN:
            return True
    return False
|
||||
|
||||
|
||||
def get_operands(smda_ins):
    """ split the instruction's comma-separated operand string into a list of stripped operands """
    operands = []
    for part in smda_ins.operands.split(","):
        operands.append(part.strip())
    return operands
|
||||
|
||||
|
||||
def extract_stackstring(f, bb):
    """ yield a "stack string" characteristic when the basic block appears to build a string on the stack """
    if not _bb_has_stackstring(f, bb):
        return
    yield Characteristic("stack string"), bb.offset
|
||||
|
||||
|
||||
def is_mov_imm_to_stack(smda_ins):
    """
    Tell whether the instruction is a mov-family instruction whose source parses as a
    hexadecimal immediate and whose destination mentions a stack or frame pointer register.
    """
    if not smda_ins.mnemonic.startswith("mov"):
        return False

    operands = [operand.strip() for operand in smda_ins.operands.split(",")]
    if len(operands) != 2:
        # not two operands
        return False
    destination, source = operands

    try:
        int(source, 16)
    except ValueError:
        # source is not an immediate
        return False

    return any(regname in destination for regname in ("ebp", "rbp", "esp", "rsp"))
|
||||
|
||||
|
||||
def is_printable_ascii(chars):
    """
    Return True if every byte of `chars` is a printable ASCII character.

    args:
      chars (bytes): raw bytes; iterated as ints.
    """
    return all(c < 127 and chr(c) in string.printable for c in chars)


def is_printable_utf16le(chars):
    """
    Return True if `chars` looks like printable ASCII text encoded as UTF-16 LE,
    i.e. every second byte is zero and the remaining bytes are printable ASCII.

    args:
      chars (bytes): raw bytes; iterated as ints.
    """
    if all(c == 0x00 for c in chars[1::2]):
        return is_printable_ascii(chars[::2])
    # always answer with a bool rather than falling through to None
    return False
|
||||
|
||||
|
||||
def get_printable_len(instr):
    """
    Return the number of characters encoded by the immediate operand when all of
    its bytes are printable ASCII or printable UTF-16 LE, otherwise 0.

    Works on a capstone instruction

    raises:
      ValueError: if the immediate size is not 1, 2, 4 or 8 bytes.
    """
    # should have exactly two operands for mov immediate
    if len(instr.operands) != 2:
        return 0

    immediate = instr.operands[1].value.imm

    # little-endian struct format for each supported immediate size
    pack_formats = {1: "<B", 2: "<H", 4: "<I", 8: "<Q"}
    size = instr.imm_size
    if size not in pack_formats:
        raise ValueError("Unhandled operand data type 0x%x." % instr.imm_size)

    # truncate the value to the immediate width before packing
    mask = (1 << (8 * size)) - 1
    chars = struct.pack(pack_formats[size], immediate & mask)

    if is_printable_ascii(chars):
        return size
    if is_printable_utf16le(chars):
        # two bytes per character
        return size // 2
    return 0
|
||||
|
||||
|
||||
def extract_features(f, bb):
    """
    extract features from the given basic block.

    a BasicBlock feature is always emitted first, followed by whatever
    each handler in BASIC_BLOCK_HANDLERS yields.

    args:
      f (smda.common.SmdaFunction): the function from which to extract features
      bb (smda.common.SmdaBasicBlock): the basic block to process.

    yields:
      Feature, set[VA]: the features and their location found in this basic block.
    """
    yield BasicBlock(), bb.offset
    for handler in BASIC_BLOCK_HANDLERS:
        for extracted in handler(f, bb):
            feature, va = extracted
            yield feature, va
|
||||
|
||||
|
||||
# handlers run, in this order, by extract_features() for every basic block.
# each one is called as handler(f, bb) and yields (feature, va) pairs.
BASIC_BLOCK_HANDLERS = (
    extract_bb_tight_loop,
    extract_stackstring,
)
|
||||
139
capa/features/extractors/smda/file.py
Normal file
139
capa/features/extractors/smda/file.py
Normal file
@@ -0,0 +1,139 @@
|
||||
import struct
|
||||
|
||||
# if we have SMDA we definitely have lief
|
||||
import lief
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
import capa.features.extractors.strings
|
||||
from capa.features import String, Characteristic
|
||||
from capa.features.file import Export, Import, Section
|
||||
|
||||
|
||||
def carve(pbytes, offset=0):
    """
    Yield (offset, xor key) tuples for embedded PEs found in pbytes,
    including PEs whose bytes were XORed with a single-byte key.

    args:
      pbytes: the bytes to search.
      offset: position in pbytes at which the initial search for each "MZ" variant starts.

    Based on the version from vivisect:
      https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
    And its IDA adaptation:
      capa/features/extractors/ida/file.py
    """
    # precompute the "MZ" and "PE" markers under every possible single-byte XOR key
    mz_xor = [
        (
            capa.features.extractors.helpers.xor_static(b"MZ", i),
            capa.features.extractors.helpers.xor_static(b"PE", i),
            i,
        )
        for i in range(256)
    ]

    pblen = len(pbytes)
    # work list: first hit of each encoded "MZ" marker, dropping keys with no hit at all
    todo = [(pbytes.find(mzx, offset), mzx, pex, i) for mzx, pex, i in mz_xor]
    todo = [(off, mzx, pex, i) for (off, mzx, pex, i) in todo if off != -1]

    while len(todo):

        off, mzx, pex, i = todo.pop()

        # The MZ header has one field we will check
        # e_lfanew is at 0x3c
        e_lfanew = off + 0x3C
        if pblen < (e_lfanew + 4):
            # not enough bytes left for the e_lfanew field; later hits for this key are not searched either
            continue

        # decode the 32-bit little-endian e_lfanew value with the candidate key
        newoff = struct.unpack("<I", capa.features.extractors.helpers.xor_static(pbytes[e_lfanew : e_lfanew + 4], i))[0]

        # queue the next occurrence of the same encoded "MZ" marker
        nextres = pbytes.find(mzx, off + 1)
        if nextres != -1:
            todo.append((nextres, mzx, pex, i))

        peoff = off + newoff
        if pblen < (peoff + 2):
            continue

        # a candidate counts only if the encoded "PE" marker sits where e_lfanew points
        if pbytes[peoff : peoff + 2] == pex:
            yield (off, i)
|
||||
|
||||
|
||||
def extract_file_embedded_pe(smda_report, file_path):
    """ yield an "embedded pe" characteristic at every offset where carve() finds a PE in the raw file bytes """
    with open(file_path, "rb") as sample:
        contents = sample.read()

    # the search starts at offset 1, so a header at the very start of the file is not matched
    for pe_offset, _xor_key in carve(contents, 1):
        yield Characteristic("embedded pe"), pe_offset
|
||||
|
||||
|
||||
def extract_file_export_names(smda_report, file_path):
    """ yield an Export feature, located at the export's address, for each exported function LIEF reports """
    parsed = lief.parse(file_path)
    if parsed is None:
        # LIEF could not parse the file
        return
    for exported in parsed.exported_functions:
        yield Export(exported.name), exported.address
|
||||
|
||||
|
||||
def extract_file_import_names(smda_report, file_path):
    """
    yield Import features for the entries of the PE import table.

    named imports and imports by ordinal (rendered as "#<ordinal>") are both
    reported, located at the entry's IAT address plus the SMDA base address.
    files that LIEF does not parse as PE yield nothing.
    """
    # extract import table info via LIEF
    lief_binary = lief.parse(file_path)
    if not isinstance(lief_binary, lief.PE.Binary):
        return
    for imported_library in lief_binary.imports:
        library_name = imported_library.name.lower()
        library_name = library_name[:-4] if library_name.endswith(".dll") else library_name
        for func in imported_library.entries:
            if func.name:
                symbol = func.name
            elif func.is_ordinal:
                symbol = "#%s" % func.ordinal
            else:
                continue
            # compute the location for both kinds of entry; previously it was only
            # assigned for named imports, leaving ordinal imports with an unbound or stale va
            va = func.iat_address + smda_report.base_addr
            for name in capa.features.extractors.helpers.generate_symbols(library_name, symbol):
                yield Import(name), va
|
||||
|
||||
|
||||
def extract_file_section_names(smda_report, file_path):
    """ yield a Section feature per PE section, located at image base + section virtual address """
    parsed = lief.parse(file_path)
    if not isinstance(parsed, lief.PE.Binary):
        return
    if not (parsed and parsed.sections):
        return
    image_base = parsed.optional_header.imagebase
    for section in parsed.sections:
        yield Section(section.name), image_base + section.virtual_address
|
||||
|
||||
|
||||
def extract_file_strings(smda_report, file_path):
    """
    extract ASCII and UTF-16 LE strings from file

    all ASCII strings are yielded first, then all unicode strings,
    each as a String feature located at the string's offset.
    """
    with open(file_path, "rb") as sample:
        data = sample.read()

    string_extractors = (
        capa.features.extractors.strings.extract_ascii_strings,
        capa.features.extractors.strings.extract_unicode_strings,
    )
    for extract in string_extractors:
        for found in extract(data):
            yield String(found.s), found.offset
|
||||
|
||||
|
||||
def extract_features(smda_report, file_path):
    """
    extract file features from given workspace

    runs every handler in FILE_HANDLERS and forwards what it yields.

    args:
      smda_report (smda.common.SmdaReport): a SmdaReport
      file_path: path to the input file

    yields:
      Tuple[Feature, VA]: a feature and its location.
    """
    for file_handler in FILE_HANDLERS:
        # each handler is invoked exactly once; the former unused `result = file_handler(...)`
        # only created a second generator that was never consumed
        for feature, va in file_handler(smda_report, file_path):
            yield feature, va
|
||||
|
||||
|
||||
# handlers run, in this order, by extract_features() for the input file.
# each one is called as handler(smda_report, file_path) and yields (feature, va) pairs.
FILE_HANDLERS = (
    extract_file_embedded_pe,
    extract_file_export_names,
    extract_file_import_names,
    extract_file_section_names,
    extract_file_strings,
)
|
||||
38
capa/features/extractors/smda/function.py
Normal file
38
capa/features/extractors/smda/function.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from capa.features import Characteristic
|
||||
from capa.features.extractors import loops
|
||||
|
||||
|
||||
def extract_function_calls_to(f):
    """ yield a "calls to" characteristic for every incoming reference recorded on the function """
    for caller in f.inrefs:
        feature = Characteristic("calls to")
        yield feature, caller
|
||||
|
||||
|
||||
def extract_function_loop(f):
    """
    yield a "loop" characteristic, located at the function offset,
    if the function's block reference graph contains a loop
    """
    # flatten the {source block: [target blocks]} mapping into a list of edges
    edges = [(source, target) for source, targets in f.blockrefs.items() for target in targets]
    if not edges:
        return
    if loops.has_loop(edges):
        yield Characteristic("loop"), f.offset
|
||||
|
||||
|
||||
def extract_features(f):
    """
    extract features from the given function.

    runs every handler in FUNCTION_HANDLERS and forwards what it yields.

    args:
      f (smda.common.SmdaFunction): the function from which to extract features

    yields:
      Feature, set[VA]: the features and their location found in this function.
    """
    for handler in FUNCTION_HANDLERS:
        for extracted in handler(f):
            feature, va = extracted
            yield feature, va
|
||||
|
||||
|
||||
# handlers run, in this order, by extract_features() for every function.
# each one is called as handler(f) and yields (feature, va) pairs.
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)
|
||||
393
capa/features/extractors/smda/insn.py
Normal file
393
capa/features/extractors/smda/insn.py
Normal file
@@ -0,0 +1,393 @@
|
||||
import re
|
||||
import string
|
||||
import struct
|
||||
|
||||
from smda.common.SmdaReport import SmdaReport
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
from capa.features import (
|
||||
ARCH_X32,
|
||||
ARCH_X64,
|
||||
MAX_BYTES_FEATURE_SIZE,
|
||||
THUNK_CHAIN_DEPTH_DELTA,
|
||||
Bytes,
|
||||
String,
|
||||
Characteristic,
|
||||
)
|
||||
from capa.features.insn import API, Number, Offset, Mnemonic
|
||||
|
||||
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40
# a sign, a space and a hexadecimal number inside an operand string, e.g. "+ 0x10"
PATTERN_HEXNUM = re.compile(r"[+\-] (?P<num>0x[a-fA-F0-9]+)")
# a sign, a space and one decimal digit inside an operand string, e.g. "+ 4"
PATTERN_SINGLENUM = re.compile(r"[+\-] (?P<num>[0-9])")
|
||||
|
||||
|
||||
def get_arch(smda_report):
    """
    map the architecture and bitness of the report to ARCH_X32 or ARCH_X64.

    returns None for an intel report whose bitness is neither 32 nor 64.

    raises:
      NotImplementedError: if the architecture is not "intel".
    """
    if smda_report.architecture != "intel":
        raise NotImplementedError

    bitness = smda_report.bitness
    if bitness == 32:
        return ARCH_X32
    if bitness == 64:
        return ARCH_X64
    return None
|
||||
|
||||
|
||||
def extract_insn_api_features(f, bb, insn):
    """
    parse API features from the given instruction.

    an instruction recorded in f.apirefs is reported directly. otherwise, when it is
    recorded in f.outrefs, its single target is followed through up to
    THUNK_CHAIN_DEPTH_DELTA one-instruction functions, looking for an API thunk.
    """
    if insn.offset in f.apirefs:
        api_entry = f.apirefs[insn.offset]
        # reformat
        # the entry is split at "!" into a DLL part and an API part; the DLL part is cut at its first "." and lowercased
        dll_name, api_name = api_entry.split("!")
        dll_name = dll_name.split(".")[0]
        dll_name = dll_name.lower()
        for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name):
            yield API(name), insn.offset
    elif insn.offset in f.outrefs:
        current_function = f
        current_instruction = insn
        for index in range(THUNK_CHAIN_DEPTH_DELTA):
            # only follow instructions that reference exactly one target
            if current_function and len(current_function.outrefs[current_instruction.offset]) == 1:
                target = current_function.outrefs[current_instruction.offset][0]
                referenced_function = current_function.smda_report.getFunction(target)
                if referenced_function:
                    # TODO SMDA: implement this function for both jmp and call, checking if function has 1 instruction which refs an API
                    if referenced_function.isApiThunk():
                        api_entry = (
                            referenced_function.apirefs[target] if target in referenced_function.apirefs else None
                        )
                        if api_entry:
                            # reformat
                            dll_name, api_name = api_entry.split("!")
                            dll_name = dll_name.split(".")[0]
                            dll_name = dll_name.lower()
                            for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name):
                                # features are located at the original instruction, not at the thunk
                                yield API(name), insn.offset
                        # NOTE(review): neither current_function nor current_instruction changes here and the
                        # loop is not left, so the remaining iterations appear to yield the same names again - confirm
                    elif referenced_function.num_instructions == 1 and referenced_function.num_outrefs == 1:
                        # a one-instruction function with a single outgoing reference: keep following from there
                        current_function = referenced_function
                        current_instruction = [i for i in referenced_function.getInstructions()][0]
                else:
                    # NOTE(review): the nesting of this else was lost in this view; it is assumed to pair with
                    # `if referenced_function:` (target is not a known function) - confirm against the original file
                    return
|
||||
|
||||
|
||||
def extract_insn_number_features(f, bb, insn):
    """
    parse number features from the given instruction.

    every operand that parses as a hexadecimal integer is reported twice:
    once as a plain Number and once as a Number tagged with the report's architecture.
    """
    # example:
    #
    #     push    3136B0h         ; dwControlCode
    operands = [o.strip() for o in insn.operands.split(",")]
    if insn.mnemonic == "add" and operands[0] in ["esp", "rsp"]:
        # skip things like:
        #
        #    .text:00401140                 call    sub_407E2B
        #    .text:00401145                 add     esp, 0Ch
        return
    for operand in operands:
        # only the parse is guarded: a bare `except:` around the yields would also swallow
        # GeneratorExit (raised at a suspended yield when the consumer closes the generator)
        # and hide unrelated errors
        try:
            value = int(operand, 16)
        except ValueError:
            # operand is not an immediate
            continue
        yield Number(value), insn.offset
        yield Number(value, arch=get_arch(f.smda_report)), insn.offset
|
||||
|
||||
|
||||
def read_bytes(smda_report, va, num_bytes=None):
    """
    read bytes from the report's buffer starting at the given address.

    reads num_bytes bytes, or up to MAX_BYTES_FEATURE_SIZE when num_bytes is None;
    the result is shorter when the buffer ends earlier.
    returns None when the report carries no buffer.
    """
    # the buffer is indexed relative to the base address
    start = va - smda_report.base_addr
    data = smda_report.buffer
    if data is None:
        return

    if num_bytes is not None:
        wanted = num_bytes
    else:
        wanted = MAX_BYTES_FEATURE_SIZE

    # never read past the end of the buffer
    stop = min(start + wanted, len(data))
    return data[start:stop]
|
||||
|
||||
|
||||
def derefs(smda_report, p):
    """
    recursively follow the given pointer, yielding the valid memory addresses along the way.
    useful when you may have a pointer to string, or pointer to pointer to string, etc.

    this is a "do what i mean" type of helper function.

    based on the implementation in viv/insn.py

    the walk stops when the address is outside the memory image, when fewer than four
    bytes can be read there, when a pointer points to itself, or after more than 10 hops.
    """
    depth = 0
    while True:
        if not smda_report.isAddrWithinMemoryImage(p):
            return
        yield p

        bytes_ = read_bytes(smda_report, p, num_bytes=4)
        # read_bytes returns None without a buffer and a short slice near the buffer end;
        # neither can be unpacked as a 32-bit value
        if bytes_ is None or len(bytes_) < 4:
            return
        # explicit little-endian so the result does not depend on the analysis host
        # NOTE(review): pointers are read as 32-bit values for every bitness - confirm this is intended for 64-bit samples
        val = struct.unpack("<I", bytes_)[0]

        # sanity: pointer points to self
        if val == p:
            return

        # sanity: avoid chains of pointers that are unreasonably deep
        depth += 1
        if depth > 10:
            return

        p = val
|
||||
|
||||
|
||||
def extract_insn_bytes_features(f, bb, insn):
    """
    parse byte sequence features from the given instruction.

    every data reference of the instruction is followed via derefs(); the bytes read
    at each address are reported unless nothing could be read or they are all zero.
    example:
        #     push    offset iid_004118d4_IShellLinkA ; riid
    """
    report = f.smda_report
    for data_ref in insn.getDataRefs():
        for address in derefs(report, data_ref):
            content = read_bytes(report, address)
            if content is None or capa.features.extractors.helpers.all_zeros(content):
                continue
            yield Bytes(content), insn.offset
|
||||
|
||||
|
||||
def detect_ascii_len(smda_report, offset):
    """
    return the length of the NUL-terminated printable ASCII string at the given address.

    returns 0 when the report has no buffer, when the address lies outside the buffer,
    when a non-printable byte precedes the terminator, or when the buffer ends before
    a terminator is found.
    """
    buf = smda_report.buffer
    if buf is None:
        return 0
    rva = offset - smda_report.base_addr
    if rva < 0:
        # a negative index would silently wrap around to the end of the buffer
        return 0

    ascii_len = 0
    buf_len = len(buf)
    # bounded scan: running off the end of the buffer is "not a string", not an IndexError
    while rva + ascii_len < buf_len:
        char = buf[rva + ascii_len]
        if char == 0:
            return ascii_len
        if not (char < 127 and chr(char) in string.printable):
            return 0
        ascii_len += 1
    return 0
|
||||
|
||||
|
||||
def detect_unicode_len(smda_report, offset):
    """
    return the length in bytes of the printable UTF-16 LE string at the given address,
    i.e. printable ASCII bytes each followed by a zero byte, terminated by two zero bytes.

    returns 0 when the report has no buffer, when the address lies outside the buffer,
    when the data does not match that pattern, or when the buffer ends before a
    terminator is found.
    """
    buf = smda_report.buffer
    if buf is None:
        return 0
    rva = offset - smda_report.base_addr
    if rva < 0:
        # a negative index would silently wrap around to the end of the buffer
        return 0

    unicode_len = 0
    buf_len = len(buf)
    # bounded scan over byte pairs: running off the end of the buffer is "not a string"
    while rva + unicode_len + 1 < buf_len:
        char = buf[rva + unicode_len]
        second_char = buf[rva + unicode_len + 1]
        if char == 0 and second_char == 0:
            return unicode_len
        if not (char < 127 and chr(char) in string.printable and second_char == 0):
            return 0
        unicode_len += 2
    return 0
|
||||
|
||||
|
||||
def read_string(smda_report, offset):
    """
    read the string at the given address, preferring ASCII over UTF-16.

    an ASCII string must be longer than one byte, a UTF-16 string longer than two bytes.
    returns None when neither kind of string is detected.
    """
    ascii_len = detect_ascii_len(smda_report, offset)
    if ascii_len > 1:
        raw = read_bytes(smda_report, offset, ascii_len)
        return raw.decode("utf-8")

    wide_len = detect_unicode_len(smda_report, offset)
    if wide_len > 2:
        raw = read_bytes(smda_report, offset, wide_len)
        return raw.decode("utf-16")

    return None
|
||||
|
||||
|
||||
def extract_insn_string_features(f, bb, insn):
    """
    parse string features from the given instruction.

    every data reference of the instruction is followed via derefs(); each address
    where read_string() finds a string is reported with trailing NULs removed.
    """
    # example:
    #
    #     push    offset aAcr     ; "ACR  > "
    report = f.smda_report
    for data_ref in insn.getDataRefs():
        for address in derefs(report, data_ref):
            text = read_string(report, address)
            if not text:
                continue
            yield String(text.rstrip("\x00")), insn.offset
|
||||
|
||||
|
||||
def extract_insn_offset_features(f, bb, insn):
    """
    parse structure offset features from the given instruction.

    only operands containing "ptr" are considered, and operands mentioning esp, ebp or rbp
    are skipped. the signed displacement found in the operand text (0 when there is none)
    is reported as a plain Offset and as an Offset tagged with the report's architecture.
    """
    # examples:
    #
    #     mov eax, [esi + 4]
    #     mov eax, [esi + ecx + 16384]
    for operand in (part.strip() for part in insn.operands.split(",")):
        if "ptr" not in operand:
            continue
        if any(register in operand for register in ("esp", "ebp", "rbp")):
            continue

        # a hexadecimal displacement takes precedence over a single decimal digit
        displacement = 0
        match = re.search(PATTERN_HEXNUM, operand)
        base = 16
        if not match:
            match = re.search(PATTERN_SINGLENUM, operand)
            base = 10
        if match:
            displacement = int(match.group("num"), base)
            if match.group().startswith("-"):
                displacement = -1 * displacement

        yield Offset(displacement), insn.offset
        yield Offset(displacement, arch=get_arch(f.smda_report)), insn.offset
|
||||
|
||||
|
||||
def is_security_cookie(f, bb, insn):
    """
    check if an instruction is related to security cookie checks

    true when the second operand is a stack or frame pointer register and the instruction
    lies within SECURITY_COOKIE_BYTES_DELTA bytes after the start of the first block
    or before the final instruction of a block that ends with a return.
    """
    # security cookie check should use SP or BP
    operands = [o.strip() for o in insn.operands.split(",")]
    source = operands[1]
    if source not in ("esp", "ebp", "rsp", "rbp"):
        return False

    for position, block in enumerate(f.getBlocks()):
        instructions = list(block.getInstructions())

        # expect security cookie init in first basic block within first bytes (instructions)
        if position == 0:
            prologue_limit = instructions[0].offset + SECURITY_COOKIE_BYTES_DELTA
            if insn.offset < prologue_limit:
                return True

        # ... or within last bytes (instructions) before a return
        final = instructions[-1]
        if final.mnemonic.startswith("ret"):
            if insn.offset > (final.offset - SECURITY_COOKIE_BYTES_DELTA):
                return True

    return False
|
||||
|
||||
|
||||
def extract_insn_nzxor_characteristic_features(f, bb, insn):
    """
    parse non-zeroing XOR instruction from the given instruction.
    ignore expected non-zeroing XORs, e.g. security cookies.

    handles the xor, xorpd, xorps and pxor mnemonics.
    """
    xor_mnemonics = ("xor", "xorpd", "xorps", "pxor")
    if insn.mnemonic not in xor_mnemonics:
        return

    operands = [o.strip() for o in insn.operands.split(",")]
    destination, source = operands[0], operands[1]
    if destination == source:
        # identical operands: the XOR zeroes the destination
        return

    if is_security_cookie(f, bb, insn):
        return

    yield Characteristic("nzxor"), insn.offset
|
||||
|
||||
|
||||
def extract_insn_mnemonic_features(f, bb, insn):
    """yield the instruction's mnemonic as a Mnemonic feature located at the instruction."""
    mnemonic = Mnemonic(insn.mnemonic)
    yield mnemonic, insn.offset
|
||||
|
||||
|
||||
def extract_insn_peb_access_characteristic_features(f, bb, insn):
    """
    parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64

    only push and mov instructions are inspected; the check is a textual match on
    each operand and at most one feature is reported per operand.
    """
    if insn.mnemonic not in ["push", "mov"]:
        return

    for raw_operand in insn.operands.split(","):
        operand = raw_operand.strip()
        via_fs = "fs:" in operand and "0x30" in operand
        via_gs = "gs:" in operand and "0x60" in operand
        if via_fs or via_gs:
            yield Characteristic("peb access"), insn.offset
|
||||
|
||||
|
||||
def extract_insn_segment_access_features(f, bb, insn):
    """ report "fs access" or "gs access" for each operand whose text references that segment; fs wins if both appear """
    for raw_operand in insn.operands.split(","):
        operand = raw_operand.strip()
        if "fs:" in operand:
            segment_feature = "fs access"
        elif "gs:" in operand:
            segment_feature = "gs access"
        else:
            continue
        yield Characteristic(segment_feature), insn.offset
|
||||
|
||||
|
||||
def extract_insn_cross_section_cflow(f, bb, insn):
    """
    inspect the instruction for a CALL or JMP that crosses section boundaries.

    instructions recorded as API references are ignored. targets come from f.outrefs,
    or, failing that, from an operand string that starts with "0x".
    """
    if insn.mnemonic not in ["call", "jmp"]:
        return
    if insn.offset in f.apirefs:
        return

    report = insn.smda_function.smda_report
    if insn.offset in f.outrefs:
        targets = f.outrefs[insn.offset]
    elif insn.operands.startswith("0x"):
        targets = [int(insn.operands, 16)]
    else:
        return

    for target in targets:
        # source and target resolve to different sections
        if report.getSection(insn.offset) != report.getSection(target):
            yield Characteristic("cross section flow"), insn.offset
|
||||
|
||||
|
||||
# this is a feature that's most relevant at the function scope,
|
||||
# however, its most efficient to extract at the instruction scope.
|
||||
def extract_function_calls_from(f, bb, insn):
    """
    report the targets of a call instruction as "calls from" characteristics.

    a target equal to the function's own offset is additionally reported as "recursive call".
    for a call recorded in f.apirefs, the API entry itself is reported as the location.
    """
    if insn.mnemonic != "call":
        return

    call_site = insn.offset
    if call_site in f.outrefs:
        for target in f.outrefs[call_site]:
            yield Characteristic("calls from"), target

            # if we found a jump target and it's the function address
            # mark as recursive
            if target == f.offset:
                yield Characteristic("recursive call"), target

    if call_site in f.apirefs:
        yield Characteristic("calls from"), f.apirefs[call_site]
|
||||
|
||||
|
||||
# this is a feature that's most relevant at the function or basic block scope,
|
||||
# however, its most efficient to extract at the instruction scope.
|
||||
def extract_function_indirect_call_characteristic_features(f, bb, insn):
    """
    extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4])
    does not include calls like => call ds:dword_ABD4974

    the decision is made on the operand text of call instructions: immediate targets,
    rip-relative qword memory operands and absolute dword memory operands are not reported.
    """
    if insn.mnemonic != "call":
        return

    operands = insn.operands
    # this is a generator, so a plain `return` (not `return False`) ends it without a feature
    if operands.startswith("0x"):
        # call to an immediate address
        return
    if "qword ptr" in operands and "rip" in operands:
        # call through a rip-relative memory operand
        return
    if operands.startswith("dword ptr [0x"):
        # call through an absolute memory operand
        return

    # call edx
    # call dword ptr [eax+50h]
    # call qword ptr [rsp+78h]
    yield Characteristic("indirect call"), insn.offset
|
||||
|
||||
|
||||
def extract_features(f, bb, insn):
    """
    extract features from the given insn.

    runs every handler in INSTRUCTION_HANDLERS and forwards what it yields.

    args:
      f (smda.common.SmdaFunction): the function to process.
      bb (smda.common.SmdaBasicBlock): the basic block to process.
      insn (smda.common.SmdaInstruction): the instruction to process.

    yields:
      Feature, set[VA]: the features and their location found in this insn.
    """
    for handler in INSTRUCTION_HANDLERS:
        for extracted in handler(f, bb, insn):
            feature, va = extracted
            yield feature, va
|
||||
|
||||
|
||||
# handlers run, in this order, by extract_features() for every instruction.
# each one is called as handler(f, bb, insn) and yields (feature, va) pairs.
INSTRUCTION_HANDLERS = (
    extract_insn_api_features,
    extract_insn_number_features,
    extract_insn_string_features,
    extract_insn_bytes_features,
    extract_insn_offset_features,
    extract_insn_nzxor_characteristic_features,
    extract_insn_mnemonic_features,
    extract_insn_peb_access_characteristic_features,
    extract_insn_cross_section_cflow,
    extract_insn_segment_access_features,
    extract_function_calls_from,
    extract_function_indirect_call_characteristic_features,
)
|
||||
@@ -258,10 +258,10 @@ def extract_insn_bytes_features(f, bb, insn):
|
||||
example:
|
||||
# push offset iid_004118d4_IShellLinkA ; riid
|
||||
"""
|
||||
for oper in insn.opers:
|
||||
if insn.mnem == "call":
|
||||
continue
|
||||
if insn.mnem == "call":
|
||||
return
|
||||
|
||||
for oper in insn.opers:
|
||||
if isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
|
||||
v = oper.getOperValue(oper)
|
||||
elif isinstance(oper, envi.archs.i386.disasm.i386RegMemOper):
|
||||
@@ -311,6 +311,10 @@ def read_string(vw, offset):
|
||||
# vivisect seems to mis-detect the end unicode strings
|
||||
# off by one, too short
|
||||
ulen += 1
|
||||
else:
|
||||
# vivisect seems to mis-detect the end unicode strings
|
||||
# off by two, too short
|
||||
ulen += 2
|
||||
return read_memory(vw, offset, ulen).decode("utf-16")
|
||||
|
||||
raise ValueError("not a string", offset)
|
||||
@@ -325,6 +329,9 @@ def extract_insn_string_features(f, bb, insn):
|
||||
for oper in insn.opers:
|
||||
if isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
|
||||
v = oper.getOperValue(oper)
|
||||
elif isinstance(oper, envi.archs.i386.disasm.i386ImmMemOper):
|
||||
# like 0x10056CB4 in `lea eax, dword [0x10056CB4]`
|
||||
v = oper.imm
|
||||
elif isinstance(oper, envi.archs.i386.disasm.i386SibOper):
|
||||
# like 0x401000 in `mov eax, 0x401000[2 * ebx]`
|
||||
v = oper.imm
|
||||
@@ -415,7 +422,7 @@ def extract_insn_nzxor_characteristic_features(f, bb, insn):
|
||||
parse non-zeroing XOR instruction from the given instruction.
|
||||
ignore expected non-zeroing XORs, e.g. security cookies.
|
||||
"""
|
||||
if insn.mnem != "xor":
|
||||
if insn.mnem not in ("xor", "xorpd", "xorps", "pxor"):
|
||||
return
|
||||
|
||||
if insn.opers[0] == insn.opers[1]:
|
||||
|
||||
@@ -5,6 +5,7 @@ json format:
|
||||
|
||||
{
|
||||
'version': 1,
|
||||
'base address': int(base address),
|
||||
'functions': {
|
||||
int(function va): {
|
||||
'basic blocks': {
|
||||
@@ -86,6 +87,7 @@ def dumps(extractor):
|
||||
"""
|
||||
ret = {
|
||||
"version": 1,
|
||||
"base address": extractor.get_base_address(),
|
||||
"functions": {},
|
||||
"scopes": {
|
||||
"file": [],
|
||||
@@ -147,6 +149,7 @@ def loads(s):
|
||||
raise ValueError("unsupported freeze format version: %d" % (doc.get("version")))
|
||||
|
||||
features = {
|
||||
"base address": doc.get("base address"),
|
||||
"file features": [],
|
||||
"functions": {},
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ class API(Feature):
|
||||
modname, _, impname = name.rpartition(".")
|
||||
name = modname.lower() + "." + impname
|
||||
|
||||
super(API, self).__init__(name, description)
|
||||
super(API, self).__init__(name, description=description)
|
||||
|
||||
|
||||
class Number(Feature):
|
||||
@@ -37,4 +37,4 @@ class Offset(Feature):
|
||||
|
||||
class Mnemonic(Feature):
|
||||
def __init__(self, value, description=None):
|
||||
super(Mnemonic, self).__init__(value, description=description)
|
||||
super(Mnemonic, self).__init__(value.lower(), description=description)
|
||||
|
||||
@@ -103,6 +103,7 @@ def collect_metadata():
|
||||
"analysis": {
|
||||
"format": idaapi.get_file_type_name(),
|
||||
"extractor": "ida",
|
||||
"base_address": idaapi.get_imagebase(),
|
||||
},
|
||||
"version": capa.version.__version__,
|
||||
}
|
||||
|
||||
78
capa/main.py
78
capa/main.py
@@ -29,7 +29,7 @@ import capa.version
|
||||
import capa.features
|
||||
import capa.features.freeze
|
||||
import capa.features.extractors
|
||||
from capa.helpers import oint, get_file_taste
|
||||
from capa.helpers import get_file_taste
|
||||
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
SUPPORTED_FILE_MAGIC = set(["MZ"])
|
||||
@@ -40,8 +40,11 @@ logger = logging.getLogger("capa")
|
||||
|
||||
def set_vivisect_log_level(level):
|
||||
logging.getLogger("vivisect").setLevel(level)
|
||||
logging.getLogger("vivisect.base").setLevel(level)
|
||||
logging.getLogger("vivisect.impemu").setLevel(level)
|
||||
logging.getLogger("vtrace").setLevel(level)
|
||||
logging.getLogger("envi").setLevel(level)
|
||||
logging.getLogger("envi.codeflow").setLevel(level)
|
||||
|
||||
|
||||
def find_function_capabilities(ruleset, extractor, f):
|
||||
@@ -69,14 +72,14 @@ def find_function_capabilities(ruleset, extractor, f):
|
||||
bb_features[feature].add(va)
|
||||
function_features[feature].add(va)
|
||||
|
||||
_, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, oint(bb))
|
||||
_, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, extractor.block_offset(bb))
|
||||
|
||||
for rule_name, res in matches.items():
|
||||
bb_matches[rule_name].extend(res)
|
||||
for va, _ in res:
|
||||
function_features[capa.features.MatchedRule(rule_name)].add(va)
|
||||
|
||||
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, oint(f))
|
||||
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, extractor.function_offset(f))
|
||||
return function_matches, bb_matches, len(function_features)
|
||||
|
||||
|
||||
@@ -112,10 +115,16 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
|
||||
}
|
||||
}
|
||||
|
||||
for f in tqdm.tqdm(list(extractor.get_functions()), disable=disable_progress, desc="matching", unit=" functions"):
|
||||
pbar = tqdm.tqdm
|
||||
if disable_progress:
|
||||
# do not use tqdm to avoid unnecessary side effects when caller intends
|
||||
# to disable progress completely
|
||||
pbar = lambda s, *args, **kwargs: s
|
||||
|
||||
for f in pbar(list(extractor.get_functions()), desc="matching", unit=" functions"):
|
||||
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
|
||||
meta["feature_counts"]["functions"][f.__int__()] = feature_count
|
||||
logger.debug("analyzed function 0x%x and extracted %d features", f.__int__(), feature_count)
|
||||
meta["feature_counts"]["functions"][extractor.function_offset(f)] = feature_count
|
||||
logger.debug("analyzed function 0x%x and extracted %d features", extractor.function_offset(f), feature_count)
|
||||
|
||||
for rule_name, res in function_matches.items():
|
||||
all_function_matches[rule_name].extend(res)
|
||||
@@ -295,7 +304,27 @@ class UnsupportedRuntimeError(RuntimeError):
|
||||
|
||||
|
||||
def get_extractor_py3(path, format, disable_progress=False):
|
||||
raise UnsupportedRuntimeError()
|
||||
if False: # TODO: How to decide which backend to use?
|
||||
from smda.SmdaConfig import SmdaConfig
|
||||
from smda.Disassembler import Disassembler
|
||||
|
||||
import capa.features.extractors.smda
|
||||
|
||||
smda_report = None
|
||||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
|
||||
config = SmdaConfig()
|
||||
config.STORE_BUFFER = True
|
||||
smda_disasm = Disassembler(config)
|
||||
smda_report = smda_disasm.disassembleFile(path)
|
||||
|
||||
return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path)
|
||||
else:
|
||||
import capa.features.extractors.miasm
|
||||
|
||||
with open(path, "rb") as f:
|
||||
buf = f.read()
|
||||
|
||||
return capa.features.extractors.miasm.MiasmFeatureExtractor(buf)
|
||||
|
||||
|
||||
def get_extractor(path, format, disable_progress=False):
|
||||
@@ -351,7 +380,13 @@ def get_rules(rule_path, disable_progress=False):
|
||||
|
||||
rules = []
|
||||
|
||||
for rule_path in tqdm.tqdm(list(rule_paths), disable=disable_progress, desc="loading ", unit=" rules"):
|
||||
pbar = tqdm.tqdm
|
||||
if disable_progress:
|
||||
# do not use tqdm to avoid unnecessary side effects when caller intends
|
||||
# to disable progress completely
|
||||
pbar = lambda s, *args, **kwargs: s
|
||||
|
||||
for rule_path in pbar(list(rule_paths), desc="loading ", unit=" rules"):
|
||||
try:
|
||||
rule = capa.rules.Rule.from_yaml_file(rule_path)
|
||||
except capa.rules.InvalidRule:
|
||||
@@ -446,14 +481,23 @@ def main(argv=None):
|
||||
parser = argparse.ArgumentParser(
|
||||
description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
|
||||
)
|
||||
parser.add_argument(
|
||||
# in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters
|
||||
# https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works
|
||||
# in our testing, however other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/)
|
||||
"sample",
|
||||
type=lambda s: s.decode(sys.getfilesystemencoding()),
|
||||
help="path to sample to analyze",
|
||||
)
|
||||
|
||||
if sys.version_info >= (3, 0):
|
||||
parser.add_argument(
|
||||
# Python 3 str handles non-ASCII arguments correctly
|
||||
"sample",
|
||||
type=str,
|
||||
help="path to sample to analyze",
|
||||
)
|
||||
else:
|
||||
parser.add_argument(
|
||||
# in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters
|
||||
# https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works
|
||||
# in our testing, however other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/)
|
||||
"sample",
|
||||
type=lambda s: s.decode(sys.getfilesystemencoding()),
|
||||
help="path to sample to analyze",
|
||||
)
|
||||
parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__))
|
||||
parser.add_argument(
|
||||
"-r",
|
||||
@@ -550,7 +594,7 @@ def main(argv=None):
|
||||
# during the load of the RuleSet, we extract subscope statements into their own rules
|
||||
# that are subsequently `match`ed upon. this inflates the total rule count.
|
||||
# so, filter out the subscope rules when reporting total number of loaded rules.
|
||||
len(filter(lambda r: "capa/subscope-rule" not in r.meta, rules.rules.values())),
|
||||
len([i for i in filter(lambda r: "capa/subscope-rule" not in r.meta, rules.rules.values())]),
|
||||
)
|
||||
if args.tag:
|
||||
rules = rules.filter_rules_by_meta(args.tag)
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import re
|
||||
import uuid
|
||||
import codecs
|
||||
import logging
|
||||
@@ -600,6 +601,9 @@ class Rule(object):
|
||||
# use block mode, not inline json-like mode
|
||||
y.default_flow_style = False
|
||||
|
||||
# leave quotes unchanged
|
||||
y.preserve_quotes = True
|
||||
|
||||
# indent lists by two spaces below their parent
|
||||
#
|
||||
# features:
|
||||
@@ -614,16 +618,20 @@ class Rule(object):
|
||||
return y
|
||||
|
||||
@classmethod
|
||||
def from_yaml(cls, s):
|
||||
# use pyyaml because it can be much faster than ruamel (pure python)
|
||||
doc = yaml.load(s, Loader=cls._get_yaml_loader())
|
||||
def from_yaml(cls, s, use_ruamel=False):
|
||||
if use_ruamel:
|
||||
# ruamel enables nice formatting and doc roundtripping with comments
|
||||
doc = cls._get_ruamel_yaml_parser().load(s)
|
||||
else:
|
||||
# use pyyaml because it can be much faster than ruamel (pure python)
|
||||
doc = yaml.load(s, Loader=cls._get_yaml_loader())
|
||||
return cls.from_dict(doc, s)
|
||||
|
||||
@classmethod
|
||||
def from_yaml_file(cls, path):
|
||||
def from_yaml_file(cls, path, use_ruamel=False):
|
||||
with open(path, "rb") as f:
|
||||
try:
|
||||
return cls.from_yaml(f.read().decode("utf-8"))
|
||||
return cls.from_yaml(f.read().decode("utf-8"), use_ruamel=use_ruamel)
|
||||
except InvalidRule as e:
|
||||
raise InvalidRuleWithPath(path, str(e))
|
||||
|
||||
@@ -716,7 +724,18 @@ class Rule(object):
|
||||
# tweaking `ruamel.indent()` doesn't quite give us the control we want.
|
||||
# so, add the two extra spaces that we've determined we need through experimentation.
|
||||
# see #263
|
||||
doc = doc.replace(" description:", " description:")
|
||||
# only do this for the features section, so the meta description doesn't get reformatted
|
||||
# assumes features section always exists
|
||||
features_offset = doc.find("features")
|
||||
doc = doc[:features_offset] + doc[features_offset:].replace(" description:", " description:")
|
||||
|
||||
# for negative hex numbers, yaml dump outputs:
|
||||
# - offset: !!int '0x-30'
|
||||
# we prefer:
|
||||
# - offset: -0x30
|
||||
# the below regex makes these adjustments and while ugly, we don't have to explore the ruamel.yaml insides
|
||||
doc = re.sub(r"!!int '0x-([0-9a-fA-F]+)'", r"-0x\1", doc)
|
||||
|
||||
return doc
|
||||
|
||||
|
||||
@@ -866,7 +885,8 @@ class RuleSet(object):
|
||||
given a collection of rules, collect the rules that are needed at the given scope.
|
||||
these rules are ordered topologically.
|
||||
|
||||
don't include "lib" rules, unless they are dependencies of other rules.
|
||||
don't include auto-generated "subscope" rules.
|
||||
we want to include general "lib" rules here - even if they are not dependencies of other rules, see #398
|
||||
"""
|
||||
scope_rules = set([])
|
||||
|
||||
@@ -875,7 +895,7 @@ class RuleSet(object):
|
||||
# at lower scope, e.g. function scope.
|
||||
# so, we find all dependencies of all rules, and later will filter them down.
|
||||
for rule in rules:
|
||||
if rule.meta.get("lib", False):
|
||||
if rule.meta.get("capa/subscope-rule", False):
|
||||
continue
|
||||
|
||||
scope_rules.update(get_rules_and_dependencies(rules, rule.name))
|
||||
|
||||
@@ -74,8 +74,20 @@ Note that some development dependencies (including the black code formatter) req
|
||||
To check the code style, formatting and run the tests you can run the script `scripts/ci.sh`.
|
||||
You can run it with the argument `no_tests` to skip the tests and only run the code style and formatting: `scripts/ci.sh no_tests`
|
||||
|
||||
### 3. Setup hooks [optional]
|
||||
### 3. Compile binary using PyInstaller
|
||||
We compile capa standalone binaries using PyInstaller. To reproduce the build process check out the source code as described above and follow these steps.
|
||||
|
||||
#### Install PyInstaller:
|
||||
For Python 2.7: `$ pip install 'pyinstaller==3.*'` (PyInstaller 4 doesn't support Python 2.7)
|
||||
|
||||
For Python 3: `$ pip install pyinstaller`
|
||||
|
||||
#### Run PyInstaller
|
||||
`$ pyinstaller .github/pyinstaller/pyinstaller.spec`
|
||||
|
||||
You can find the compiled binary in the created directory `dist/`.
|
||||
|
||||
### 4. Setup hooks [optional]
|
||||
If you plan to contribute to capa, you may want to setup the hooks.
|
||||
Run `scripts/setup-hooks.sh` to set the following hooks up:
|
||||
- The `pre-commit` hook runs checks before every `git commit`.
|
||||
@@ -84,4 +96,3 @@ Run `scripts/setup-hooks.sh` to set the following hooks up:
|
||||
- The `pre-push` hook runs checks before every `git push`.
|
||||
It runs `scripts/ci.sh` aborting the push if there are code style or rule linter offenses or if the tests fail.
|
||||
This way you can ensure everything is alright before sending a pull request.
|
||||
|
||||
|
||||
2
rules
2
rules
Submodule rules updated: 6830d707c7...faa670ac38
214
scripts/capa_as_library.py
Normal file
214
scripts/capa_as_library.py
Normal file
@@ -0,0 +1,214 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import json
|
||||
import collections
|
||||
|
||||
import capa.main
|
||||
import capa.rules
|
||||
import capa.engine
|
||||
import capa.features
|
||||
import capa.render.utils as rutils
|
||||
from capa.engine import *
|
||||
from capa.render import convert_capabilities_to_result_document
|
||||
|
||||
# edit this to set the path for file to analyze and rule directory
|
||||
RULES_PATH = "/tmp/capa/rules/"
|
||||
|
||||
# load rules from disk
|
||||
rules = capa.main.get_rules(RULES_PATH, disable_progress=True)
|
||||
rules = capa.rules.RuleSet(rules)
|
||||
|
||||
# == Render dictionary helpers
|
||||
def render_meta(doc, ostream):
    """
    copy the sample hashes and path from the result document into `ostream`.

    args:
      doc (dict): result document; `doc["meta"]["sample"]` is read.
      ostream (dict): output dictionary, updated in place.
    """
    sample = doc["meta"]["sample"]
    for field in ("md5", "sha1", "sha256", "path"):
        ostream[field] = sample[field]
|
||||
|
||||
|
||||
def find_subrule_matches(doc):
    """
    collect the names of rules that were matched via a `match` feature of another rule.

    callers use this set to avoid displaying entries for things that are too specific.

    args:
      doc (dict): result document, iterated via `rutils.capability_rules`.

    returns:
      set: names referenced by successful `match` feature nodes.
    """
    matched_names = set()

    for rule in rutils.capability_rules(doc):
        # walk each match tree with an explicit stack rather than recursion
        pending = list(rule["matches"].values())
        while pending:
            node = pending.pop()

            if not node["success"]:
                # there's probably a bug here for rules that do `not: match: ...`
                # but we don't have any examples of this yet
                continue

            kind = node["node"]["type"]
            if kind == "statement":
                pending.extend(node["children"])
            elif kind == "feature":
                feature = node["node"]["feature"]
                if feature["type"] == "match":
                    matched_names.add(feature["match"])

    return matched_names
|
||||
|
||||
|
||||
def render_capabilities(doc, ostream):
    """
    group the matched capability names by rule namespace under `ostream["CAPABILITY"]`.

    rules with more than one match get a " (N matches)" suffix.

    example::

        {'CAPABILITY': {'host-interaction/cli': ['accept command line arguments'],
                        'host-interaction/process': ['allocate thread local storage (2 matches)'],
                        'anti-analysis/anti-emulation/wine': ['check if process is running under wine'],
                        'host-interaction/file-system/write': ['write file (3 matches)']}
        }

    args:
      doc (dict): result document.
      ostream (dict): output dictionary, updated in place.
    """
    subrule_matches = find_subrule_matches(doc)

    by_namespace = dict()
    ostream["CAPABILITY"] = by_namespace
    for rule in rutils.capability_rules(doc):
        name = rule["meta"]["name"]
        if name in subrule_matches:
            # rules that are also matched by other rules should not get rendered by default.
            # this cuts down on the amount of output while giving approx the same detail.
            # see #224
            continue

        count = len(rule["matches"])
        capability = name if count == 1 else "%s (%d matches)" % (name, count)

        by_namespace.setdefault(rule["meta"]["namespace"], list()).append(capability)
|
||||
|
||||
|
||||
def render_attack(doc, ostream):
    """
    group the ATT&CK mappings of matched rules by tactic under `ostream["ATTCK"]`.

    note the output key is "ATTCK" (no ampersand); tactic names are upper-cased.

    example::

        {'ATTCK': {'COLLECTION': ['Input Capture::Keylogging [T1056.001]'],
                   'DEFENSE EVASION': ['Obfuscated Files or Information [T1027]',
                                       'Virtualization/Sandbox Evasion::System Checks '
                                       '[T1497.001]'],
                   'DISCOVERY': ['File and Directory Discovery [T1083]',
                                 'Query Registry [T1012]',
                                 'System Information Discovery [T1082]'],
                   'EXECUTION': ['Shared Modules [T1129]']}
        }

    args:
      doc (dict): result document.
      ostream (dict): output dictionary, updated in place.

    raises:
      RuntimeError: if a collected mapping is neither a 2- nor a 3-tuple.
    """
    ostream["ATTCK"] = dict()
    by_tactic = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        mappings = rule["meta"].get("att&ck")
        if not mappings:
            continue

        for mapping in mappings:
            # format: "Tactic::Technique [ID]" or "Tactic::Technique::Subtechnique [ID]"
            tactic, _, remainder = mapping.partition("::")
            if "::" in remainder:
                technique, _, remainder = remainder.partition("::")
                subtechnique, _, attack_id = remainder.rpartition(" ")
                by_tactic[tactic].add((technique, subtechnique, attack_id))
            else:
                technique, _, attack_id = remainder.rpartition(" ")
                by_tactic[tactic].add((technique, attack_id))

    for tactic, techniques in sorted(by_tactic.items()):
        rows = []
        for spec in sorted(techniques):
            if len(spec) == 2:
                rows.append("%s %s" % spec)
            elif len(spec) == 3:
                rows.append("%s::%s %s" % spec)
            else:
                raise RuntimeError("unexpected ATT&CK spec format")
        ostream["ATTCK"].setdefault(tactic.upper(), rows)
|
||||
|
||||
|
||||
def render_mbc(doc, ostream):
    """
    group the MBC mappings of matched rules by objective under `ostream["MBC"]`.

    objective names are upper-cased.

    example::

        {'MBC': {'ANTI-BEHAVIORAL ANALYSIS': ['Debugger Detection::Timing/Delay Check '
                                              'GetTickCount [B0001.032]',
                                              'Emulator Detection [B0004]',
                                              'Virtual Machine Detection::Instruction '
                                              'Testing [B0009.029]',
                                              'Virtual Machine Detection [B0009]'],
                 'COLLECTION': ['Keylogging::Polling [F0002.002]'],
                 'CRYPTOGRAPHY': ['Encrypt Data::RC4 [C0027.009]',
                                  'Generate Pseudo-random Sequence::RC4 PRGA '
                                  '[C0021.004]']}
        }

    args:
      doc (dict): result document.
      ostream (dict): output dictionary, updated in place.

    raises:
      ValueError: if a rule's `mbc` meta value is not a list.
      RuntimeError: if a collected mapping is neither a 2- nor a 3-tuple.
    """
    # row template selected by the number of parts in a collected spec
    row_formats = {2: "%s %s", 3: "%s::%s %s"}

    ostream["MBC"] = dict()
    by_objective = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        if not rule["meta"].get("mbc"):
            continue

        mappings = rule["meta"]["mbc"]
        if not isinstance(mappings, list):
            raise ValueError("invalid rule: MBC mapping is not a list")

        for mapping in mappings:
            # format: "Objective::Behavior [ID]" or "Objective::Behavior::Method [ID]"
            objective, _, remainder = mapping.partition("::")
            if "::" in remainder:
                behavior, _, remainder = remainder.partition("::")
                method, _, mbc_id = remainder.rpartition(" ")
                by_objective[objective].add((behavior, method, mbc_id))
            else:
                behavior, _, mbc_id = remainder.rpartition(" ")
                by_objective[objective].add((behavior, mbc_id))

    for objective, behaviors in sorted(by_objective.items()):
        rows = []
        for spec in sorted(behaviors):
            row_format = row_formats.get(len(spec))
            if row_format is None:
                raise RuntimeError("unexpected MBC spec format")
            rows.append(row_format % spec)
        ostream["MBC"].setdefault(objective.upper(), rows)
|
||||
|
||||
|
||||
def render_dictionary(doc):
    """
    build a plain dictionary summary of the result document.

    runs the meta, ATT&CK, MBC and capability renderers, in that order,
    against one shared output dictionary.

    args:
      doc (dict): result document.

    returns:
      dict: the populated output dictionary.
    """
    ostream = dict()
    for render in (render_meta, render_attack, render_mbc, render_capabilities):
        render(doc, ostream)

    return ostream
|
||||
|
||||
|
||||
# ==== render dictionary helpers
|
||||
def capa_details(file_path, output_format="dictionary"):
    """
    analyze `file_path` with the module-level `rules` and render the results.

    args:
      file_path (str): path to the sample to analyze.
      output_format (str): one of "dictionary", "json" or "texttable".

    returns:
      the rendered output for the requested format
      ("json" yields the parsed JSON document),
      or False when `output_format` is not one of the recognized values.
    """
    # extract features and find capabilities
    extractor = capa.main.get_extractor(file_path, "auto", disable_progress=True)
    capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)

    # collect metadata (used only to make rendering more complete)
    meta = capa.main.collect_metadata("", file_path, RULES_PATH, "auto", extractor)
    meta["analysis"].update(counts)

    if output_format == "dictionary":
        # ...as python dictionary, simplified like the text table but as a dictionary
        doc = convert_capabilities_to_result_document(meta, rules, capabilities)
        return render_dictionary(doc)

    if output_format == "json":
        # ...as json
        return json.loads(capa.render.render_json(meta, rules, capabilities))

    if output_format == "texttable":
        # ...as human readable text table
        return capa.render.render_default(meta, rules, capabilities)

    # unrecognized format: the analysis above has still run, but nothing is rendered
    return False
|
||||
@@ -38,6 +38,12 @@ def main(argv=None):
|
||||
)
|
||||
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging")
|
||||
parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
|
||||
parser.add_argument(
|
||||
"-c",
|
||||
"--check",
|
||||
action="store_true",
|
||||
help="Don't output (reformatted) rule, only return status. 0 = no changes, 1 = would reformat",
|
||||
)
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
if args.verbose:
|
||||
@@ -50,12 +56,22 @@ def main(argv=None):
|
||||
logging.basicConfig(level=level)
|
||||
logging.getLogger("capafmt").setLevel(level)
|
||||
|
||||
rule = capa.rules.Rule.from_yaml_file(args.path)
|
||||
rule = capa.rules.Rule.from_yaml_file(args.path, use_ruamel=True)
|
||||
reformatted_rule = rule.to_yaml()
|
||||
|
||||
if args.check:
|
||||
if rule.definition == reformatted_rule:
|
||||
logger.info("rule is formatted correctly, nice! (%s)", rule.name)
|
||||
return 0
|
||||
else:
|
||||
logger.info("rule requires reformatting (%s)", rule.name)
|
||||
return 1
|
||||
|
||||
if args.in_place:
|
||||
with open(args.path, "wb") as f:
|
||||
f.write(rule.to_yaml().encode("utf-8"))
|
||||
f.write(reformatted_rule.encode("utf-8"))
|
||||
else:
|
||||
print(rule.to_yaml().rstrip("\n"))
|
||||
print(reformatted_rule)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@@ -15,7 +15,9 @@ See the License for the specific language governing permissions and limitations
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import string
|
||||
import difflib
|
||||
import hashlib
|
||||
import logging
|
||||
import os.path
|
||||
@@ -24,6 +26,7 @@ import itertools
|
||||
import posixpath
|
||||
|
||||
import capa.main
|
||||
import capa.rules
|
||||
import capa.engine
|
||||
import capa.features
|
||||
import capa.features.insn
|
||||
@@ -194,7 +197,7 @@ class DoesntMatchExample(Lint):
|
||||
continue
|
||||
|
||||
try:
|
||||
extractor = capa.main.get_extractor(path, "auto")
|
||||
extractor = capa.main.get_extractor(path, "auto", disable_progress=True)
|
||||
capabilities, meta = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True)
|
||||
except Exception as e:
|
||||
logger.error("failed to extract capabilities: %s %s %s", rule.name, path, e)
|
||||
@@ -232,7 +235,7 @@ class LibRuleNotInLibDirectory(Lint):
|
||||
if "lib" not in rule.meta:
|
||||
return False
|
||||
|
||||
return "/lib/" not in get_normpath(rule.meta["capa/path"])
|
||||
return "lib/" not in get_normpath(rule.meta["capa/path"])
|
||||
|
||||
|
||||
class LibRuleHasNamespace(Lint):
|
||||
@@ -276,6 +279,32 @@ class FeatureNegativeNumber(Lint):
|
||||
return False
|
||||
|
||||
|
||||
class FormatSingleEmptyLineEOF(Lint):
    """
    lint: the rule definition must end with exactly one newline.
    """

    name = "EOF format"
    recommendation = "end file with a single empty line"

    def check_rule(self, ctx, rule):
        """
        return True (violation) unless `rule.definition` ends with exactly one newline.
        """
        text = rule.definition
        ends_with_one_newline = text.endswith("\n") and not text.endswith("\n\n")
        return not ends_with_one_newline
|
||||
|
||||
|
||||
class FormatIncorrect(Lint):
    """
    lint: the rule definition must equal its own ruamel round-trip through `to_yaml()`.
    """

    name = "rule format incorrect"
    recommendation_template = "use scripts/capafmt.py or adjust as follows\n{:s}"

    def check_rule(self, ctx, rule):
        """
        return True (violation) when the reformatted rule differs from the definition.

        on a violation, `self.recommendation` is set to the template filled with
        an ndiff of the actual vs. expected text.
        """
        current = rule.definition
        reformatted = capa.rules.Rule.from_yaml(rule.definition, use_ruamel=True).to_yaml()

        if current == reformatted:
            return False

        # keep line endings so the joined diff reproduces the original line breaks
        delta = difflib.ndiff(current.splitlines(True), reformatted.splitlines(True))
        self.recommendation = self.recommendation_template.format("".join(delta))
        return True
|
||||
|
||||
|
||||
def run_lints(lints, ctx, rule):
|
||||
for lint in lints:
|
||||
if lint.check_rule(ctx, rule):
|
||||
@@ -331,15 +360,25 @@ FEATURE_LINTS = (
|
||||
)
|
||||
|
||||
|
||||
def get_normpath(path):
    """
    return `path` normalized and using forward slashes as separators.

    the platform separator is converted to "/" *before* normalizing:
    `posixpath.normpath` only understands "/" as a separator, so normalizing
    first would leave "." and ".." components untouched in backslash-separated paths.

    args:
      path (str): file system path using the platform's separator.

    returns:
      str: normalized, "/"-separated path.
    """
    return posixpath.normpath(path.replace(os.sep, "/"))
|
||||
|
||||
|
||||
def lint_features(ctx, rule):
    """
    run the feature lints (`FEATURE_LINTS`) over the features collected for `rule`.

    returns whatever `run_feature_lints` returns.
    """
    rule_features = get_features(ctx, rule)
    outcome = run_feature_lints(FEATURE_LINTS, ctx, rule_features)
    return outcome
|
||||
|
||||
|
||||
# lints that inspect the textual formatting of a rule definition
FORMAT_LINTS = (FormatSingleEmptyLineEOF(), FormatIncorrect())
|
||||
|
||||
|
||||
def lint_format(ctx, rule):
    """
    run the format lints (`FORMAT_LINTS`) against `rule`.

    returns whatever `run_lints` returns.
    """
    outcome = run_lints(FORMAT_LINTS, ctx, rule)
    return outcome
|
||||
|
||||
|
||||
def get_normpath(path):
    """
    return `path` normalized and using forward slashes as separators.

    the platform separator is converted to "/" *before* normalizing:
    `posixpath.normpath` only understands "/" as a separator, so normalizing
    first would leave "." and ".." components untouched in backslash-separated paths.

    args:
      path (str): file system path using the platform's separator.

    returns:
      str: normalized, "/"-separated path.
    """
    return posixpath.normpath(path.replace(os.sep, "/"))
|
||||
|
||||
|
||||
def get_features(ctx, rule):
|
||||
# get features from rule and all dependencies including subscopes and matched rules
|
||||
features = []
|
||||
@@ -390,6 +429,7 @@ def lint_rule(ctx, rule):
|
||||
lint_meta(ctx, rule),
|
||||
lint_logic(ctx, rule),
|
||||
lint_features(ctx, rule),
|
||||
lint_format(ctx, rule),
|
||||
)
|
||||
)
|
||||
|
||||
@@ -500,6 +540,7 @@ def main(argv=None):
|
||||
action="store_true",
|
||||
help="Enable thorough linting - takes more time, but does a better job",
|
||||
)
|
||||
parser.add_argument("-t", "--tag", type=str, help="filter on rule meta field values")
|
||||
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging")
|
||||
parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
|
||||
args = parser.parse_args(args=argv)
|
||||
@@ -516,15 +557,20 @@ def main(argv=None):
|
||||
|
||||
capa.main.set_vivisect_log_level(logging.CRITICAL)
|
||||
logging.getLogger("capa").setLevel(logging.CRITICAL)
|
||||
logging.getLogger("viv_utils").setLevel(logging.CRITICAL)
|
||||
|
||||
time0 = time.time()
|
||||
|
||||
try:
|
||||
rules = capa.main.get_rules(args.rules)
|
||||
rules = capa.main.get_rules(args.rules, disable_progress=True)
|
||||
rules = capa.rules.RuleSet(rules)
|
||||
logger.info("successfully loaded %s rules", len(rules))
|
||||
except IOError as e:
|
||||
logger.error("%s", str(e))
|
||||
return -1
|
||||
except capa.rules.InvalidRule as e:
|
||||
if args.tag:
|
||||
rules = rules.filter_rules_by_meta(args.tag)
|
||||
logger.debug("selected %s rules", len(rules))
|
||||
for i, r in enumerate(rules.rules, 1):
|
||||
logger.debug(" %d. %s", i, r)
|
||||
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
|
||||
logger.error("%s", str(e))
|
||||
return -1
|
||||
|
||||
@@ -542,6 +588,10 @@ def main(argv=None):
|
||||
}
|
||||
|
||||
did_violate = lint(ctx, rules)
|
||||
|
||||
min, sec = divmod(time.time() - time0, 60)
|
||||
logger.debug("lints ran for ~ %02d:%02dm", min, sec)
|
||||
|
||||
if not did_violate:
|
||||
logger.info("no suggestions, nice!")
|
||||
return 0
|
||||
|
||||
5
setup.py
5
setup.py
@@ -11,7 +11,6 @@ import sys
|
||||
|
||||
import setuptools
|
||||
|
||||
# halo==0.0.30 is the last version to support py2.7
|
||||
requirements = [
|
||||
"six",
|
||||
"tqdm",
|
||||
@@ -21,16 +20,18 @@ requirements = [
|
||||
"termcolor",
|
||||
"ruamel.yaml",
|
||||
"wcwidth",
|
||||
"halo==0.0.30",
|
||||
"ida-settings==2.1.0",
|
||||
]
|
||||
|
||||
if sys.version_info >= (3, 0):
|
||||
# py3
|
||||
requirements.append("halo")
|
||||
requirements.append("networkx")
|
||||
requirements.append("smda==1.5.13")
|
||||
else:
|
||||
# py2
|
||||
requirements.append("enum34==1.1.6") # v1.1.6 is needed by halo 0.0.30 / spinners 0.0.24
|
||||
requirements.append("halo==0.0.30") # halo==0.0.30 is the last version to support py2.7
|
||||
requirements.append("vivisect==0.1.0")
|
||||
requirements.append("viv-utils")
|
||||
requirements.append("networkx==2.2") # v2.2 is last version supported by Python 2.7
|
||||
|
||||
Submodule tests/data updated: aa9c1496e6...440149c420
@@ -10,6 +10,7 @@
|
||||
import os
|
||||
import sys
|
||||
import os.path
|
||||
import binascii
|
||||
import contextlib
|
||||
import collections
|
||||
|
||||
@@ -78,7 +79,44 @@ def get_viv_extractor(path):
|
||||
vw = capa.main.get_workspace(path, "sc64", should_save=False)
|
||||
else:
|
||||
vw = capa.main.get_workspace(path, "auto", should_save=True)
|
||||
return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
|
||||
extractor = capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
|
||||
fixup_viv(path, extractor)
|
||||
return extractor
|
||||
|
||||
|
||||
def fixup_viv(path, extractor):
    """
    vivisect fixups to overcome differences between backends.

    args:
      path (str): path of the sample; used to pick the sample-specific fixups.
      extractor: feature extractor whose `vw` workspace is modified in place.
    """
    if "3b13b" not in path:
        # no fixups known for this sample
        return

    # vivisect only recognizes calling thunk function at 0x10001573
    extractor.vw.makeFunction(0x10006860)
|
||||
|
||||
|
||||
@lru_cache()
def get_smda_extractor(path):
    """
    disassemble the file at `path` with SMDA and wrap the report in a SmdaFeatureExtractor.

    the SMDA and capa extractor modules are imported inside the function.
    """
    from smda.SmdaConfig import SmdaConfig
    from smda.Disassembler import Disassembler

    import capa.features.extractors.smda

    smda_config = SmdaConfig()
    smda_config.STORE_BUFFER = True
    smda_report = Disassembler(smda_config).disassembleFile(path)

    return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path)
|
||||
|
||||
|
||||
@lru_cache()
def get_miasm_extractor(path):
    """
    read the file at `path` and wrap its bytes in a MiasmFeatureExtractor.

    the capa miasm extractor module is imported inside the function.
    """
    import capa.features.extractors.miasm

    with open(path, "rb") as sample_file:
        sample_bytes = sample_file.read()

    # NOTE(review): looks like leftover debug output - confirm whether it is still wanted
    print("Using miasm!!!!")
    return capa.features.extractors.miasm.MiasmFeatureExtractor(sample_bytes)
|
||||
|
||||
|
||||
@lru_cache()
|
||||
@@ -129,6 +167,8 @@ def get_data_path_by_name(name):
|
||||
return os.path.join(CD, "data", "Practical Malware Analysis Lab 21-01.exe_")
|
||||
elif name == "al-khaser x86":
|
||||
return os.path.join(CD, "data", "al-khaser_x86.exe_")
|
||||
elif name == "al-khaser x64":
|
||||
return os.path.join(CD, "data", "al-khaser_x64.exe_")
|
||||
elif name.startswith("39c05"):
|
||||
return os.path.join(CD, "data", "39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.dll_")
|
||||
elif name.startswith("499c2"):
|
||||
@@ -149,8 +189,12 @@ def get_data_path_by_name(name):
|
||||
return os.path.join(CD, "data", "82BF6347ACF15E5D883715DC289D8A2B.exe_")
|
||||
elif name.startswith("pingtaest"):
|
||||
return os.path.join(CD, "data", "ping_täst.exe_")
|
||||
elif name.startswith("77329"):
|
||||
return os.path.join(CD, "data", "773290480d5445f11d3dc1b800728966.exe_")
|
||||
elif name.startswith("3b13b"):
|
||||
return os.path.join(CD, "data", "3b13b6f1d7cd14dc4a097a12e2e505c0a4cff495262261e2bfc991df238b9b04.dll_")
|
||||
else:
|
||||
raise ValueError("unexpected sample fixture")
|
||||
raise ValueError("unexpected sample fixture: %s" % name)
|
||||
|
||||
|
||||
def get_sample_md5_by_name(name):
|
||||
@@ -169,6 +213,8 @@ def get_sample_md5_by_name(name):
|
||||
return "c8403fb05244e23a7931c766409b5e22"
|
||||
elif name == "al-khaser x86":
|
||||
return "db648cd247281954344f1d810c6fd590"
|
||||
elif name == "al-khaser x64":
|
||||
return "3cb21ae76ff3da4b7e02d77ff76e82be"
|
||||
elif name.startswith("39c05"):
|
||||
return "b7841b9d5dc1f511a93cc7576672ec0c"
|
||||
elif name.startswith("499c2"):
|
||||
@@ -187,8 +233,13 @@ def get_sample_md5_by_name(name):
|
||||
return "64d9f7d96b99467f36e22fada623c3bb"
|
||||
elif name.startswith("82bf6"):
|
||||
return "82bf6347acf15e5d883715dc289d8a2b"
|
||||
elif name.startswith("77329"):
|
||||
return "773290480d5445f11d3dc1b800728966"
|
||||
elif name.startswith("3b13b"):
|
||||
# file name is SHA256 hash
|
||||
return "56a6ffe6a02941028cc8235204eef31d"
|
||||
else:
|
||||
raise ValueError("unexpected sample fixture")
|
||||
raise ValueError("unexpected sample fixture: %s" % name)
|
||||
|
||||
|
||||
def resolve_sample(sample):
|
||||
@@ -202,14 +253,14 @@ def sample(request):
|
||||
|
||||
def get_function(extractor, fva):
|
||||
for f in extractor.get_functions():
|
||||
if f.__int__() == fva:
|
||||
if extractor.function_offset(f) == fva:
|
||||
return f
|
||||
raise ValueError("function not found")
|
||||
|
||||
|
||||
def get_basic_block(extractor, f, va):
|
||||
for bb in extractor.get_basic_blocks(f):
|
||||
if bb.__int__() == va:
|
||||
if extractor.block_offset(bb) == va:
|
||||
return bb
|
||||
raise ValueError("basic block not found")
|
||||
|
||||
@@ -377,7 +428,7 @@ FEATURE_PRESENCE_TESTS = [
|
||||
),
|
||||
("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True),
|
||||
# insn/api: x64 nested thunk
|
||||
("82bf6", "function=0x140059342", capa.features.insn.API("ElfClearEventLogFile"), True),
|
||||
("al-khaser x64", "function=0x14004B4F0", capa.features.insn.API("__vcrt_GetModuleHandle"), True),
|
||||
# insn/api: call via jmp
|
||||
("mimikatz", "function=0x40B3C6", capa.features.insn.API("LocalFree"), True),
|
||||
("c91887...", "function=0x40156F", capa.features.insn.API("CloseClipboard"), True),
|
||||
@@ -392,16 +443,21 @@ FEATURE_PRESENCE_TESTS = [
|
||||
("mimikatz", "function=0x40105D", capa.features.String("SCardTransmit"), True),
|
||||
("mimikatz", "function=0x40105D", capa.features.String("ACR > "), True),
|
||||
("mimikatz", "function=0x40105D", capa.features.String("nope"), False),
|
||||
("773290...", "function=0x140001140", capa.features.String(r"%s:\\OfficePackagesForWDAG"), True),
|
||||
# insn/regex, issue #262
|
||||
("pma16-01", "function=0x4021B0", capa.features.Regex("HTTP/1.0"), True),
|
||||
("pma16-01", "function=0x4021B0", capa.features.Regex("www.practicalmalwareanalysis.com"), False),
|
||||
# insn/string, pointer to string
|
||||
("mimikatz", "function=0x44EDEF", capa.features.String("INPUTEVENT"), True),
|
||||
# insn/string, direct memory reference
|
||||
("mimikatz", "function=0x46D6CE", capa.features.String("(null)"), True),
|
||||
# insn/bytes
|
||||
("mimikatz", "function=0x40105D", capa.features.Bytes("SCardControl".encode("utf-16le")), True),
|
||||
("mimikatz", "function=0x40105D", capa.features.Bytes("SCardTransmit".encode("utf-16le")), True),
|
||||
("mimikatz", "function=0x40105D", capa.features.Bytes("ACR > ".encode("utf-16le")), True),
|
||||
("mimikatz", "function=0x40105D", capa.features.Bytes("nope".encode("ascii")), False),
|
||||
# IDA features included byte sequences read from invalid memory, fixed in #409
|
||||
("mimikatz", "function=0x44570F", capa.features.Bytes(binascii.unhexlify("FF" * 256)), False),
|
||||
# insn/bytes, pointer to bytes
|
||||
("mimikatz", "function=0x44EDEF", capa.features.Bytes("INPUTEVENT".encode("utf-16le")), True),
|
||||
# insn/characteristic(nzxor)
|
||||
@@ -409,6 +465,9 @@ FEATURE_PRESENCE_TESTS = [
|
||||
("mimikatz", "function=0x40105D", capa.features.Characteristic("nzxor"), False),
|
||||
# insn/characteristic(nzxor): no security cookies
|
||||
("mimikatz", "function=0x46D534", capa.features.Characteristic("nzxor"), False),
|
||||
# insn/characteristic(nzxor): xorps
|
||||
# viv needs fixup to recognize function, see above
|
||||
("3b13b...", "function=0x10006860", capa.features.Characteristic("nzxor"), True),
|
||||
# insn/characteristic(peb access)
|
||||
("kernel32-64", "function=0x1800017D0", capa.features.Characteristic("peb access"), True),
|
||||
("mimikatz", "function=0x4556E5", capa.features.Characteristic("peb access"), False),
|
||||
@@ -473,7 +532,10 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
|
||||
|
||||
def get_extractor(path):
|
||||
if sys.version_info >= (3, 0):
|
||||
raise RuntimeError("no supported py3 backends yet")
|
||||
if False: # TODO: How to decide which backend to use?
|
||||
extractor = get_smda_extractor(path)
|
||||
else:
|
||||
extractor = get_miasm_extractor(path)
|
||||
else:
|
||||
extractor = get_viv_extractor(path)
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ import capa.features
|
||||
from capa.engine import *
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_main(z9324d_extractor):
|
||||
# tests rules can be loaded successfully and all output modes
|
||||
path = z9324d_extractor.path
|
||||
@@ -29,7 +28,6 @@ def test_main(z9324d_extractor):
|
||||
assert capa.main.main([path]) == 0
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_main_single_rule(z9324d_extractor, tmpdir):
|
||||
# tests a single rule can be loaded successfully
|
||||
RULE_CONTENT = textwrap.dedent(
|
||||
@@ -58,7 +56,6 @@ def test_main_single_rule(z9324d_extractor, tmpdir):
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_main_non_ascii_filename(pingtaest_extractor, tmpdir, capsys):
|
||||
# on py2.7, need to be careful about str (which can hold bytes)
|
||||
# vs unicode (which is only unicode characters).
|
||||
@@ -71,18 +68,22 @@ def test_main_non_ascii_filename(pingtaest_extractor, tmpdir, capsys):
|
||||
std = capsys.readouterr()
|
||||
# but here, we have to use a unicode instance,
|
||||
# because capsys has decoded the output for us.
|
||||
assert pingtaest_extractor.path.decode("utf-8") in std.out
|
||||
if sys.version_info >= (3, 0):
|
||||
assert pingtaest_extractor.path in std.out
|
||||
else:
|
||||
assert pingtaest_extractor.path.decode("utf-8") in std.out
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_main_non_ascii_filename_nonexistent(tmpdir, caplog):
|
||||
NON_ASCII_FILENAME = "täst_not_there.exe"
|
||||
assert capa.main.main(["-q", NON_ASCII_FILENAME]) == -1
|
||||
|
||||
assert NON_ASCII_FILENAME.decode("utf-8") in caplog.text
|
||||
if sys.version_info >= (3, 0):
|
||||
assert NON_ASCII_FILENAME in caplog.text
|
||||
else:
|
||||
assert NON_ASCII_FILENAME.decode("utf-8") in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_main_shellcode(z499c2_extractor):
|
||||
path = z499c2_extractor.path
|
||||
assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0
|
||||
@@ -137,7 +138,6 @@ def test_ruleset():
|
||||
assert len(rules.basic_block_rules) == 1
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_match_across_scopes_file_function(z9324d_extractor):
|
||||
rules = capa.rules.RuleSet(
|
||||
[
|
||||
@@ -201,7 +201,6 @@ def test_match_across_scopes_file_function(z9324d_extractor):
|
||||
assert ".text section and install service" in capabilities
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_match_across_scopes(z9324d_extractor):
|
||||
rules = capa.rules.RuleSet(
|
||||
[
|
||||
@@ -264,7 +263,6 @@ def test_match_across_scopes(z9324d_extractor):
|
||||
assert "kill thread program" in capabilities
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_subscope_bb_rules(z9324d_extractor):
|
||||
rules = capa.rules.RuleSet(
|
||||
[
|
||||
@@ -289,7 +287,6 @@ def test_subscope_bb_rules(z9324d_extractor):
|
||||
assert "test rule" in capabilities
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_byte_matching(z9324d_extractor):
|
||||
rules = capa.rules.RuleSet(
|
||||
[
|
||||
@@ -312,7 +309,6 @@ def test_byte_matching(z9324d_extractor):
|
||||
assert "byte match test" in capabilities
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_count_bb(z9324d_extractor):
|
||||
rules = capa.rules.RuleSet(
|
||||
[
|
||||
@@ -336,7 +332,6 @@ def test_count_bb(z9324d_extractor):
|
||||
assert "count bb" in capabilities
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_fix262(pma16_01_extractor, capsys):
|
||||
# tests rules can be loaded successfully and all output modes
|
||||
path = pma16_01_extractor.path
|
||||
@@ -347,7 +342,6 @@ def test_fix262(pma16_01_extractor, capsys):
|
||||
assert "www.practicalmalwareanalysis.com" not in std.out
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
|
||||
def test_not_render_rules_also_matched(z9324d_extractor, capsys):
|
||||
# rules that are also matched by other rules should not get rendered by default.
|
||||
# this cuts down on the amount of output while giving approx the same detail.
|
||||
|
||||
29
tests/test_miasm_features.py
Normal file
29
tests/test_miasm_features.py
Normal file
@@ -0,0 +1,29 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import sys
|
||||
|
||||
from fixtures import *
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected",
|
||||
FEATURE_PRESENCE_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_miasm_features(sample, scope, feature, expected):
|
||||
do_test_feature_presence(get_miasm_extractor, sample, scope, feature, expected)
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected",
|
||||
FEATURE_COUNT_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_miasm_feature_counts(sample, scope, feature, expected):
|
||||
do_test_feature_count(get_miasm_extractor, sample, scope, feature, expected)
|
||||
@@ -282,7 +282,8 @@ def test_lib_rules():
|
||||
),
|
||||
]
|
||||
)
|
||||
assert len(rules.function_rules) == 1
|
||||
# lib rules are added to the rule set
|
||||
assert len(rules.function_rules) == 2
|
||||
|
||||
|
||||
def test_subscope_rules():
|
||||
|
||||
30
tests/test_smda_features.py
Normal file
30
tests/test_smda_features.py
Normal file
@@ -0,0 +1,30 @@
|
||||
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import sys
|
||||
|
||||
from fixtures import *
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected",
|
||||
FEATURE_PRESENCE_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_smda_features(sample, scope, feature, expected):
|
||||
with xfail(sys.version_info < (3, 0), reason="SMDA only works on py3"):
|
||||
do_test_feature_presence(get_smda_extractor, sample, scope, feature, expected)
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected",
|
||||
FEATURE_COUNT_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_smda_feature_counts(sample, scope, feature, expected):
|
||||
with xfail(sys.version_info < (3, 0), reason="SMDA only works on py3"):
|
||||
do_test_feature_count(get_smda_extractor, sample, scope, feature, expected)
|
||||
Reference in New Issue
Block a user