mirror of
https://github.com/mandiant/capa.git
synced 2026-01-07 19:01:04 -08:00
@@ -1,8 +1,10 @@
|
||||
import os
|
||||
import logging
|
||||
import datetime
|
||||
import collections
|
||||
|
||||
import idaapi
|
||||
import idautils
|
||||
from PyQt5 import QtGui, QtCore, QtWidgets
|
||||
|
||||
import capa.main
|
||||
@@ -362,12 +364,22 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
logger.info("analysis completed.")
|
||||
|
||||
doc = capa.render.convert_capabilities_to_result_document(rules, capabilities)
|
||||
meta = {
|
||||
"timestamp": datetime.datetime.now().isoformat(),
|
||||
# "argv" is not relevant here
|
||||
"sample": {
|
||||
"md5": idautils.GetInputFileMD5(),
|
||||
# "sha1" not easily accessible
|
||||
# "sha256" not easily accessible
|
||||
"path": idaapi.get_input_file_path(),
|
||||
},
|
||||
"analysis": {
|
||||
# "format" is difficult to determine via IDAPython
|
||||
"extractor": "ida",
|
||||
}
|
||||
}
|
||||
|
||||
import json
|
||||
|
||||
with open("C:\\Users\\spring\\Desktop\\hmm.json", "w") as twitter_data_file:
|
||||
json.dump(doc, twitter_data_file, indent=4, sort_keys=True, cls=capa.render.CapaJsonObjectEncoder)
|
||||
doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
|
||||
self.model_data.render_capa_doc(doc)
|
||||
self.render_capa_doc_summary(doc)
|
||||
|
||||
42
capa/main.py
42
capa/main.py
@@ -4,8 +4,10 @@ capa - detect capabilities in programs.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import hashlib
|
||||
import logging
|
||||
import os.path
|
||||
import datetime
|
||||
import collections
|
||||
|
||||
import tqdm
|
||||
@@ -320,6 +322,34 @@ def get_rules(rule_path):
|
||||
return rules
|
||||
|
||||
|
||||
def collect_metadata(argv, path, format, extractor):
|
||||
md5 = hashlib.md5()
|
||||
sha1 = hashlib.sha1()
|
||||
sha256 = hashlib.sha256()
|
||||
|
||||
with open(path, 'rb') as f:
|
||||
buf = f.read()
|
||||
|
||||
md5.update(buf)
|
||||
sha1.update(buf)
|
||||
sha256.update(buf)
|
||||
|
||||
return {
|
||||
"timestamp": datetime.datetime.now().isoformat(),
|
||||
"argv": argv,
|
||||
"sample": {
|
||||
"md5": md5.hexdigest(),
|
||||
"sha1": sha1.hexdigest(),
|
||||
"sha256": sha256.hexdigest(),
|
||||
"path": os.path.normpath(path),
|
||||
},
|
||||
"analysis": {
|
||||
"format": format,
|
||||
"extractor": extractor.__class__.__name__,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
@@ -420,9 +450,11 @@ def main(argv=None):
|
||||
taste = f.read(8)
|
||||
|
||||
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
|
||||
format = "freeze"
|
||||
with open(args.sample, "rb") as f:
|
||||
extractor = capa.features.freeze.load(f.read())
|
||||
else:
|
||||
format = args.format
|
||||
try:
|
||||
extractor = get_extractor(args.sample, args.format)
|
||||
except UnsupportedFormatError:
|
||||
@@ -446,6 +478,8 @@ def main(argv=None):
|
||||
logger.error("-" * 80)
|
||||
return -1
|
||||
|
||||
meta = collect_metadata(argv, args.sample, format, extractor)
|
||||
|
||||
capabilities = find_capabilities(rules, extractor)
|
||||
|
||||
if has_file_limitation(rules, capabilities):
|
||||
@@ -460,13 +494,13 @@ def main(argv=None):
|
||||
# renderers should use coloring and assume it will be stripped out if necessary.
|
||||
colorama.init()
|
||||
if args.json:
|
||||
print(capa.render.render_json(rules, capabilities))
|
||||
print(capa.render.render_json(meta, rules, capabilities))
|
||||
elif args.vverbose:
|
||||
print(capa.render.render_vverbose(rules, capabilities))
|
||||
print(capa.render.render_vverbose(meta, rules, capabilities))
|
||||
elif args.verbose:
|
||||
print(capa.render.render_verbose(rules, capabilities))
|
||||
print(capa.render.render_verbose(meta, rules, capabilities))
|
||||
else:
|
||||
print(capa.render.render_default(rules, capabilities))
|
||||
print(capa.render.render_default(meta, rules, capabilities))
|
||||
colorama.deinit()
|
||||
|
||||
logger.info("done.")
|
||||
|
||||
@@ -175,7 +175,7 @@ def convert_match_to_result_document(rules, capabilities, result):
|
||||
return doc
|
||||
|
||||
|
||||
def convert_capabilities_to_result_document(rules, capabilities):
|
||||
def convert_capabilities_to_result_document(meta, rules, capabilities):
|
||||
"""
|
||||
convert the given rule set and capabilities result to a common, Python-native data structure.
|
||||
this format can be directly emitted to JSON, or passed to the other `render_*` routines
|
||||
@@ -187,22 +187,29 @@ def convert_capabilities_to_result_document(rules, capabilities):
|
||||
|
||||
```json
|
||||
{
|
||||
$rule-name: {
|
||||
"meta": {...copied from rule.meta...},
|
||||
"matches: {
|
||||
$address: {...match details...},
|
||||
...
|
||||
}
|
||||
},
|
||||
...
|
||||
"meta": {...},
|
||||
"rules: {
|
||||
$rule-name: {
|
||||
"meta": {...copied from rule.meta...},
|
||||
"matches: {
|
||||
$address: {...match details...},
|
||||
...
|
||||
}
|
||||
},
|
||||
...
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Args:
|
||||
meta (Dict[str, Any]):
|
||||
rules (RuleSet):
|
||||
capabilities (Dict[str, List[Tuple[int, Result]]]):
|
||||
"""
|
||||
doc = {}
|
||||
doc = {
|
||||
"meta": meta,
|
||||
"rules": {},
|
||||
}
|
||||
|
||||
for rule_name, matches in capabilities.items():
|
||||
rule = rules[rule_name]
|
||||
@@ -210,7 +217,7 @@ def convert_capabilities_to_result_document(rules, capabilities):
|
||||
if rule.meta.get("capa/subscope-rule"):
|
||||
continue
|
||||
|
||||
doc[rule_name] = {
|
||||
doc["rules"][rule_name] = {
|
||||
"meta": dict(rule.meta),
|
||||
"source": rule.definition,
|
||||
"matches": {
|
||||
@@ -221,35 +228,36 @@ def convert_capabilities_to_result_document(rules, capabilities):
|
||||
return doc
|
||||
|
||||
|
||||
def render_vverbose(rules, capabilities):
|
||||
def render_vverbose(meta, rules, capabilities):
|
||||
# there's an import loop here
|
||||
# if capa.render imports capa.render.vverbose
|
||||
# and capa.render.vverbose import capa.render (implicitly, as a submodule)
|
||||
# so, defer the import until routine is called, breaking the import loop.
|
||||
import capa.render.vverbose
|
||||
|
||||
doc = convert_capabilities_to_result_document(rules, capabilities)
|
||||
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
return capa.render.vverbose.render_vverbose(doc)
|
||||
|
||||
|
||||
def render_verbose(rules, capabilities):
|
||||
def render_verbose(meta, rules, capabilities):
|
||||
# break import loop
|
||||
import capa.render.verbose
|
||||
|
||||
doc = convert_capabilities_to_result_document(rules, capabilities)
|
||||
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
return capa.render.verbose.render_verbose(doc)
|
||||
|
||||
|
||||
def render_default(rules, capabilities):
|
||||
def render_default(meta, rules, capabilities):
|
||||
# break import loop
|
||||
import capa.render.verbose
|
||||
import capa.render.default
|
||||
|
||||
doc = convert_capabilities_to_result_document(rules, capabilities)
|
||||
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
return capa.render.default.render_default(doc)
|
||||
|
||||
|
||||
class CapaJsonObjectEncoder(json.JSONEncoder):
|
||||
"""JSON encoder that emits Python sets as sorted lists"""
|
||||
def default(self, obj):
|
||||
if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, six.string_types):
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
@@ -260,7 +268,7 @@ class CapaJsonObjectEncoder(json.JSONEncoder):
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
|
||||
|
||||
def render_json(rules, capabilities):
|
||||
def render_json(meta, rules, capabilities):
|
||||
return json.dumps(
|
||||
convert_capabilities_to_result_document(rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True,
|
||||
convert_capabilities_to_result_document(meta, rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True,
|
||||
)
|
||||
|
||||
@@ -20,7 +20,7 @@ def hex(n):
|
||||
def capability_rules(doc):
|
||||
"""enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc)."""
|
||||
for (_, _, rule) in sorted(
|
||||
map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc.values())
|
||||
map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc["rules"].values())
|
||||
):
|
||||
if rule["meta"].get("lib"):
|
||||
continue
|
||||
|
||||
@@ -43,7 +43,7 @@ def render_verbose(doc):
|
||||
rows.append((key, v))
|
||||
|
||||
if rule["meta"]["scope"] != capa.rules.FILE_SCOPE:
|
||||
locations = doc[rule["meta"]["name"]]["matches"].keys()
|
||||
locations = doc["rules"][rule["meta"]["name"]]["matches"].keys()
|
||||
rows.append(("matches", "\n".join(map(rutils.hex, locations))))
|
||||
|
||||
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
|
||||
|
||||
@@ -165,7 +165,7 @@ def render_vverbose(doc):
|
||||
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
|
||||
|
||||
if rule["meta"]["scope"] == capa.rules.FILE_SCOPE:
|
||||
matches = list(doc[rule["meta"]["name"]]["matches"].values())
|
||||
matches = list(doc["rules"][rule["meta"]["name"]]["matches"].values())
|
||||
if len(matches) != 1:
|
||||
# i think there should only ever be one match per file-scope rule,
|
||||
# because we do the file-scope evaluation a single time.
|
||||
@@ -174,7 +174,7 @@ def render_vverbose(doc):
|
||||
raise RuntimeError("unexpected file scope match count: " + len(matches))
|
||||
render_match(ostream, matches[0], indent=0)
|
||||
else:
|
||||
for location, match in sorted(doc[rule["meta"]["name"]]["matches"].items()):
|
||||
for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"].items()):
|
||||
ostream.write(rule["meta"]["scope"])
|
||||
ostream.write(" @ ")
|
||||
ostream.writeln(rutils.hex(location))
|
||||
|
||||
Reference in New Issue
Block a user