merging upstream

This commit is contained in:
Michael Hunhoff
2020-07-06 21:07:15 -06:00
21 changed files with 540 additions and 127 deletions

View File

@@ -61,9 +61,7 @@ Alternatively, you can fetch a nightly build of a standalone binary from one of
- Linux: TODO
- OSX: TODO
## installation
See [doc/installation.md](doc/installation.md) for information on how to setup the project, including how to use it as a Python library.
To use capa as a library or integrate with another tool, see [doc/installation.md](doc/installation.md) for further setup instructions.
For more information about how to use capa, including running it as an IDA script/plugin, see [doc/usage.md](doc/usage.md).
@@ -142,3 +140,9 @@ Here's an example rule used by capa:
The [github.com/fireeye/capa-rules](https://github.com/fireeye/capa-rules) repository contains hundreds of standard library rules that are distributed with capa.
Please learn to write rules and contribute new entries as you find interesting techniques in malware.
# further information
- [doc/usage.md](doc/usage.md)
- [doc/installation.md](doc/installation.md)
- [github.com/fireeye/capa-rules](https://github.com/fireeye/capa-rules)
- [doc/rule format.md](https://github.com/fireeye/capa-rules/blob/master/doc/format.md)

View File

@@ -1,17 +1,5 @@
import abc
try:
import ida
except (ImportError, SyntaxError):
pass
try:
import viv
except (ImportError, SyntaxError):
pass
__all__ = ["ida", "viv"]
class FeatureExtractor(object):
"""
@@ -193,6 +181,7 @@ class NullFeatureExtractor(FeatureExtractor):
example::
extractor = NullFeatureExtractor({
'base address': 0x401000,
'file features': [
(0x402345, capa.features.Characteristic('embedded pe')),
],
@@ -227,6 +216,9 @@ class NullFeatureExtractor(FeatureExtractor):
super(NullFeatureExtractor, self).__init__()
self.features = features
def get_base_address(self):
    """
    fetch the base address recorded in the feature mapping given to this extractor.

    raises KeyError when the mapping has no "base address" entry.
    """
    base_address = self.features["base address"]
    return base_address
def extract_file_features(self):
for p in self.features.get("file features", []):
va, feature = p

View File

@@ -5,7 +5,6 @@ import idaapi
import capa.features.extractors.ida.file
import capa.features.extractors.ida.insn
import capa.features.extractors.ida.helpers
import capa.features.extractors.ida.function
import capa.features.extractors.ida.basicblock
@@ -47,8 +46,9 @@ class IdaFeatureExtractor(FeatureExtractor):
yield feature, ea
def get_functions(self):
import capa.features.extractors.ida.helpers as ida_helpers
# ignore library functions and thunk functions as identified by IDA
for f in capa.features.extractors.ida.helpers.get_functions(skip_thunks=True, skip_libs=True):
for f in ida_helpers.get_functions(skip_thunks=True, skip_libs=True):
yield add_ea_int_cast(f)
def extract_function_features(self, f):
@@ -64,7 +64,8 @@ class IdaFeatureExtractor(FeatureExtractor):
yield feature, ea
def get_instructions(self, f, bb):
for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
import capa.features.extractors.ida.helpers as ida_helpers
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
yield add_ea_int_cast(insn)
def extract_insn_features(self, f, bb, insn):

View File

@@ -8,6 +8,11 @@ from capa.features.insn import Number, Offset, Mnemonic
from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_indirect_call
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40
def interface_extract_instruction_XXX(f, bb, insn):
"""
parse features from the given instruction.
@@ -257,11 +262,12 @@ def is_security_cookie(f, bb, insn):
# expect security cookie init in first basic block within first bytes (instructions)
bb0 = f.basic_blocks[0]
if bb == bb0 and insn.va < bb.va + 30:
if bb == bb0 and insn.va < (bb.va + SECURITY_COOKIE_BYTES_DELTA):
return True
# ... or within last bytes (instructions) before a return
elif bb.instructions[-1].isReturn() and insn.va > bb.va + bb.size - 30:
elif bb.instructions[-1].isReturn() and insn.va > (bb.va + bb.size - SECURITY_COOKIE_BYTES_DELTA):
return True
return False

View File

@@ -534,6 +534,9 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
parent, display, source=doc["rules"].get(feature[feature["type"]], {}).get("source", "")
)
if feature["type"] == "basicblock":
return CapaExplorerBlockItem(parent, location)
if feature["type"] in ("bytes", "api", "mnemonic", "number", "offset"):
# display instruction preview
return CapaExplorerInstructionViewItem(parent, display, location)

View File

@@ -345,7 +345,13 @@ class CapaExplorerForm(idaapi.PluginForm):
rules_path = os.path.join(os.path.dirname(self.file_loc), "../..", "rules")
rules = capa.main.get_rules(rules_path)
rules = capa.rules.RuleSet(rules)
capabilities = capa.main.find_capabilities(rules, capa.features.extractors.ida.IdaFeatureExtractor(), True)
meta = capa.ida.helpers.collect_metadata()
capabilities, counts = capa.main.find_capabilities(
rules, capa.features.extractors.ida.IdaFeatureExtractor(), True
)
meta["analysis"].update(counts)
# support binary files specifically for x86/AMD64 shellcode
# warn user binary file is loaded but still allow capa to process it
@@ -370,7 +376,6 @@ class CapaExplorerForm(idaapi.PluginForm):
logger.info("analysis completed.")
meta = capa.ida.helpers.collect_metadata()
doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)
self.model_data.render_capa_doc(doc)

View File

@@ -68,7 +68,7 @@ def find_function_capabilities(ruleset, extractor, f):
function_features[capa.features.MatchedRule(rule_name)].add(va)
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, oint(f))
return function_matches, bb_matches
return function_matches, bb_matches, len(function_features)
def find_file_capabilities(ruleset, extractor, function_features):
@@ -84,40 +84,46 @@ def find_file_capabilities(ruleset, extractor, function_features):
if feature not in file_features:
file_features[feature] = set()
logger.info("analyzed file and extracted %d features", len(file_features))
logger.debug("analyzed file and extracted %d features", len(file_features))
file_features.update(function_features)
_, matches = capa.engine.match(ruleset.file_rules, file_features, 0x0)
return matches
return matches, len(file_features)
def find_capabilities(ruleset, extractor, disable_progress=None):
all_function_matches = collections.defaultdict(list)
all_bb_matches = collections.defaultdict(list)
meta = {"feature_counts": {"file": 0, "functions": {},}}
for f in tqdm.tqdm(extractor.get_functions(), disable=disable_progress, unit=" functions"):
function_matches, bb_matches = find_function_capabilities(ruleset, extractor, f)
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
meta["feature_counts"]["functions"][f.__int__()] = feature_count
logger.debug("analyzed function 0x%x and extracted %d features", f.__int__(), feature_count)
for rule_name, res in function_matches.items():
all_function_matches[rule_name].extend(res)
for rule_name, res in bb_matches.items():
all_bb_matches[rule_name].extend(res)
# mapping from matched rule feature to set of addresses at which it matched.
# type: Dict[MatchedRule, Set[int]]
# schema: Dict[MatchedRule: Set[int]]
function_features = {
capa.features.MatchedRule(rule_name): set(map(lambda p: p[0], results))
for rule_name, results in all_function_matches.items()
}
all_file_matches = find_file_capabilities(ruleset, extractor, function_features)
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_features)
meta["feature_counts"]["file"] = feature_count
matches = {}
matches.update(all_bb_matches)
matches.update(all_function_matches)
matches.update(all_file_matches)
return matches
return matches, meta
def has_rule_with_namespace(rules, capabilities, rule_cat):
@@ -193,6 +199,9 @@ def is_supported_file_type(sample):
return False
SHELLCODE_BASE = 0x690000
def get_shellcode_vw(sample, arch="auto"):
"""
Return shellcode workspace using explicit arch or via auto detect
@@ -205,13 +214,12 @@ def get_shellcode_vw(sample, arch="auto"):
# choose arch with most functions, idea by Jay G.
vw_cands = []
for arch in ["i386", "amd64"]:
vw_cands.append(viv_utils.getShellcodeWorkspace(sample_bytes, arch))
vw_cands.append(viv_utils.getShellcodeWorkspace(sample_bytes, arch, base=SHELLCODE_BASE))
if not vw_cands:
raise ValueError("could not generate vivisect workspace")
vw = max(vw_cands, key=lambda vw: len(vw.getFunctions()))
else:
vw = viv_utils.getShellcodeWorkspace(sample_bytes, arch)
vw.setMeta("Format", "blob") # TODO fix in viv_utils
vw = viv_utils.getShellcodeWorkspace(sample_bytes, arch, base=SHELLCODE_BASE)
return vw
@@ -299,7 +307,10 @@ def get_rules(rule_path):
for root, dirs, files in os.walk(rule_path):
for file in files:
if not file.endswith(".yml"):
logger.warning("skipping non-.yml file: %s", file)
if not (file.endswith(".md") or file.endswith(".git")):
# expect to see readme.md, format.md, and maybe a .git directory
# other things maybe are rules, but are mis-named.
logger.warning("skipping non-.yml file: %s", file)
continue
rule_path = os.path.join(root, file)
@@ -483,7 +494,8 @@ def main(argv=None):
meta = collect_metadata(argv, args.sample, format, extractor)
capabilities = find_capabilities(rules, extractor)
capabilities, counts = find_capabilities(rules, extractor)
meta["analysis"].update(counts)
if has_file_limitation(rules, capabilities):
# bail if capa encountered file limitation e.g. a packed binary
@@ -540,12 +552,14 @@ def ida_main():
rules = get_rules(rules_path)
rules = capa.rules.RuleSet(rules)
capabilities = find_capabilities(rules, capa.features.extractors.ida.IdaFeatureExtractor())
meta = capa.ida.helpers.collect_metadata()
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.IdaFeatureExtractor())
meta["analysis"].update(counts)
if has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
meta = capa.ida.helpers.collect_metadata()
print(capa.render.render_default(meta, rules, capabilities))

View File

@@ -17,13 +17,10 @@ def width(s, character_count):
def render_meta(doc, ostream):
rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)]
for k in ("timestamp", "version"):
rows.append((width(k, 22), width(doc["meta"][k], 60)))
for k in ("path", "md5"):
rows.append((k, doc["meta"]["sample"][k]))
rows = [
(width("md5", 22), width(doc["meta"]["sample"]["md5"], 82)),
("path", doc["meta"]["sample"]["path"]),
]
ostream.write(tabulate.tabulate(rows, tablefmt="psql"))
ostream.write("\n")
@@ -50,7 +47,7 @@ def render_capabilities(doc, ostream):
capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count)
rows.append((capability, rule["meta"]["namespace"]))
ostream.write(tabulate.tabulate(rows, headers=[width("CAPABILITY", 40), width("NAMESPACE", 40)], tablefmt="psql"))
ostream.write(tabulate.tabulate(rows, headers=[width("CAPABILITY", 50), width("NAMESPACE", 50)], tablefmt="psql"))
ostream.write("\n")
@@ -99,7 +96,7 @@ def render_attack(doc, ostream):
raise RuntimeError("unexpected ATT&CK spec format")
rows.append((rutils.bold(tactic.upper()), "\n".join(inner_rows),))
ostream.write(
tabulate.tabulate(rows, headers=[width("ATT&CK Tactic", 20), width("ATT&CK Technique", 60)], tablefmt="psql")
tabulate.tabulate(rows, headers=[width("ATT&CK Tactic", 20), width("ATT&CK Technique", 80)], tablefmt="psql")
)
ostream.write("\n")

View File

@@ -20,24 +20,53 @@ import capa.rules
import capa.render.utils as rutils
def render_verbose(doc):
ostream = rutils.StringIO()
rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)]
for k in ("timestamp", "version"):
rows.append((k, doc["meta"][k]))
for k in ("path", "md5", "sha1", "sha256"):
rows.append((k, doc["meta"]["sample"][k]))
for k in ("format", "extractor"):
rows.append((k.replace("_", " "), doc["meta"]["analysis"][k]))
rows.append(("base address", rutils.hex(doc["meta"]["analysis"]["base_address"])))
def render_meta(ostream, doc):
    """
    write the sample and analysis metadata of the result document as a plain two-column table.

    like:

        md5                  84882c9d43e23d63b82004fae74ebb61
        sha1                 c6fb3b50d946bec6f391aefa4e54478cf8607211
        sha256               5eced7367ed63354b4ed5c556e2363514293f614c2c2eb187273381b2ef5f0f9
        path                 /tmp/suspicious.dll_
        timestamp            2020-07-03T10:17:05.796933
        capa version         0.0.0
        format               auto
        extractor            VivisectFeatureExtractor
        base address         0x10000000
        function count       42
        total feature count  1918

    args:
      ostream: output stream providing `write` and `writeln`.
      doc: result document; reads `doc["meta"]` (sample hashes/path, timestamp, version, analysis).
    """
    meta = doc["meta"]
    sample = meta["sample"]
    analysis = meta["analysis"]
    feature_counts = analysis["feature_counts"]

    rows = [
        ("md5", sample["md5"]),
        ("sha1", sample["sha1"]),
        ("sha256", sample["sha256"]),
        ("path", sample["path"]),
        ("timestamp", meta["timestamp"]),
        ("capa version", meta["version"]),
        ("format", analysis["format"]),
        ("extractor", analysis["extractor"]),
        # use the shared render helper rather than the builtin `hex()`:
        # under Python 2 the builtin appends an "L" suffix for long integers,
        # and the other renderers already format addresses via `rutils.hex`.
        ("base address", rutils.hex(analysis["base_address"])),
        ("function count", len(feature_counts["functions"])),
        (
            "total feature count",
            # file-scope features plus the features extracted from every function
            feature_counts["file"] + sum(feature_counts["functions"].values()),
        ),
    ]

    ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
    ostream.write("\n")
def render_rules(ostream, doc):
"""
like:
receive data (2 matches)
namespace communication
description all known techniques for receiving data from a potential C2 server
scope function
matches 0x10003A13
0x10003797
"""
for rule in rutils.capability_rules(doc):
count = len(rule["matches"])
if count == 1:
@@ -64,4 +93,14 @@ def render_verbose(doc):
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
ostream.write("\n")
def render_verbose(doc):
    """
    render the result document in verbose mode and return the text.

    the output is the metadata table followed by the matched rules,
    each section terminated by a newline.
    """
    ostream = rutils.StringIO()

    for render_section in (render_meta, render_rules):
        render_section(ostream, doc)
        ostream.write("\n")

    return ostream.getvalue()

View File

@@ -1,7 +1,10 @@
import collections
import tabulate
import capa.rules
import capa.render.utils as rutils
import capa.render.verbose
def render_locations(ostream, match):
@@ -138,25 +141,23 @@ def render_match(ostream, match, indent=0, mode=MODE_SUCCESS):
render_match(ostream, child, indent=indent + 1, mode=child_mode)
def render_vverbose(doc):
ostream = rutils.StringIO()
rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)]
for k in ("timestamp", "version"):
rows.append((k, doc["meta"][k]))
for k in ("path", "md5", "sha1", "sha256"):
rows.append((k, doc["meta"]["sample"][k]))
for k in ("format", "extractor"):
rows.append((k.replace("_", " "), doc["meta"]["analysis"][k]))
rows.append(("base address", rutils.hex(doc["meta"]["analysis"]["base_address"])))
ostream.writeln(rutils.bold("Capa Report for " + doc["meta"]["sample"]["md5"]))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
ostream.write("\n")
def render_rules(ostream, doc):
"""
like:
## rules
check for OutputDebugString error
namespace anti-analysis/anti-debugging/debugger-detection
author michael.hunhoff@fireeye.com
scope function
mbc Anti-Behavioral Analysis::Detect Debugger::OutputDebugString
examples Practical Malware Analysis Lab 16-02.exe_:0x401020
function @ 0x10004706
and:
api: kernel32.SetLastError @ 0x100047C2
api: kernel32.GetLastError @ 0x10004A87
api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
"""
for rule in rutils.capability_rules(doc):
count = len(rule["matches"])
if count == 1:
@@ -195,7 +196,16 @@ def render_vverbose(doc):
ostream.write(" @ ")
ostream.writeln(rutils.hex(location))
render_match(ostream, match, indent=1)
ostream.write("\n")
def render_vverbose(doc):
    """
    render the result document in very verbose mode and return the text.

    the metadata table is shared with the verbose renderer;
    the rules section is the detailed one defined in this module.
    """
    ostream = rutils.StringIO()

    for render_section in (capa.render.verbose.render_meta, render_rules):
        render_section(ostream, doc)
        ostream.write("\n")

    return ostream.getvalue()

View File

@@ -58,24 +58,11 @@ SUPPORTED_FEATURES = {
capa.features.String,
},
FUNCTION_SCOPE: {
capa.features.MatchedRule,
capa.features.insn.API,
capa.features.insn.Number,
capa.features.String,
capa.features.Bytes,
capa.features.insn.Offset,
capa.features.insn.Mnemonic,
# plus basic block scope features, see below
capa.features.basicblock.BasicBlock,
capa.features.Characteristic("switch"),
capa.features.Characteristic("nzxor"),
capa.features.Characteristic("peb access"),
capa.features.Characteristic("fs access"),
capa.features.Characteristic("gs access"),
capa.features.Characteristic("cross section flow"),
capa.features.Characteristic("stack string"),
capa.features.Characteristic("calls from"),
capa.features.Characteristic("calls to"),
capa.features.Characteristic("indirect call"),
capa.features.Characteristic("loop"),
capa.features.Characteristic("recursive call"),
},
@@ -98,6 +85,9 @@ SUPPORTED_FEATURES = {
},
}
# all basic block scope features are also function scope features
SUPPORTED_FEATURES[FUNCTION_SCOPE].update(SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE])
class InvalidRule(ValueError):
def __init__(self, msg):

View File

@@ -44,6 +44,18 @@ Finally, use `pip` to install the source code in "editable" mode. This means tha
You'll find that the `capa.exe` (Windows) or `capa` (Linux) executables in your path now invoke the capa binary from this directory.
We use the following tools to ensure consistent code style and formatting:
- [black](https://github.com/psf/black) code formatter, with `-l 120`
- [isort](https://pypi.org/project/isort/) code formatter, with `--length-sort --line-width 120`
- [dos2unix](https://linux.die.net/man/1/dos2unix) for UNIX-style LF newlines
- [capafmt](https://github.com/fireeye/capa/blob/master/scripts/capafmt.py) rule formatter
To install these development dependencies, run:
`$ pip install -e ./local/path/to/src[dev]`
Note that some development dependencies (including the black code formatter) require Python3.
### 4. Setup hooks [optional]
If you plan to contribute to capa, you may want to set up the hooks.

View File

@@ -1,35 +1,65 @@
# capa usage
# command line
After you have downloaded the standalone version of capa or installed it via `pip` (see the [installation](installation.md) documentation) you can run capa directly from your terminal shell.
- `$ capa -h`
- `$ capa malware.exe`
In this mode capa relies on vivisect which only runs under Python 2.
## only run selected rules
Use the `-t` option to only run selected rules. This is the preferred method over specifying a rule path which fails if dependent rules reside in other directories.
```
$ capa -t communication malware.exe
usage: capa [-h] [-r RULES] [-t TAG] [--version] [-j] [-v] [-vv] [-d] [-q]
[-f {auto,pe,sc32,sc64,freeze}]
sample
detect capabilities in programs.
positional arguments:
sample Path to sample to analyze
optional arguments:
-h, --help show this help message and exit
-r RULES, --rules RULES
Path to rule file or directory, use embedded rules by
default
-t TAG, --tag TAG Filter on rule meta field values
--version Print the executable version and exit
-j, --json Emit JSON instead of text
-v, --verbose Enable verbose result document (no effect with --json)
-vv, --vverbose Enable very verbose result document (no effect with
--json)
-d, --debug Enable debugging output on STDERR
-q, --quiet Disable all output but errors
-f {auto,pe,sc32,sc64,freeze}, --format {auto,pe,sc32,sc64,freeze}
Select sample format, auto: (default) detect file type
automatically, pe: Windows PE file, sc32: 32-bit
shellcode, sc64: 64-bit shellcode, freeze: features
previously frozen by capa
```
# IDA Pro
capa runs from within IDA Pro. Run `capa/main.py` via File - Script file... (ALT + F7).
## tips and tricks
When running in IDA, capa uses IDA's disassembly and file analysis as its backend. These results may vary from the standalone version that uses vivisect.
- [match only rules by given author or namespace](#only-run-selected-rules)
- [IDA Pro capa explorer](#capa-explorer)
- [IDA Pro rule generator](#rule-generator)
In IDA, capa supports Python 2 and Python 3. If you encounter issues with your specific setup, please open a new [Issue](https://github.com/fireeye/capa/issues).
### only run selected rules
Use the `-t` option to run rules with the given metadata value (see the rule fields `rule.meta.*`).
For example, `capa -t william.ballenthin@mandiant.com` runs rules that reference Willi's email address (probably as the author), or
`capa -t communication` runs rules with the namespace `communication`.
## IDA Pro plugins
capa comes with two IDA Pro plugins located in the `capa/ida` directory.
### IDA Pro integrations
You can run capa from within IDA Pro. Run `capa/main.py` via `File - Script file...` (or ALT + F7).
When running in IDA, capa uses IDA's disassembly and file analysis as its backend.
These results may vary from the standalone version that uses vivisect.
IDA's analysis is generally a bit faster and more thorough than vivisect's, so you might prefer this mode.
### capa explorer
When run under IDA, capa supports both Python 2 and Python 3 interpreters.
If you encounter issues with your specific setup, please open a new [Issue](https://github.com/fireeye/capa/issues).
Additionally, capa comes with two IDA Pro plugins located in the `capa/ida` directory: the explorer and the rule generator.
#### capa explorer
The capa explorer allows you to interactively display and browse capabilities capa identified in a binary.
As you select rules or logic, capa will highlight the addresses that support its analysis conclusions.
We like to use capa to help find the most interesting parts of a program, such as where the C2 mechanism might be.
![capa explorer](capa_explorer.png)
### rule generator
#### rule generator
The rule generator helps you to easily write new rules based on the function you are currently analyzing in your IDA disassembly view.
It shows the features that capa can extract from the function, and lets you quickly pull these into a rule template.
You'll still have to provide the logic structures (`and`, `or`, `not`, etc.) but the features will be prepared for you.

2
rules

Submodule rules updated: 799b7bbf4b...a8621978cf

View File

@@ -89,7 +89,7 @@ class NamespaceDoesntMatchRulePath(Lint):
if "lib" in rule.meta:
return False
return rule.meta["namespace"] not in posixpath.normpath(rule.meta["capa/path"])
return rule.meta["namespace"] not in get_normpath(rule.meta["capa/path"])
class MissingScope(Lint):
@@ -180,7 +180,7 @@ class DoesntMatchExample(Lint):
try:
extractor = capa.main.get_extractor(path, "auto")
capabilities = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True)
capabilities, meta = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True)
except Exception as e:
logger.error("failed to extract capabilities: %s %s %s", rule.name, path, e)
return True
@@ -216,7 +216,7 @@ class LibRuleNotInLibDirectory(Lint):
if "lib" not in rule.meta:
return False
return "/lib/" not in posixpath.normpath(rule.meta["capa/path"])
return "/lib/" not in get_normpath(rule.meta["capa/path"])
class LibRuleHasNamespace(Lint):
@@ -314,6 +314,10 @@ FEATURE_LINTS = (
)
def get_normpath(path, sep=os.sep):
    """
    normalize a filesystem path into a forward-slash separated form.

    the native separator is converted to "/" *before* normalization,
    because `posixpath.normpath` only recognizes "/" as a separator:
    normalizing first would leave "." and ".." segments untouched in
    paths that use a different separator (e.g. backslashes on Windows).

    args:
      path (str): the path to normalize.
      sep (str): the separator used by `path`, defaults to the platform separator.

    returns:
      str: the normalized path, using "/" as the separator.
    """
    return posixpath.normpath(path.replace(sep, "/"))
def lint_features(ctx, rule):
features = get_features(ctx, rule)
return run_feature_lints(FEATURE_LINTS, ctx, features)

View File

@@ -0,0 +1,242 @@
#!/usr/bin/env python2
"""
show-capabilities-by-function
Invoke capa to extract the capabilities of the given sample
and emit the results grouped by function.
This is useful to identify "complex functions" - that is,
functions that implement a lot of different types of logic.
Example::
$ python scripts/show-capabilities-by-function.py /tmp/suspicious.dll_
function at 0x1000321A with 33 features:
- get hostname
- initialize Winsock library
function at 0x10003286 with 63 features:
- create thread
- terminate thread
function at 0x10003415 with 116 features:
- write file
- send data
- link function at runtime
- create HTTP request
- get common file path
- send HTTP request
- connect to HTTP server
function at 0x10003797 with 81 features:
- get socket status
- send data
- receive data
- create TCP socket
- send data on socket
- receive data on socket
- act as TCP client
- resolve DNS
- create UDP socket
- initialize Winsock library
- set socket configuration
- connect TCP socket
...
"""
import os
import sys
import logging
import collections
import argparse
import colorama
import capa.main
import capa.rules
import capa.engine
import capa.render
import capa.features
import capa.render.utils as rutils
import capa.features.freeze
import capa.features.extractors.viv
logger = logging.getLogger("capa.show-capabilities-by-function")
def render_matches_by_function(doc):
    """
    render the matched rule names grouped by the function address at which they matched.

    functions are emitted in ascending address order and the rule names within
    each function are sorted, so the output is stable across runs.
    functions without any matching rule are omitted.

    like:

        function at 0x1000321a with 33 features:
          - get hostname
          - initialize Winsock library
        function at 0x10003286 with 63 features:
          - create thread
          - terminate thread
        function at 0x10003415 with 116 features:
          - write file
          - send data
          - link function at runtime
          - create HTTP request
          - get common file path
          - send HTTP request
          - connect to HTTP server

    args:
      doc: result document; reads the capability rules and
        `doc["meta"]["analysis"]["feature_counts"]["functions"]`.

    returns:
      str: the rendered text.
    """
    ostream = rutils.StringIO()

    # normalize addresses to int on both sides of the lookup,
    # so that string keys cannot silently fail to match.
    # NOTE(review): assumes match locations are int-convertible addresses - confirm.
    matches_by_function = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        for va in rule["matches"].keys():
            matches_by_function[int(va)].add(rule["meta"]["name"])

    function_counts = doc["meta"]["analysis"]["feature_counts"]["functions"]
    for va, feature_count in sorted(function_counts.items(), key=lambda item: int(item[0])):
        va = int(va)
        # use .get() to avoid inserting empty entries into the defaultdict
        rule_names = matches_by_function.get(va)
        if not rule_names:
            continue

        ostream.writeln("function at 0x%X with %d features: " % (va, feature_count))
        # set iteration order is arbitrary; sort for reproducible output
        for rule_name in sorted(rule_names):
            ostream.writeln(" - " + rule_name)
        ostream.write("\n")

    return ostream.getvalue()
def main(argv=None):
    """
    entry point: analyze the given sample with capa and print the matched capabilities grouped by function.

    args:
      argv (List[str]): command line arguments without the program name, defaults to `sys.argv[1:]`.

    returns:
      int: 0 on success; -1 when the rules cannot be loaded, the sample format or
        runtime is unsupported, or capa encountered a file limitation.
    """
    if argv is None:
        argv = sys.argv[1:]

    formats = [
        ("auto", "(default) detect file type automatically"),
        ("pe", "Windows PE file"),
        ("sc32", "32-bit shellcode"),
        ("sc64", "64-bit shellcode"),
        ("freeze", "features previously frozen by capa"),
    ]
    format_help = ", ".join(["%s: %s" % (f[0], f[1]) for f in formats])

    parser = argparse.ArgumentParser(description="detect capabilities in programs.")
    parser.add_argument("sample", type=str, help="Path to sample to analyze")
    parser.add_argument(
        "-r",
        "--rules",
        type=str,
        default="(embedded rules)",
        help="Path to rule file or directory, use embedded rules by default",
    )
    parser.add_argument("-t", "--tag", type=str, help="Filter on rule meta field values")
    parser.add_argument("-d", "--debug", action="store_true", help="Enable debugging output on STDERR")
    parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
    parser.add_argument(
        "-f",
        "--format",
        choices=[f[0] for f in formats],
        default="auto",
        help="Select sample format, %s" % format_help,
    )
    args = parser.parse_args(args=argv)

    # --quiet takes precedence over --debug
    if args.quiet:
        log_level = logging.ERROR
    elif args.debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(level=log_level)
    logging.getLogger().setLevel(log_level)

    # disable vivisect-related logging, it's verbose and not relevant for capa users
    capa.main.set_vivisect_log_level(logging.CRITICAL)

    # py2 doesn't know about cp65001, which is a variant of utf-8 on windows
    # tqdm bails when trying to render the progress bar in this setup.
    # because cp65001 is utf-8, we just map that codepage to the utf-8 codec.
    # see #380 and: https://stackoverflow.com/a/3259271/87207
    import codecs

    codecs.register(lambda name: codecs.lookup("utf-8") if name == "cp65001" else None)

    if args.rules == "(embedded rules)":
        logger.info("-" * 80)
        logger.info(" Using default embedded rules.")
        logger.info(" To provide your own rules, use the form `capa.exe ./path/to/rules/ /path/to/mal.exe`.")
        logger.info(" You can see the current default rule set here:")
        logger.info(" https://github.com/fireeye/capa-rules")
        logger.info("-" * 80)

        logger.debug("detected running from source")
        args.rules = os.path.join(os.path.dirname(__file__), "..", "rules")
        logger.debug("default rule path (source method): %s", args.rules)
    else:
        logger.info("using rules path: %s", args.rules)

    try:
        rules = capa.main.get_rules(args.rules)
        rules = capa.rules.RuleSet(rules)
        logger.info("successfully loaded %s rules", len(rules))
        if args.tag:
            rules = rules.filter_rules_by_meta(args.tag)
            logger.info("selected %s rules", len(rules))
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
        return -1

    # peek at the first bytes to recognize previously frozen features
    with open(args.sample, "rb") as f:
        taste = f.read(8)

    if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
        sample_format = "freeze"
        with open(args.sample, "rb") as f:
            extractor = capa.features.freeze.load(f.read())
    else:
        sample_format = args.format
        try:
            extractor = capa.main.get_extractor(args.sample, args.format)
        except capa.main.UnsupportedFormatError:
            logger.error("-" * 80)
            logger.error(" Input file does not appear to be a PE file.")
            logger.error(" ")
            logger.error(
                " capa currently only supports analyzing PE files (or shellcode, when using --format sc32|sc64)."
            )
            logger.error(
                " If you don't know the input file type, you can try using the `file` utility to guess it."
            )
            logger.error("-" * 80)
            return -1
        except capa.main.UnsupportedRuntimeError:
            logger.error("-" * 80)
            logger.error(" Unsupported runtime or Python interpreter.")
            logger.error(" ")
            logger.error(" capa supports running under Python 2.7 using Vivisect for binary analysis.")
            logger.error(" It can also run within IDA Pro, using either Python 2.7 or 3.5+.")
            logger.error(" ")
            logger.error(
                " If you're seeing this message on the command line, please ensure you're running Python 2.7."
            )
            logger.error("-" * 80)
            return -1

    meta = capa.main.collect_metadata(argv, args.sample, sample_format, extractor)
    capabilities, counts = capa.main.find_capabilities(rules, extractor)
    meta["analysis"].update(counts)

    if capa.main.has_file_limitation(rules, capabilities):
        # bail if capa encountered file limitation e.g. a packed binary.
        # this script defines no verbose or JSON options, so unlike the main
        # capa entry point there is no mode in which to show the output anyway;
        # referencing those (undefined) options here would raise AttributeError.
        return -1

    # colorama will detect:
    # - when on Windows console, and fixup coloring, and
    # - when not an interactive session, and disable coloring
    # renderers should use coloring and assume it will be stripped out if necessary.
    colorama.init()
    doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)
    print(render_matches_by_function(doc))
    colorama.deinit()

    logger.info("done.")

    return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,6 +1,60 @@
#!/usr/bin/env python2
"""
show the features extracted by capa.
show-features
Show the features that capa extracts from the given sample,
to assist with the development of rules.
If you have a function with a capability that you'd like to detect,
you can run this tool and grep for the function/basic block/instruction addresses
to see what capa picks up.
This way, you can verify that capa successfully notices the features you'd reference.
Example::
$ python scripts/show-features.py /tmp/suspicious.dll_
...
file: 0x10004e4d: export(__entry)
file: 0x10004706: export(Install)
file: 0x10004c2b: export(uninstallA)
file: 0x10005034: import(kernel32.GetStartupInfoA)
file: 0x10005034: import(GetStartupInfoA)
file: 0x10005048: import(kernel32.SetLastError)
file: 0x00004e10: string(Y29ubmVjdA==)
file: 0x00004e28: string(practicalmalwareanalysis.com)
file: 0x00004e68: string(serve.html)
file: 0x00004eb8: string(dW5zdXBwb3J0)
file: 0x00004ec8: string(c2xlZXA=)
func: 0x100012c2: characteristic(calls to)
func: 0x10001000: characteristic(loop)
bb : 0x10001000: basic block
insn: 0x10001000: mnemonic(push)
insn: 0x10001001: mnemonic(push)
insn: 0x10001002: mnemonic(push)
insn: 0x10001003: mnemonic(push)
insn: 0x10001004: mnemonic(push)
insn: 0x10001005: mnemonic(push)
insn: 0x10001006: mnemonic(xor)
insn: 0x10001008: number(0x1)
insn: 0x10001008: mnemonic(mov)
bb : 0x1000100a: basic block
bb : 0x1000100a: characteristic(tight loop)
insn: 0x1000100a: mnemonic(movzx)
insn: 0x1000100d: mnemonic(mov)
insn: 0x1000100f: offset(0x1000A7C8)
insn: 0x1000100f: mnemonic(mov)
insn: 0x10001015: offset(0x100075C8)
insn: 0x10001015: mnemonic(mov)
insn: 0x1000101b: mnemonic(mov)
insn: 0x1000101d: number(0x80)
insn: 0x1000101d: mnemonic(and)
insn: 0x10001020: mnemonic(neg)
insn: 0x10001022: mnemonic(sbb)
insn: 0x10001024: number(0x1B)
insn: 0x10001024: mnemonic(and)
insn: 0x10001027: number(0x1)
insn: 0x10001027: mnemonic(shl)
...
"""
import sys
import logging

View File

@@ -93,7 +93,8 @@ def get_capabilities(path, rules):
logger.debug("matching rules in %s", path)
with open(path, "rb") as f:
extractor = capa.features.freeze.load(f.read())
return capa.main.find_capabilities(rules, extractor, disable_progress=True)
capabilities, meta = capa.main.find_capabilities(rules, extractor, disable_progress=True)
return capabilities
def get_function_hits(capabilities, rule_name):

View File

@@ -41,7 +41,15 @@ setuptools.setup(
include_package_data=True,
install_requires=requirements,
extras_require={
"dev": ["pytest", "pytest-sugar", "pytest-instafail", "pytest-cov", "pycodestyle", "black", "isort"]
"dev": [
"pytest",
"pytest-sugar",
"pytest-instafail",
"pytest-cov",
"pycodestyle",
"black ; python_version>'3.0'",
"isort",
]
},
zip_safe=False,
keywords="capa",

View File

@@ -10,6 +10,7 @@ from fixtures import *
EXTRACTOR = capa.features.extractors.NullFeatureExtractor(
{
"base address": 0x401000,
"file features": [(0x402345, capa.features.Characteristic("embedded pe")),],
"functions": {
0x401000: {
@@ -58,7 +59,7 @@ def test_null_feature_extractor():
),
]
)
capabilities = capa.main.find_capabilities(rules, EXTRACTOR)
capabilities, meta = capa.main.find_capabilities(rules, EXTRACTOR)
assert "xor loop" in capabilities

View File

@@ -147,7 +147,7 @@ def test_match_across_scopes_file_function(sample_9324d1a8ae37a36ae560c37448c970
extractor = capa.features.extractors.viv.VivisectFeatureExtractor(
sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path,
)
capabilities = capa.main.find_capabilities(rules, extractor)
capabilities, meta = capa.main.find_capabilities(rules, extractor)
assert "install service" in capabilities
assert ".text section" in capabilities
assert ".text section and install service" in capabilities
@@ -212,7 +212,7 @@ def test_match_across_scopes(sample_9324d1a8ae37a36ae560c37448c9705a):
extractor = capa.features.extractors.viv.VivisectFeatureExtractor(
sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path
)
capabilities = capa.main.find_capabilities(rules, extractor)
capabilities, meta = capa.main.find_capabilities(rules, extractor)
assert "tight loop" in capabilities
assert "kill thread loop" in capabilities
assert "kill thread program" in capabilities
@@ -241,7 +241,7 @@ def test_subscope_bb_rules(sample_9324d1a8ae37a36ae560c37448c9705a):
extractor = capa.features.extractors.viv.VivisectFeatureExtractor(
sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path,
)
capabilities = capa.main.find_capabilities(rules, extractor)
capabilities, meta = capa.main.find_capabilities(rules, extractor)
assert "test rule" in capabilities
@@ -267,7 +267,7 @@ def test_byte_matching(sample_9324d1a8ae37a36ae560c37448c9705a):
extractor = capa.features.extractors.viv.VivisectFeatureExtractor(
sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path,
)
capabilities = capa.main.find_capabilities(rules, extractor)
capabilities, meta = capa.main.find_capabilities(rules, extractor)
assert "byte match test" in capabilities
@@ -294,5 +294,5 @@ def test_count_bb(sample_9324d1a8ae37a36ae560c37448c9705a):
extractor = capa.features.extractors.viv.VivisectFeatureExtractor(
sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path,
)
capabilities = capa.main.find_capabilities(rules, extractor)
capabilities, meta = capa.main.find_capabilities(rules, extractor)
assert "count bb" in capabilities