mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
1
.gitignore
vendored
1
.gitignore
vendored
@@ -114,3 +114,4 @@ venv.bak/
|
||||
isort-output.log
|
||||
black-output.log
|
||||
rule-linter-output.log
|
||||
.vscode
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
|
||||
### New Features
|
||||
|
||||
- show the function in which each basic block match is found #130 @williballenthin
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
### New Rules (8)
|
||||
|
||||
@@ -134,6 +134,12 @@ def collect_metadata():
|
||||
"format": idaapi.get_file_type_name(),
|
||||
"extractor": "ida",
|
||||
"base_address": idaapi.get_imagebase(),
|
||||
"layout": {
|
||||
# this is updated after capabilities have been collected.
|
||||
# will look like:
|
||||
#
|
||||
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
||||
},
|
||||
},
|
||||
"version": capa.version.__version__,
|
||||
}
|
||||
|
||||
@@ -751,6 +751,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
meta = capa.ida.helpers.collect_metadata()
|
||||
capabilities, counts = capa.main.find_capabilities(self.ruleset_cache, extractor, disable_progress=True)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(self.ruleset_cache, extractor, capabilities)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
46
capa/main.py
46
capa/main.py
@@ -582,10 +582,55 @@ def collect_metadata(argv, sample_path, rules_path, extractor):
|
||||
"extractor": extractor.__class__.__name__,
|
||||
"rules": rules_path,
|
||||
"base_address": extractor.get_base_address(),
|
||||
"layout": {
|
||||
# this is updated after capabilities have been collected.
|
||||
# will look like:
|
||||
#
|
||||
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def compute_layout(rules, extractor, capabilities):
    """
    compute a metadata structure that links basic blocks
    to the functions in which they're found.

    only collect the basic blocks at which some rule matched.
    otherwise, we may pollute the json document with
    a large amount of un-referenced data.

    args:
      rules: object indexable by rule name; each rule exposes a `meta` mapping
        whose "scope" entry is compared against `capa.rules.BASIC_BLOCK_SCOPE`.
      extractor: object providing `get_functions()` and `get_basic_blocks(f)`;
        the yielded functions and basic blocks must be convertible with `int()`.
      capabilities: dict mapping rule name to its matches. for basic block scoped
        rules, the matches are iterated as (address, match) pairs.

    returns:
      dict shaped like:

        {"functions": {0x401000: {"matched_basic_blocks": [0x401000, 0x401005, ...]}, ...}}

      every function reported by the extractor gets an entry,
      even when none of its basic blocks matched a rule.
    """
    # all basic block addresses known to the extractor.
    # only used to sanity check the addresses of basic block matches.
    known_bbs = set()
    bbs_by_function = {}
    for f in extractor.get_functions():
        function_va = int(f)
        bb_vas = [int(bb) for bb in extractor.get_basic_blocks(f)]
        bbs_by_function[function_va] = bb_vas
        known_bbs.update(bb_vas)

    matched_bbs = set()
    for rule_name, matches in capabilities.items():
        if rules[rule_name].meta.get("scope") != capa.rules.BASIC_BLOCK_SCOPE:
            # function and file scoped matches are not located at basic blocks.
            continue

        for addr, _ in matches:
            # internal invariant: a basic block match must be at a basic block
            # that the extractor enumerated above.
            assert addr in known_bbs
            matched_bbs.add(addr)

    return {
        "functions": {
            function_va: {
                "matched_basic_blocks": [bb for bb in bb_vas if bb in matched_bbs]
                # this object is open to extension in the future,
                # such as with the function name, etc.
            }
            for function_va, bb_vas in bbs_by_function.items()
        }
    }
|
||||
|
||||
|
||||
def install_common_args(parser, wanted=None):
|
||||
"""
|
||||
register a common set of command line arguments for re-use by main & scripts.
|
||||
@@ -948,6 +993,7 @@ def main(argv=None):
|
||||
|
||||
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
|
||||
|
||||
if has_file_limitation(rules, capabilities):
|
||||
# bail if capa encountered file limitation e.g. a packed binary
|
||||
|
||||
@@ -203,6 +203,11 @@ def render_rules(ostream, doc):
|
||||
api: kernel32.GetLastError @ 0x10004A87
|
||||
api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
|
||||
"""
|
||||
functions_by_bb = {}
|
||||
for function, info in doc["meta"]["analysis"]["layout"]["functions"].items():
|
||||
for bb in info["matched_basic_blocks"]:
|
||||
functions_by_bb[bb] = function
|
||||
|
||||
had_match = False
|
||||
for rule in rutils.capability_rules(doc):
|
||||
count = len(rule["matches"])
|
||||
@@ -247,7 +252,12 @@ def render_rules(ostream, doc):
|
||||
for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"].items()):
|
||||
ostream.write(rule["meta"]["scope"])
|
||||
ostream.write(" @ ")
|
||||
ostream.writeln(rutils.hex(location))
|
||||
ostream.write(rutils.hex(location))
|
||||
|
||||
if rule["meta"]["scope"] == capa.rules.BASIC_BLOCK_SCOPE:
|
||||
ostream.write(" in function " + rutils.hex(functions_by_bb[location]))
|
||||
|
||||
ostream.write("\n")
|
||||
render_match(ostream, match, indent=1)
|
||||
ostream.write("\n")
|
||||
|
||||
|
||||
@@ -129,6 +129,7 @@ def get_capa_results(args):
|
||||
meta = capa.main.collect_metadata("", path, "", extractor)
|
||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
return {
|
||||
"path": path,
|
||||
|
||||
@@ -163,14 +163,15 @@ def render_dictionary(doc):
|
||||
|
||||
# ==== render dictionary helpers
|
||||
def capa_details(file_path, output_format="dictionary"):
|
||||
# collect metadata (used only to make rendering more complete)
|
||||
meta = capa.main.collect_metadata("", file_path, RULES_PATH, extractor)
|
||||
|
||||
# extract features and find capabilities
|
||||
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True)
|
||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||
|
||||
# collect metadata (used only to make rendering more complete)
|
||||
meta = capa.main.collect_metadata("", file_path, RULES_PATH, extractor)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
capa_output = False
|
||||
if output_format == "dictionary":
|
||||
|
||||
@@ -87,22 +87,34 @@ def render_matches_by_function(doc):
|
||||
- send HTTP request
|
||||
- connect to HTTP server
|
||||
"""
|
||||
functions_by_bb = {}
|
||||
for function, info in doc["meta"]["analysis"]["layout"]["functions"].items():
|
||||
for bb in info["matched_basic_blocks"]:
|
||||
functions_by_bb[bb] = function
|
||||
|
||||
ostream = rutils.StringIO()
|
||||
|
||||
matches_by_function = collections.defaultdict(set)
|
||||
for rule in rutils.capability_rules(doc):
|
||||
for va in rule["matches"].keys():
|
||||
matches_by_function[va].add(rule["meta"]["name"])
|
||||
if rule["meta"]["scope"] == capa.rules.FUNCTION_SCOPE:
|
||||
for va in rule["matches"].keys():
|
||||
matches_by_function[va].add(rule["meta"]["name"])
|
||||
elif rule["meta"]["scope"] == capa.rules.BASIC_BLOCK_SCOPE:
|
||||
for va in rule["matches"].keys():
|
||||
function = functions_by_bb[va]
|
||||
matches_by_function[function].add(rule["meta"]["name"])
|
||||
else:
|
||||
# file scope
|
||||
pass
|
||||
|
||||
for va, feature_count in sorted(doc["meta"]["analysis"]["feature_counts"]["functions"].items()):
|
||||
va = int(va)
|
||||
if not matches_by_function.get(va, {}):
|
||||
continue
|
||||
ostream.writeln("function at 0x%X with %d features: " % (va, feature_count))
|
||||
for rule_name in matches_by_function[va]:
|
||||
for rule_name in sorted(matches_by_function[va]):
|
||||
ostream.writeln(" - " + rule_name)
|
||||
|
||||
ostream.write("\n")
|
||||
return ostream.getvalue()
|
||||
|
||||
|
||||
@@ -174,6 +186,7 @@ def main(argv=None):
|
||||
meta = capa.main.collect_metadata(argv, args.sample, args.rules, extractor)
|
||||
capabilities, counts = capa.main.find_capabilities(rules, extractor)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
if capa.main.has_file_limitation(rules, capabilities):
|
||||
# bail if capa encountered file limitation e.g. a packed binary
|
||||
@@ -190,8 +203,6 @@ def main(argv=None):
|
||||
print(render_matches_by_function(doc))
|
||||
colorama.deinit()
|
||||
|
||||
logger.info("done.")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -182,6 +182,8 @@ def get_data_path_by_name(name):
|
||||
return os.path.join(CD, "data", "kernel32.dll_")
|
||||
elif name == "kernel32-64":
|
||||
return os.path.join(CD, "data", "kernel32-64.dll_")
|
||||
elif name == "pma01-01":
|
||||
return os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll_")
|
||||
elif name == "pma12-04":
|
||||
return os.path.join(CD, "data", "Practical Malware Analysis Lab 12-04.exe_")
|
||||
elif name == "pma16-01":
|
||||
@@ -234,6 +236,8 @@ def get_sample_md5_by_name(name):
|
||||
return "56bed8249e7c2982a90e54e1e55391a2"
|
||||
elif name == "pma16-01":
|
||||
return "7faafc7e4a5c736ebfee6abbbc812d80"
|
||||
elif name == "pma01-01":
|
||||
return "290934c61de9176ad682ffdd65f0a669"
|
||||
elif name == "pma21-01":
|
||||
return "c8403fb05244e23a7931c766409b5e22"
|
||||
elif name == "al-khaser x86":
|
||||
|
||||
@@ -375,3 +375,13 @@ def test_backend_option(capsys):
|
||||
std_json = json.loads(std.out)
|
||||
assert std_json["meta"]["analysis"]["extractor"] == "SmdaFeatureExtractor"
|
||||
assert len(std_json["rules"]) > 0
|
||||
|
||||
|
||||
def test_json_meta(capsys):
    """the JSON output's analysis metadata links matched basic blocks to their function."""
    sample_path = fixtures.get_data_path_by_name("pma01-01")
    assert capa.main.main([sample_path, "-j"]) == 0

    captured = capsys.readouterr()
    functions = json.loads(captured.out)["meta"]["analysis"]["layout"]["functions"]

    # remember: json can't have integer keys :-(
    function_key = str(0x10001010)
    assert function_key in functions
    assert 0x10001179 in functions[function_key]["matched_basic_blocks"]
|
||||
|
||||
Reference in New Issue
Block a user