mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
add an example of how to render results when using capa as a library
```
>>> from capa_as_library import capa_details
>>> details = capa_details("/opt/CAPEv2/storage/analyses/83/binary", "dictionary")
>>> from pprint import pprint as pp
>>> pp(details)
{'ATTCK': {'DEFENSE EVASION': ['Obfuscated Files or Information [T1027]',
'Virtualization/Sandbox Evasion::System Checks '
'[T1497.001]'],
'EXECUTION': ['Shared Modules [T1129]']},
'CAPABILITY': {'anti-analysis/anti-vm/vm-detection': ['execute anti-VM '
'instructions (3 '
'matches)'],
'anti-analysis/obfuscation/string/stackstring': ['contain '
'obfuscated '
'stackstrings'],
'data-manipulation/encryption/rc4': ['encrypt data using RC4 '
'PRGA'],
'executable/pe/section/rsrc': ['contain a resource (.rsrc) '
'section'],
'host-interaction/cli': ['accept command line arguments'],
'host-interaction/environment-variable': ['query environment '
'variable'],
'host-interaction/file-system/read': ['read .ini file',
'read file'],
'host-interaction/file-system/write': ['write file (3 '
'matches)'],
'host-interaction/process': ['get thread local storage value '
'(3 matches)',
'set thread local storage value '
'(2 matches)'],
'host-interaction/process/terminate': ['terminate process (3 '
'matches)'],
'host-interaction/thread/terminate': ['terminate thread'],
'linking/runtime-linking': ['link function at runtime (7 '
'matches)',
'link many functions at runtime'],
'load-code/pe': ['parse PE header (3 matches)']},
'MBC': {'ANTI-BEHAVIORAL ANALYSIS': ['Virtual Machine Detection::Instruction '
'Testing [B0009.029]'],
'ANTI-STATIC ANALYSIS': ['Disassembler Evasion::Argument Obfuscation '
'[B0012.001]'],
'CRYPTOGRAPHY': ['Encrypt Data::RC4 [C0027.009]',
'Generate Pseudo-random Sequence::RC4 PRGA '
'[C0021.004]']},
'md5': 'ad56c384476a81faef9aebd60b2f4623',
'path': '/opt/CAPEv2/storage/analyses/83/binary',
'sha1': 'aa027d89f5d3f991ad3e14ffb681616a77621836',
'sha256': '16995e059eb47de0b58a95ce2c3d863d964a7a16064d4298cee9db1de266e68d'}
>>>
```
This commit is contained in:
@@ -244,15 +244,6 @@ def render_default(meta, rules, capabilities):
|
||||
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
return capa.render.default.render_default(doc)
|
||||
|
||||
def render_dictionary(meta, rules, capabilities):
    """
    Convert the analysis results into a result document and render it with
    capa.render.dictionary.render_dictionary().
    """
    # these imports live inside the function to break an import loop
    import capa.render.dictionary
    import capa.render.verbose

    return capa.render.dictionary.render_dictionary(
        convert_capabilities_to_result_document(meta, rules, capabilities)
    )
|
||||
|
||||
|
||||
class CapaJsonObjectEncoder(json.JSONEncoder):
|
||||
"""JSON encoder that emits Python sets as sorted lists"""
|
||||
|
||||
|
||||
@@ -1,177 +0,0 @@
|
||||
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import collections
|
||||
|
||||
import capa.render.utils as rutils
|
||||
|
||||
def render_meta(doc, ostream):
    """
    Copy the sample's md5, sha1, sha256 and path from ``doc["meta"]["sample"]``
    into ``ostream`` under the same keys.
    """
    sample = doc["meta"]["sample"]
    for field in ("md5", "sha1", "sha256", "path"):
        ostream[field] = sample[field]
|
||||
|
||||
def find_subrule_matches(doc):
    """
    collect the rule names that have been matched as a subrule match.
    this way we can avoid displaying entries for things that are too specific.

    returns the set of names found in successful `match` feature nodes.
    """
    matched = set()

    for rule in rutils.capability_rules(doc):
        # walk each match tree with an explicit stack rather than recursion
        pending = list(rule["matches"].values())
        while pending:
            node = pending.pop()

            if not node["success"]:
                # there's probably a bug here for rules that do `not: match: ...`
                # but we don't have any examples of this yet
                continue

            kind = node["node"]["type"]
            if kind == "statement":
                pending.extend(node["children"])
            elif kind == "feature":
                feature = node["node"]["feature"]
                if feature["type"] == "match":
                    matched.add(feature["match"])

    return matched
