Merge pull request #81 from fireeye/render-metadata

render: report header and metadata
This commit is contained in:
Willi Ballenthin
2020-07-02 16:33:32 -06:00
committed by GitHub
12 changed files with 163 additions and 42 deletions


@@ -39,6 +39,15 @@ class FeatureExtractor(object):
#
super(FeatureExtractor, self).__init__()
@abc.abstractmethod
def get_base_address(self):
"""
fetch the preferred load address at which the sample was analyzed.
returns: int
"""
raise NotImplementedError()
@abc.abstractmethod
def extract_file_features(self):
"""

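For context, here is a minimal, self-contained sketch (not part of this diff) of the abstract-method pattern the base class uses for get_base_address(); the class names and the fixed address below are illustrative only.

```python
import abc


class ExampleExtractor(abc.ABC):
    @abc.abstractmethod
    def get_base_address(self):
        """fetch the preferred load address at which the sample was analyzed."""
        raise NotImplementedError()


class FixedBaseExtractor(ExampleExtractor):
    """hypothetical extractor that reports a hard-coded image base."""

    def get_base_address(self):
        return 0x10000000


print(hex(FixedBaseExtractor().get_base_address()))  # 0x10000000
```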

@@ -43,6 +43,9 @@ class IdaFeatureExtractor(FeatureExtractor):
def __init__(self):
super(IdaFeatureExtractor, self).__init__()
def get_base_address(self):
return idaapi.get_imagebase()
def extract_file_features(self):
for feature, va in capa.features.extractors.ida.file.extract_features():
yield feature, va


@@ -44,6 +44,10 @@ class VivisectFeatureExtractor(FeatureExtractor):
self.vw = vw
self.path = path
def get_base_address(self):
# assume there is only one file loaded into the vw
return list(self.vw.filemeta.values())[0]["imagebase"]
def extract_file_features(self):
for feature, va in capa.features.extractors.viv.file.extract_features(self.vw, self.path):
yield feature, va
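The lookup above assumes a single loaded file. As a sketch only (not part of this change), a stricter variant that fails loudly when that assumption breaks, using the same vw.filemeta fields:

```python
def get_base_address_strict(vw):
    """return the image base of the single file loaded into the given vivisect workspace."""
    filemetas = list(vw.filemeta.values())
    if len(filemetas) != 1:
        raise ValueError("expected exactly one loaded file, found %d" % len(filemetas))
    return filemetas[0]["imagebase"]
```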


@@ -1,7 +1,9 @@
import logging
import datetime
import idc
import idaapi
import idautils
logger = logging.getLogger("capa")
@@ -48,3 +50,17 @@ def get_func_start_ea(ea):
""" """
f = idaapi.get_func(ea)
return f if f is None else f.start_ea
def collect_metadata():
return {
"timestamp": datetime.datetime.now().isoformat(),
# "argv" is not relevant here
"sample": {
"md5": idautils.GetInputFileMD5(),
# "sha1" not easily accessible
"sha256": idaapi.retrieve_input_file_sha256(),
"path": idaapi.get_input_file_path(),
},
"analysis": {"format": idaapi.get_file_type_name(), "extractor": "ida",},
}
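For reference, the dictionary returned by collect_metadata() above has roughly the following shape; every value here is a placeholder, not real IDA output.

```python
# placeholder values for illustration only
example_ida_meta = {
    "timestamp": "2020-07-02T16:33:32",
    "sample": {
        "md5": "<md5 hex digest>",
        "sha256": "<sha256 hex digest>",
        "path": "C:\\samples\\example.bin",
    },
    "analysis": {"format": "<idaapi.get_file_type_name()>", "extractor": "ida"},
}
```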


@@ -363,12 +363,8 @@ class CapaExplorerForm(idaapi.PluginForm):
logger.info("analysis completed.")
doc = capa.render.convert_capabilities_to_result_document(rules, capabilities)
import json
with open("C:\\Users\\spring\\Desktop\\hmm.json", "w") as twitter_data_file:
json.dump(doc, twitter_data_file, indent=4, sort_keys=True, cls=capa.render.CapaJsonObjectEncoder)
meta = capa.ida.helpers.collect_metadata()
doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)
self.model_data.render_capa_doc(doc)
self.render_capa_doc_summary(doc)


@@ -4,8 +4,10 @@ capa - detect capabilities in programs.
"""
import os
import sys
import hashlib
import logging
import os.path
import datetime
import collections
import tqdm
@@ -320,6 +322,36 @@ def get_rules(rule_path):
return rules
def collect_metadata(argv, path, format, extractor):
md5 = hashlib.md5()
sha1 = hashlib.sha1()
sha256 = hashlib.sha256()
with open(path, "rb") as f:
buf = f.read()
md5.update(buf)
sha1.update(buf)
sha256.update(buf)
return {
"timestamp": datetime.datetime.now().isoformat(),
"version": capa.version.__version__,
"argv": argv,
"sample": {
"md5": md5.hexdigest(),
"sha1": sha1.hexdigest(),
"sha256": sha256.hexdigest(),
"path": os.path.normpath(path),
},
"analysis": {
"format": format,
"extractor": extractor.__class__.__name__,
"base_address": extractor.get_base_address(),
},
}
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
@@ -420,9 +452,11 @@ def main(argv=None):
taste = f.read(8)
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
format = "freeze"
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
format = args.format
try:
extractor = get_extractor(args.sample, args.format)
except UnsupportedFormatError:
@@ -446,6 +480,8 @@ def main(argv=None):
logger.error("-" * 80)
return -1
meta = collect_metadata(argv, args.sample, format, extractor)
capabilities = find_capabilities(rules, extractor)
if has_file_limitation(rules, capabilities):
@@ -460,13 +496,13 @@ def main(argv=None):
# renderers should use coloring and assume it will be stripped out if necessary.
colorama.init()
if args.json:
print(capa.render.render_json(rules, capabilities))
print(capa.render.render_json(meta, rules, capabilities))
elif args.vverbose:
print(capa.render.render_vverbose(rules, capabilities))
print(capa.render.render_vverbose(meta, rules, capabilities))
elif args.verbose:
print(capa.render.render_verbose(rules, capabilities))
print(capa.render.render_verbose(meta, rules, capabilities))
else:
print(capa.render.render_default(rules, capabilities))
print(capa.render.render_default(meta, rules, capabilities))
colorama.deinit()
logger.info("done.")
@@ -475,11 +511,12 @@ def main(argv=None):
def ida_main():
import capa.ida.helpers
import capa.features.extractors.ida
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
import capa.ida.helpers
if not capa.ida.helpers.is_supported_file_type():
return -1
@@ -500,18 +537,15 @@ def ida_main():
logger.debug("default rule path (source method): %s", rules_path)
rules = get_rules(rules_path)
import capa.rules
rules = capa.rules.RuleSet(rules)
import capa.features.extractors.ida
capabilities = find_capabilities(rules, capa.features.extractors.ida.IdaFeatureExtractor())
if has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
render_capabilities_default(rules, capabilities)
meta = capa.ida.helpers.collect_metadata()
print(capa.render.render_default(meta, rules, capabilities))
def is_runtime_ida():

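Putting the new pieces together, the standalone flow now looks roughly like this sketch; the sample path, rules directory, and "auto" format string are made up, and error handling is omitted.

```python
import sys

import capa.main
import capa.render
import capa.rules

# paths and the "auto" format string are illustrative only
rules = capa.rules.RuleSet(capa.main.get_rules("./rules"))
extractor = capa.main.get_extractor("./suspicious.exe", "auto")

meta = capa.main.collect_metadata(sys.argv[1:], "./suspicious.exe", "auto", extractor)
capabilities = capa.main.find_capabilities(rules, extractor)

print(capa.render.render_json(meta, rules, capabilities))
```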

@@ -175,7 +175,7 @@ def convert_match_to_result_document(rules, capabilities, result):
return doc
def convert_capabilities_to_result_document(rules, capabilities):
def convert_capabilities_to_result_document(meta, rules, capabilities):
"""
convert the given rule set and capabilities result to a common, Python-native data structure.
this format can be directly emitted to JSON, or passed to the other `render_*` routines
@@ -187,22 +187,29 @@ def convert_capabilities_to_result_document(rules, capabilities):
```json
{
$rule-name: {
"meta": {...copied from rule.meta...},
"matches: {
$address: {...match details...},
...
}
},
...
"meta": {...},
"rules: {
$rule-name: {
"meta": {...copied from rule.meta...},
"matches: {
$address: {...match details...},
...
}
},
...
}
}
```
Args:
meta (Dict[str, Any]):
rules (RuleSet):
capabilities (Dict[str, List[Tuple[int, Result]]]):
"""
doc = {}
doc = {
"meta": meta,
"rules": {},
}
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
@@ -210,7 +217,7 @@ def convert_capabilities_to_result_document(rules, capabilities):
if rule.meta.get("capa/subscope-rule"):
continue
doc[rule_name] = {
doc["rules"][rule_name] = {
"meta": dict(rule.meta),
"source": rule.definition,
"matches": {
@@ -221,35 +228,37 @@ def convert_capabilities_to_result_document(rules, capabilities):
return doc
def render_vverbose(rules, capabilities):
def render_vverbose(meta, rules, capabilities):
# there's an import loop here
# if capa.render imports capa.render.vverbose
# and capa.render.vverbose import capa.render (implicitly, as a submodule)
# so, defer the import until routine is called, breaking the import loop.
import capa.render.vverbose
doc = convert_capabilities_to_result_document(rules, capabilities)
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
return capa.render.vverbose.render_vverbose(doc)
def render_verbose(rules, capabilities):
def render_verbose(meta, rules, capabilities):
# break import loop
import capa.render.verbose
doc = convert_capabilities_to_result_document(rules, capabilities)
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
return capa.render.verbose.render_verbose(doc)
def render_default(rules, capabilities):
def render_default(meta, rules, capabilities):
# break import loop
import capa.render.verbose
import capa.render.default
doc = convert_capabilities_to_result_document(rules, capabilities)
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
return capa.render.default.render_default(doc)
class CapaJsonObjectEncoder(json.JSONEncoder):
"""JSON encoder that emits Python sets as sorted lists"""
def default(self, obj):
if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, six.string_types):
return json.JSONEncoder.default(self, obj)
@@ -260,7 +269,7 @@ class CapaJsonObjectEncoder(json.JSONEncoder):
return json.JSONEncoder.default(self, obj)
def render_json(rules, capabilities):
def render_json(meta, rules, capabilities):
return json.dumps(
convert_capabilities_to_result_document(rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True,
convert_capabilities_to_result_document(meta, rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True,
)
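A short sketch of consuming the reorganized result document: metadata now lives under "meta" and per-rule results under "rules". The helper name below is hypothetical.

```python
import json

import capa.render


def dump_result_document(meta, rules, capabilities):
    """serialize the reorganized document: metadata under 'meta', rule results under 'rules'."""
    doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)

    # header fields collected by collect_metadata()
    print(doc["meta"]["sample"]["md5"])

    # per-rule matches, keyed by rule name and then by virtual address
    for rule_name, rule_doc in doc["rules"].items():
        print(rule_name, sorted(rule_doc["matches"].keys()))

    # the same serialization that render_json() performs
    return json.dumps(doc, cls=capa.render.CapaJsonObjectEncoder, sort_keys=True)
```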


@@ -5,6 +5,8 @@ import tabulate
import capa.render.utils as rutils
tabulate.PRESERVE_WHITESPACE = True
def width(s, character_count):
"""pad the given string to at least `character_count`"""
@@ -14,6 +16,19 @@ def width(s, character_count):
return s
def render_meta(doc, ostream):
rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)]
for k in ("timestamp", "version"):
rows.append((width(k, 22), width(doc["meta"][k], 60)))
for k in ("path", "md5"):
rows.append((k, doc["meta"]["sample"][k]))
ostream.write(tabulate.tabulate(rows, tablefmt="psql"))
ostream.write("\n")
def render_capabilities(doc, ostream):
"""
example::
@@ -90,8 +105,10 @@ def render_attack(doc, ostream):
def render_default(doc):
ostream = six.StringIO()
ostream = rutils.StringIO()
render_meta(doc, ostream)
ostream.write("\n")
render_attack(doc, ostream)
ostream.write("\n")
render_capabilities(doc, ostream)

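A minimal sketch exercising the new render_meta() with a hand-built document; all field values are placeholders.

```python
import capa.render.default
import capa.render.utils as rutils

# every value below is a placeholder
doc = {
    "meta": {
        "timestamp": "2020-07-02T16:33:32",
        "version": "<capa version>",
        "sample": {"md5": "<md5 hex digest>", "path": "C:\\samples\\example.bin"},
    }
}

ostream = rutils.StringIO()
capa.render.default.render_meta(doc, ostream)
print(ostream.getvalue())
```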

@@ -20,7 +20,7 @@ def hex(n):
def capability_rules(doc):
"""enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc)."""
for (_, _, rule) in sorted(
map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc.values())
map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc["rules"].values())
):
if rule["meta"].get("lib"):
continue


@@ -23,6 +23,21 @@ import capa.render.utils as rutils
def render_verbose(doc):
ostream = rutils.StringIO()
rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)]
for k in ("timestamp", "version"):
rows.append((k, doc["meta"][k]))
for k in ("path", "md5", "sha1", "sha256"):
rows.append((k, doc["meta"]["sample"][k]))
for k in ("format", "extractor"):
rows.append((k.replace("_", " "), doc["meta"]["analysis"][k]))
rows.append(("base address", rutils.hex(doc["meta"]["analysis"]["base_address"])))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
ostream.write("\n")
for rule in rutils.capability_rules(doc):
count = len(rule["matches"])
if count == 1:
@@ -43,7 +58,7 @@ def render_verbose(doc):
rows.append((key, v))
if rule["meta"]["scope"] != capa.rules.FILE_SCOPE:
locations = doc[rule["meta"]["name"]]["matches"].keys()
locations = doc["rules"][rule["meta"]["name"]]["matches"].keys()
rows.append(("matches", "\n".join(map(rutils.hex, locations))))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))


@@ -141,6 +141,22 @@ def render_match(ostream, match, indent=0, mode=MODE_SUCCESS):
def render_vverbose(doc):
ostream = rutils.StringIO()
rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)]
for k in ("timestamp", "version"):
rows.append((k, doc["meta"][k]))
for k in ("path", "md5", "sha1", "sha256"):
rows.append((k, doc["meta"]["sample"][k]))
for k in ("format", "extractor"):
rows.append((k.replace("_", " "), doc["meta"]["analysis"][k]))
rows.append(("base address", rutils.hex(doc["meta"]["analysis"]["base_address"])))
ostream.writeln(rutils.bold("Capa Report for " + doc["meta"]["sample"]["md5"]))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
ostream.write("\n")
for rule in rutils.capability_rules(doc):
count = len(rule["matches"])
if count == 1:
@@ -165,7 +181,7 @@ def render_vverbose(doc):
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
if rule["meta"]["scope"] == capa.rules.FILE_SCOPE:
matches = list(doc[rule["meta"]["name"]]["matches"].values())
matches = list(doc["rules"][rule["meta"]["name"]]["matches"].values())
if len(matches) != 1:
# i think there should only ever be one match per file-scope rule,
# because we do the file-scope evaluation a single time.
@@ -174,7 +190,7 @@ def render_vverbose(doc):
raise RuntimeError("unexpected file scope match count: " + str(len(matches)))
render_match(ostream, matches[0], indent=0)
else:
for location, match in sorted(doc[rule["meta"]["name"]]["matches"].items()):
for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"].items()):
ostream.write(rule["meta"]["scope"])
ostream.write(" @ ")
ostream.writeln(rutils.hex(location))


@@ -40,7 +40,9 @@ setuptools.setup(
entry_points={"console_scripts": ["capa=capa.main:main",]},
include_package_data=True,
install_requires=requirements,
extras_require={"dev": ["pytest", "pytest-sugar", "pycodestyle", "black", "isort"]},
extras_require={
"dev": ["pytest", "pytest-sugar", "pytest-instafail", "pytest-cov", "pycodestyle", "black", "isort"]
},
zip_safe=False,
keywords="capa",
classifiers=[