|
||||
|
||||
|
||||
def render_capabilities(doc, ostream):
    """
    Group the matched capabilities by rule namespace into ``ostream["CAPABILITY"]``.

    Each namespace maps to a list of rule names; a rule whose match count is not
    exactly one gets a " (N matches)" suffix.

    example::

        {'CAPABILITY': {'anti-analysis/anti-emulation/wine': ['check if process is running under wine'],
                        'executable/pe/section/rsrc': ['contain a resource (.rsrc) section'],
                        'host-interaction/cli': ['accept command line arguments'],
                        'host-interaction/file-system/write': ['write file (3 matches)'],
                        'host-interaction/process': ['allocate thread local storage (2 matches)']}
        }
    """
    subrule_matches = find_subrule_matches(doc)

    by_namespace = dict()
    ostream["CAPABILITY"] = by_namespace
    for rule in rutils.capability_rules(doc):
        name = rule["meta"]["name"]
        if name in subrule_matches:
            # rules that are also matched by other rules should not get rendered by default.
            # this cuts down on the amount of output while giving approx the same detail.
            # see #224
            continue

        count = len(rule["matches"])
        if count == 1:
            capability = name
        else:
            capability = "%s (%d matches)" % (name, count)

        by_namespace.setdefault(rule["meta"]["namespace"], list()).append(capability)
|
||||
|
||||
def render_attack(doc, ostream):
    """
    Collect the ATT&CK mappings of the rules yielded by ``rutils.capability_rules(doc)``
    into ``ostream["ATTCK"]``.

    Each mapping string is split as ``Tactic::Technique ID`` or
    ``Tactic::Technique::Subtechnique ID``. Entries are grouped under the
    upper-cased tactic, de-duplicated, and sorted.

    example::

        {'ATTCK': {'COLLECTION': ['Input Capture::Keylogging [T1056.001]'],
                   'DEFENSE EVASION': ['Obfuscated Files or Information [T1027]',
                                       'Virtualization/Sandbox Evasion::System Checks '
                                       '[T1497.001]'],
                   'DISCOVERY': ['File and Directory Discovery [T1083]',
                                 'Query Registry [T1012]',
                                 'System Information Discovery [T1082]'],
                   'EXECUTION': ['Shared Modules [T1129]']}
        }

    Raises:
      RuntimeError: if a collected spec is neither a 2- nor a 3-tuple.
    """
    ostream["ATTCK"] = dict()

    # group under the upper-cased tactic from the start, so that tactics differing
    # only in letter case are merged instead of one silently shadowing the other.
    tactics = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        if not rule["meta"].get("att&ck"):
            continue

        for attack in rule["meta"]["att&ck"]:
            tactic, _, rest = attack.partition("::")
            group = tactics[tactic.upper()]
            if "::" in rest:
                technique, _, rest = rest.partition("::")
                subtechnique, _, attack_id = rest.rpartition(" ")
                group.add((technique, subtechnique, attack_id))
            else:
                technique, _, attack_id = rest.rpartition(" ")
                group.add((technique, attack_id))

    for tactic, techniques in sorted(tactics.items()):
        inner_rows = []
        # sort the raw tuples (not the rendered strings) to keep the established ordering
        for spec in sorted(techniques):
            if len(spec) == 2:
                inner_rows.append("%s %s" % spec)
            elif len(spec) == 3:
                inner_rows.append("%s::%s %s" % spec)
            else:
                raise RuntimeError("unexpected ATT&CK spec format")
        ostream["ATTCK"][tactic] = inner_rows
|
||||
|
||||
|
||||
def render_mbc(doc, ostream):
    """
    Collect the MBC mappings of the rules yielded by ``rutils.capability_rules(doc)``
    into ``ostream["MBC"]``.

    Each mapping string is split as ``Objective::Behavior ID`` or
    ``Objective::Behavior::Method ID``. Entries are grouped under the
    upper-cased objective, de-duplicated, and sorted.

    example::

        {'MBC': {'ANTI-BEHAVIORAL ANALYSIS': ['Debugger Detection::Timing/Delay Check '
                                              'GetTickCount [B0001.032]',
                                              'Emulator Detection [B0004]',
                                              'Virtual Machine Detection::Instruction '
                                              'Testing [B0009.029]',
                                              'Virtual Machine Detection [B0009]'],
                 'COLLECTION': ['Keylogging::Polling [F0002.002]'],
                 'CRYPTOGRAPHY': ['Encrypt Data::RC4 [C0027.009]',
                                  'Generate Pseudo-random Sequence::RC4 PRGA '
                                  '[C0021.004]']}
        }

    Raises:
      ValueError: if a rule's ``mbc`` meta entry is not a list.
      RuntimeError: if a collected spec is neither a 2- nor a 3-tuple.
    """
    ostream["MBC"] = dict()

    # group under the upper-cased objective from the start, so that objectives differing
    # only in letter case are merged instead of one silently shadowing the other.
    objectives = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        if not rule["meta"].get("mbc"):
            continue

        mbcs = rule["meta"]["mbc"]
        if not isinstance(mbcs, list):
            raise ValueError("invalid rule: MBC mapping is not a list")

        for mbc in mbcs:
            objective, _, rest = mbc.partition("::")
            group = objectives[objective.upper()]
            if "::" in rest:
                behavior, _, rest = rest.partition("::")
                method, _, mbc_id = rest.rpartition(" ")
                group.add((behavior, method, mbc_id))
            else:
                behavior, _, mbc_id = rest.rpartition(" ")
                group.add((behavior, mbc_id))

    for objective, behaviors in sorted(objectives.items()):
        inner_rows = []
        # sort the raw tuples (not the rendered strings) to keep the established ordering
        for spec in sorted(behaviors):
            if len(spec) == 2:
                inner_rows.append("%s %s" % spec)
            elif len(spec) == 3:
                inner_rows.append("%s::%s %s" % spec)
            else:
                raise RuntimeError("unexpected MBC spec format")
        ostream["MBC"][objective] = inner_rows
|
||||
|
||||
def render_dictionary(doc):
    """
    Build a plain dict from the result document by running, in order, the meta,
    ATT&CK, MBC and capability renderers over it, and return that dict.
    """
    result = dict()
    for render in (render_meta, render_attack, render_mbc, render_capabilities):
        render(doc, result)
    return result
|
||||
@@ -1,33 +1,209 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import json
|
||||
|
||||
import collections
|
||||
import capa.main
|
||||
import capa.rules
|
||||
import capa.engine
|
||||
import capa.features
|
||||
from capa.engine import *
|
||||
import capa.render.utils as rutils
|
||||
from capa.render import convert_capabilities_to_result_document
|
||||
|
||||
# edit these to set the path of the file to analyze and the rules directory
|
||||
SAMPLE_PATH = "path/to/file"
|
||||
RULES_PATH = "/tmp/capa/rules/"
|
||||
|
||||
# load rules from disk
|
||||
rules = capa.main.get_rules(RULES_PATH, disable_progress=True)
|
||||
rules = capa.rules.RuleSet(rules)
|
||||
|
||||
# extract features and find capabilities
|
||||
extractor = capa.main.get_extractor(SAMPLE_PATH, "auto", disable_progress=True)
|
||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||
# === render dictionary helpers
|
||||
def render_meta(doc, ostream):
    """
    Copy the sample's md5, sha1, sha256 and path from ``doc["meta"]["sample"]``
    into ``ostream`` under the same keys.
    """
    sample = doc["meta"]["sample"]
    for field in ("md5", "sha1", "sha256", "path"):
        ostream[field] = sample[field]
|
||||
|
||||
# collect metadata (used only to make rendering more complete)
|
||||
meta = capa.main.collect_metadata("", SAMPLE_PATH, RULES_PATH, "auto", extractor)
|
||||
meta["analysis"].update(counts)
|
||||
def find_subrule_matches(doc):
|
||||
"""
|
||||
collect the rule names that have been matched as a subrule match.
|
||||
this way we can avoid displaying entries for things that are too specific.
|
||||
"""
|
||||
matches = set([])
|
||||
|
||||
# render results
|
||||
# ...as json
|
||||
capa_json = json.loads(capa.render.render_json(meta, rules, capabilities))
|
||||
# ...as human readable text table
|
||||
capa_texttable = capa.render.render_default(meta, rules, capabilities)
|
||||
# ...as python dictionary, simplified like the text table but as a dictionary
|
||||
capa_dict = capa.render.render_dictionary(meta, rules, capabilities)
|
||||
def rec(node):
|
||||
if not node["success"]:
|
||||
# there's probably a bug here for rules that do `not: match: ...`
|
||||
# but we don't have any examples of this yet
|
||||
return
|
||||
|
||||
elif node["node"]["type"] == "statement":
|
||||
for child in node["children"]:
|
||||
rec(child)
|
||||
|
||||
elif node["node"]["type"] == "feature":
|
||||
if node["node"]["feature"]["type"] == "match":
|
||||
matches.add(node["node"]["feature"]["match"])
|
||||
|
||||
for rule in rutils.capability_rules(doc):
|
||||
for node in rule["matches"].values():
|
||||
rec(node)
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
def render_capabilities(doc, ostream):
    """
    Group the matched capabilities by rule namespace into ``ostream["CAPABILITY"]``.

    Each namespace maps to a list of rule names; a rule whose match count is not
    exactly one gets a " (N matches)" suffix.

    example::

        {'CAPABILITY': {'anti-analysis/anti-emulation/wine': ['check if process is running under wine'],
                        'executable/pe/section/rsrc': ['contain a resource (.rsrc) section'],
                        'host-interaction/cli': ['accept command line arguments'],
                        'host-interaction/file-system/write': ['write file (3 matches)'],
                        'host-interaction/process': ['allocate thread local storage (2 matches)']}
        }
    """
    subrule_matches = find_subrule_matches(doc)

    by_namespace = dict()
    ostream["CAPABILITY"] = by_namespace
    for rule in rutils.capability_rules(doc):
        name = rule["meta"]["name"]
        if name in subrule_matches:
            # rules that are also matched by other rules should not get rendered by default.
            # this cuts down on the amount of output while giving approx the same detail.
            # see #224
            continue

        count = len(rule["matches"])
        if count == 1:
            capability = name
        else:
            capability = "%s (%d matches)" % (name, count)

        by_namespace.setdefault(rule["meta"]["namespace"], list()).append(capability)
|
||||
|
||||
def render_attack(doc, ostream):
    """
    Collect the ATT&CK mappings of the rules yielded by ``rutils.capability_rules(doc)``
    into ``ostream["ATTCK"]``.

    Each mapping string is split as ``Tactic::Technique ID`` or
    ``Tactic::Technique::Subtechnique ID``. Entries are grouped under the
    upper-cased tactic, de-duplicated, and sorted.

    example::

        {'ATTCK': {'COLLECTION': ['Input Capture::Keylogging [T1056.001]'],
                   'DEFENSE EVASION': ['Obfuscated Files or Information [T1027]',
                                       'Virtualization/Sandbox Evasion::System Checks '
                                       '[T1497.001]'],
                   'DISCOVERY': ['File and Directory Discovery [T1083]',
                                 'Query Registry [T1012]',
                                 'System Information Discovery [T1082]'],
                   'EXECUTION': ['Shared Modules [T1129]']}
        }

    Raises:
      RuntimeError: if a collected spec is neither a 2- nor a 3-tuple.
    """
    ostream["ATTCK"] = dict()

    # group under the upper-cased tactic from the start, so that tactics differing
    # only in letter case are merged instead of one silently shadowing the other.
    tactics = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        if not rule["meta"].get("att&ck"):
            continue

        for attack in rule["meta"]["att&ck"]:
            tactic, _, rest = attack.partition("::")
            group = tactics[tactic.upper()]
            if "::" in rest:
                technique, _, rest = rest.partition("::")
                subtechnique, _, attack_id = rest.rpartition(" ")
                group.add((technique, subtechnique, attack_id))
            else:
                technique, _, attack_id = rest.rpartition(" ")
                group.add((technique, attack_id))

    for tactic, techniques in sorted(tactics.items()):
        inner_rows = []
        # sort the raw tuples (not the rendered strings) to keep the established ordering
        for spec in sorted(techniques):
            if len(spec) == 2:
                inner_rows.append("%s %s" % spec)
            elif len(spec) == 3:
                inner_rows.append("%s::%s %s" % spec)
            else:
                raise RuntimeError("unexpected ATT&CK spec format")
        ostream["ATTCK"][tactic] = inner_rows
|
||||
|
||||
|
||||
def render_mbc(doc, ostream):
    """
    Collect the MBC mappings of the rules yielded by ``rutils.capability_rules(doc)``
    into ``ostream["MBC"]``.

    Each mapping string is split as ``Objective::Behavior ID`` or
    ``Objective::Behavior::Method ID``. Entries are grouped under the
    upper-cased objective, de-duplicated, and sorted.

    example::

        {'MBC': {'ANTI-BEHAVIORAL ANALYSIS': ['Debugger Detection::Timing/Delay Check '
                                              'GetTickCount [B0001.032]',
                                              'Emulator Detection [B0004]',
                                              'Virtual Machine Detection::Instruction '
                                              'Testing [B0009.029]',
                                              'Virtual Machine Detection [B0009]'],
                 'COLLECTION': ['Keylogging::Polling [F0002.002]'],
                 'CRYPTOGRAPHY': ['Encrypt Data::RC4 [C0027.009]',
                                  'Generate Pseudo-random Sequence::RC4 PRGA '
                                  '[C0021.004]']}
        }

    Raises:
      ValueError: if a rule's ``mbc`` meta entry is not a list.
      RuntimeError: if a collected spec is neither a 2- nor a 3-tuple.
    """
    ostream["MBC"] = dict()

    # group under the upper-cased objective from the start, so that objectives differing
    # only in letter case are merged instead of one silently shadowing the other.
    objectives = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        if not rule["meta"].get("mbc"):
            continue

        mbcs = rule["meta"]["mbc"]
        if not isinstance(mbcs, list):
            raise ValueError("invalid rule: MBC mapping is not a list")

        for mbc in mbcs:
            objective, _, rest = mbc.partition("::")
            group = objectives[objective.upper()]
            if "::" in rest:
                behavior, _, rest = rest.partition("::")
                method, _, mbc_id = rest.rpartition(" ")
                group.add((behavior, method, mbc_id))
            else:
                behavior, _, mbc_id = rest.rpartition(" ")
                group.add((behavior, mbc_id))

    for objective, behaviors in sorted(objectives.items()):
        inner_rows = []
        # sort the raw tuples (not the rendered strings) to keep the established ordering
        for spec in sorted(behaviors):
            if len(spec) == 2:
                inner_rows.append("%s %s" % spec)
            elif len(spec) == 3:
                inner_rows.append("%s::%s %s" % spec)
            else:
                raise RuntimeError("unexpected MBC spec format")
        ostream["MBC"][objective] = inner_rows
|
||||
|
||||
def render_dictionary(doc):
    """
    Build a plain dict from the result document by running, in order, the meta,
    ATT&CK, MBC and capability renderers over it, and return that dict.
    """
    result = dict()
    for render in (render_meta, render_attack, render_mbc, render_capabilities):
        render(doc, result)
    return result
|
||||
|
||||
# === render dictionary helpers
|
||||
def capa_details(file_path: str, output_format: str = "dictionary") -> "dict | str | bool":
    """
    Run capa against ``file_path`` and render the result in the requested format.

    Uses the module-level ``rules`` and ``RULES_PATH``.

    Args:
      file_path: path of the sample to analyze.
      output_format: one of "dictionary", "json" or "texttable".

    Returns:
      - "dictionary": the dict built by render_dictionary().
      - "json": the object decoded from the output of capa.render.render_json().
      - "texttable": the value returned by capa.render.render_default()
        (the human readable text table).
      - any other value: False.
    """
    # extract features and find capabilities
    extractor = capa.main.get_extractor(file_path, "auto", disable_progress=True)
    capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)

    # collect metadata (used only to make rendering more complete)
    meta = capa.main.collect_metadata("", file_path, RULES_PATH, "auto", extractor)
    meta["analysis"].update(counts)

    if output_format == "dictionary":
        # ...as python dictionary, simplified like the text table but as a dictionary
        doc = convert_capabilities_to_result_document(meta, rules, capabilities)
        return render_dictionary(doc)

    if output_format == "json":
        # ...as json
        return json.loads(capa.render.render_json(meta, rules, capabilities))

    if output_format == "texttable":
        # ...as human readable text table
        return capa.render.render_default(meta, rules, capabilities)

    # unrecognized format: keep returning the False sentinel so existing callers keep working
    return False
|
||||
|
||||
Reference in New Issue
Block a user