Merge pull request #1502 from Aayush-Goel-04/Aayush-Goel-04/Issue#1411

Update Metadata type in capa main
This commit is contained in:
Willi Ballenthin
2023-06-06 13:04:35 +02:00
committed by GitHub
9 changed files with 129 additions and 181 deletions

View File

@@ -7,6 +7,7 @@
 - Utility script to detect feature overlap between new and existing CAPA rules [#1451](https://github.com/mandiant/capa/issues/1451) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
 ### Breaking Changes
+- Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat
 ### New Rules (7)
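For integrators, the breaking change is the return type of `capa.main.collect_metadata()`: a typed `rdoc.Metadata` model instead of a plain dict. A migration sketch drawn from the hunks below:

```python
# before: meta was a plain dict
meta["analysis"].update(counts)
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)

# after: meta is rdoc.Metadata; assign the typed fields instead
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
```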

View File

@@ -22,7 +22,8 @@ import capa
 import capa.version
 import capa.render.utils as rutils
 import capa.features.common
-import capa.render.result_document
+import capa.features.freeze
+import capa.render.result_document as rdoc
 from capa.features.address import AbsoluteVirtualAddress
 
 logger = logging.getLogger("capa")
@@ -140,37 +141,35 @@ def collect_metadata(rules):
     else:
         os = "unknown os"
 
-    return {
-        "timestamp": datetime.datetime.now().isoformat(),
-        "argv": [],
-        "sample": {
-            "md5": md5,
-            "sha1": "",  # not easily accessible
-            "sha256": sha256,
-            "path": idaapi.get_input_file_path(),
-        },
-        "analysis": {
-            "format": idaapi.get_file_type_name(),
-            "arch": arch,
-            "os": os,
-            "extractor": "ida",
-            "rules": rules,
-            "base_address": idaapi.get_imagebase(),
-            "layout": {
+    return rdoc.Metadata(
+        timestamp=datetime.datetime.now(),
+        version=capa.version.__version__,
+        argv=(),
+        sample=rdoc.Sample(
+            md5=md5,
+            sha1="",  # not easily accessible
+            sha256=sha256,
+            path=idaapi.get_input_file_path(),
+        ),
+        analysis=rdoc.Analysis(
+            format=idaapi.get_file_type_name(),
+            arch=arch,
+            os=os,
+            extractor="ida",
+            rules=rules,
+            base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()),
+            layout=rdoc.Layout(
+                functions=tuple()
                 # this is updated after capabilities have been collected.
                 # will look like:
                 #
                 # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
-            },
+            ),
             # ignore these for now - not used by IDA plugin.
-            "feature_counts": {
-                "file": {},
-                "functions": {},
-            },
-            "library_functions": {},
-        },
-        "version": capa.version.__version__,
-    }
+            feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()),
+            library_functions=tuple(),
+        ),
+    )
 
 
 class IDAIO:
@@ -217,12 +216,12 @@ def idb_contains_cached_results() -> bool:
     return False
 
-def load_and_verify_cached_results() -> Optional[capa.render.result_document.ResultDocument]:
+def load_and_verify_cached_results() -> Optional[rdoc.ResultDocument]:
     """verifies that cached results have valid (mapped) addresses for the current database"""
     logger.debug("loading cached capa results from netnode '%s'", CAPA_NETNODE)
 
     n = netnode.Netnode(CAPA_NETNODE)
-    doc = capa.render.result_document.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS]))
+    doc = rdoc.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS]))
 
     for rule in rutils.capability_rules(doc):
         for location_, _ in rule.matches:
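Since `rdoc.ResultDocument` is a pydantic model, the netnode cache round-trip stays symmetrical: the `parse_obj(json.loads(...))` above is the inverse of the model's `.json()`. A minimal sketch of the save side (this helper name is hypothetical, not part of the diff):

```python
import netnode

import capa.render.result_document as rdoc

def cache_results_in_idb(doc: rdoc.ResultDocument) -> None:
    # hypothetical helper mirroring load_and_verify_cached_results() above:
    # pydantic's .json() emits the same JSON that parse_obj() reads back
    n = netnode.Netnode(CAPA_NETNODE)
    n[NETNODE_RESULTS] = doc.json()
```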

View File

@@ -771,8 +771,11 @@ class CapaExplorerForm(idaapi.PluginForm):
         try:
             meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
             capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
-            meta["analysis"].update(counts)
-            meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities)
+
+            meta.analysis.feature_counts = counts["feature_counts"]
+            meta.analysis.library_functions = counts["library_functions"]
+            meta.analysis.layout = capa.main.compute_layout(ruleset, extractor, capabilities)
         except UserCancelledError:
             logger.info("User cancelled analysis.")
             return False

View File

@@ -38,9 +38,11 @@ import capa.rules.cache
 import capa.render.default
 import capa.render.verbose
 import capa.features.common
 import capa.features.freeze
+import capa.features.freeze as frz
 import capa.render.vverbose
 import capa.features.extractors
 import capa.render.result_document
+import capa.render.result_document as rdoc
 import capa.features.extractors.common
 import capa.features.extractors.pefile
 import capa.features.extractors.dnfile_
@@ -245,13 +247,8 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
     all_bb_matches = collections.defaultdict(list)  # type: MatchResults
     all_insn_matches = collections.defaultdict(list)  # type: MatchResults
 
-    meta = {
-        "feature_counts": {
-            "file": 0,
-            "functions": {},
-        },
-        "library_functions": {},
-    }  # type: Dict[str, Any]
+    feature_counts = rdoc.FeatureCounts(file=0, functions=tuple())
+    library_functions: Tuple[rdoc.LibraryFunction, ...] = tuple()
 
     with redirecting_print_to_tqdm(disable_progress):
         with tqdm.contrib.logging.logging_redirect_tqdm():
@@ -270,8 +267,10 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
                 if extractor.is_library_function(f.address):
                     function_name = extractor.get_function_name(f.address)
                     logger.debug("skipping library function 0x%x (%s)", f.address, function_name)
-                    meta["library_functions"][f.address] = function_name
-                    n_libs = len(meta["library_functions"])
+                    library_functions += (
+                        rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name),
+                    )
+                    n_libs = len(library_functions)
                     percentage = round(100 * (n_libs / n_funcs))
                     if isinstance(pb, tqdm.tqdm):
                         pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)")
@@ -280,7 +279,9 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
                 function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(
                     ruleset, extractor, f
                 )
-                meta["feature_counts"]["functions"][f.address] = feature_count
+                feature_counts.functions += (
+                    rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count),
+                )
                 logger.debug("analyzed function 0x%x and extracted %d features", f.address, feature_count)
 
                 for rule_name, res in function_matches.items():
@@ -301,7 +302,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
         capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
 
     all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
-    meta["feature_counts"]["file"] = feature_count
+    feature_counts.file = feature_count
 
     matches = {
         rule_name: results
@@ -316,6 +317,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
         )
     }
 
+    meta = {
+        "feature_counts": feature_counts,
+        "library_functions": library_functions,
+    }
+
     return matches, meta
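The counts mapping returned alongside the matches now carries typed values instead of nested dicts; a hedged usage sketch under this diff's names:

```python
capabilities, counts = find_capabilities(ruleset, extractor, disable_progress=True)

feature_counts = counts["feature_counts"]        # rdoc.FeatureCounts
library_functions = counts["library_functions"]  # Tuple[rdoc.LibraryFunction, ...]

for lib in library_functions:
    # each entry pairs a frozen (serializable) address with the recognized name
    print(lib.address, lib.name)
```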
@@ -739,7 +745,7 @@ def collect_metadata(
     os_: str,
     rules_path: List[str],
     extractor: capa.features.extractors.base_extractor.FeatureExtractor,
-):
+) -> rdoc.Metadata:
     md5 = hashlib.md5()
     sha1 = hashlib.sha1()
     sha256 = hashlib.sha256()
@@ -758,34 +764,37 @@ def collect_metadata(
     arch = get_arch(sample_path)
     os_ = get_os(sample_path) if os_ == OS_AUTO else os_
 
-    return {
-        "timestamp": datetime.datetime.now().isoformat(),
-        "version": capa.version.__version__,
-        "argv": argv,
-        "sample": {
-            "md5": md5.hexdigest(),
-            "sha1": sha1.hexdigest(),
-            "sha256": sha256.hexdigest(),
-            "path": os.path.normpath(sample_path),
-        },
-        "analysis": {
-            "format": format_,
-            "arch": arch,
-            "os": os_,
-            "extractor": extractor.__class__.__name__,
-            "rules": rules_path,
-            "base_address": extractor.get_base_address(),
-            "layout": {
+    return rdoc.Metadata(
+        timestamp=datetime.datetime.now(),
+        version=capa.version.__version__,
+        argv=tuple(argv) if argv else None,
+        sample=rdoc.Sample(
+            md5=md5.hexdigest(),
+            sha1=sha1.hexdigest(),
+            sha256=sha256.hexdigest(),
+            path=os.path.normpath(sample_path),
+        ),
+        analysis=rdoc.Analysis(
+            format=format_,
+            arch=arch,
+            os=os_,
+            extractor=extractor.__class__.__name__,
+            rules=tuple(rules_path),
+            base_address=frz.Address.from_capa(extractor.get_base_address()),
+            layout=rdoc.Layout(
+                functions=tuple(),
                 # this is updated after capabilities have been collected.
                 # will look like:
                 #
                 # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
-            },
-        },
-    }
+            ),
+            feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()),
+            library_functions=tuple(),
+        ),
+    )
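Downstream code now reads attributes instead of nested keys; an illustrative sketch (the sample path here is hypothetical):

```python
meta = collect_metadata(argv, "sample.exe", FORMAT_AUTO, OS_AUTO, rules_path, extractor)

# attribute access replaces meta["sample"]["sha256"] and friends
print(meta.sample.sha256)
print(meta.analysis.arch, meta.analysis.os)
print(meta.analysis.base_address)  # a frz.Address, no longer a raw int
```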
 
-def compute_layout(rules, extractor, capabilities):
+def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
     """
     compute a metadata structure that links basic blocks
     to the functions in which they're found.
@@ -810,17 +819,19 @@ def compute_layout(rules, extractor, capabilities):
             assert addr in functions_by_bb
             matched_bbs.add(addr)
 
-    layout = {
-        "functions": {
-            f: {
-                "matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs]
-                # this object is open to extension in the future,
+    layout = rdoc.Layout(
+        functions=tuple(
+            rdoc.FunctionLayout(
+                address=frz.Address.from_capa(f),
+                matched_basic_blocks=tuple(
+                    rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs
+                )  # this object is open to extension in the future,
                 # such as with the function name, etc.
-            }
+            )
             for f, bbs in bbs_by_function.items()
             if len([bb for bb in bbs if bb in matched_bbs]) > 0
-        }
-    }
+        )
+    )
 
     return layout
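Consumers traverse the typed layout with attributes rather than dict lookups, e.g.:

```python
layout = compute_layout(rules, extractor, capabilities)

for f in layout.functions:
    # f.address and bb.address are frz.Address values
    matched = [bb.address for bb in f.matched_basic_blocks]
    print(f.address, matched)
```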
@@ -1197,8 +1208,7 @@ def main(argv=None):
             logger.debug("file limitation short circuit, won't analyze fully.")
             return E_FILE_LIMITATION
 
-    # TODO: #1411 use a real type, not a dict here.
-    meta: Dict[str, Any]
+    meta: rdoc.Metadata
     capabilities: MatchResults
     counts: Dict[str, Any]
@@ -1214,7 +1224,7 @@ def main(argv=None):
     if format_ == FORMAT_FREEZE:
         # freeze format deserializes directly into an extractor
         with open(args.sample, "rb") as f:
-            extractor = capa.features.freeze.load(f.read())
+            extractor = frz.load(f.read())
     else:
         # all other formats we must create an extractor,
         # such as viv, binary ninja, etc. workspaces
@@ -1255,15 +1265,16 @@ def main(argv=None):
     meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
 
     capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
-    meta["analysis"].update(counts)
-    meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
+
+    meta.analysis.feature_counts = counts["feature_counts"]
+    meta.analysis.library_functions = counts["library_functions"]
+    meta.analysis.layout = compute_layout(rules, extractor, capabilities)
 
     if has_file_limitation(rules, capabilities):
         # bail if capa encountered file limitation e.g. a packed binary
         # do show the output in verbose mode, though.
         if not (args.verbose or args.vverbose or args.json):
             return E_FILE_LIMITATION
 
     if args.json:
         print(capa.render.json.render(meta, rules, capabilities))
     elif args.vverbose:
@@ -1308,7 +1319,9 @@ def ida_main():
     meta = capa.ida.helpers.collect_metadata([rules_path])
 
     capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
-    meta["analysis"].update(counts)
+
+    meta.analysis.feature_counts = counts["feature_counts"]
+    meta.analysis.library_functions = counts["library_functions"]
 
     if has_file_limitation(rules, capabilities, is_standalone=False):
         capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")

View File

@@ -28,42 +28,47 @@ class FrozenModel(BaseModel):
         extra = "forbid"
 
-class Sample(FrozenModel):
+class Model(BaseModel):
+    class Config:
+        extra = "forbid"
+
+
+class Sample(Model):
     md5: str
     sha1: str
     sha256: str
     path: str
 
-class BasicBlockLayout(FrozenModel):
+class BasicBlockLayout(Model):
     address: frz.Address
 
-class FunctionLayout(FrozenModel):
+class FunctionLayout(Model):
     address: frz.Address
     matched_basic_blocks: Tuple[BasicBlockLayout, ...]
 
-class Layout(FrozenModel):
+class Layout(Model):
     functions: Tuple[FunctionLayout, ...]
 
-class LibraryFunction(FrozenModel):
+class LibraryFunction(Model):
     address: frz.Address
     name: str
 
-class FunctionFeatureCount(FrozenModel):
+class FunctionFeatureCount(Model):
     address: frz.Address
     count: int
 
-class FeatureCounts(FrozenModel):
+class FeatureCounts(Model):
     file: int
     functions: Tuple[FunctionFeatureCount, ...]
 
-class Analysis(FrozenModel):
+class Analysis(Model):
     format: str
     arch: str
     os: str
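The switch from FrozenModel to a mutable Model base is what lets callers assign `meta.analysis.feature_counts` and friends after construction; a frozen pydantic model rejects attribute assignment. A minimal sketch of the distinction (assuming FrozenModel sets `allow_mutation = False`, consistent with its name):

```python
from pydantic import BaseModel

class FrozenModel(BaseModel):
    class Config:
        allow_mutation = False  # assumed: immutable, per the name
        extra = "forbid"

class Model(BaseModel):
    class Config:
        extra = "forbid"  # unknown fields still rejected; assignment allowed

class Demo(Model):
    count: int

d = Demo(count=0)
d.count = 1  # ok under Model; would raise TypeError under FrozenModel
```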
@@ -75,92 +80,13 @@ class Analysis(FrozenModel):
     library_functions: Tuple[LibraryFunction, ...]
 
-class Metadata(FrozenModel):
+class Metadata(Model):
     timestamp: datetime.datetime
     version: str
     argv: Optional[Tuple[str, ...]]
     sample: Sample
     analysis: Analysis
-    @classmethod
-    def from_capa(cls, meta: Any) -> "Metadata":
-        return cls(
-            timestamp=meta["timestamp"],
-            version=meta["version"],
-            argv=meta["argv"] if "argv" in meta else None,
-            sample=Sample(
-                md5=meta["sample"]["md5"],
-                sha1=meta["sample"]["sha1"],
-                sha256=meta["sample"]["sha256"],
-                path=meta["sample"]["path"],
-            ),
-            analysis=Analysis(
-                format=meta["analysis"]["format"],
-                arch=meta["analysis"]["arch"],
-                os=meta["analysis"]["os"],
-                extractor=meta["analysis"]["extractor"],
-                rules=meta["analysis"]["rules"],
-                base_address=frz.Address.from_capa(meta["analysis"]["base_address"]),
-                layout=Layout(
-                    functions=tuple(
-                        FunctionLayout(
-                            address=frz.Address.from_capa(address),
-                            matched_basic_blocks=tuple(
-                                BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in f["matched_basic_blocks"]
-                            ),
-                        )
-                        for address, f in meta["analysis"]["layout"]["functions"].items()
-                    )
-                ),
-                feature_counts=FeatureCounts(
-                    file=meta["analysis"]["feature_counts"]["file"],
-                    functions=tuple(
-                        FunctionFeatureCount(address=frz.Address.from_capa(address), count=count)
-                        for address, count in meta["analysis"]["feature_counts"]["functions"].items()
-                    ),
-                ),
-                library_functions=tuple(
-                    LibraryFunction(address=frz.Address.from_capa(address), name=name)
-                    for address, name in meta["analysis"]["library_functions"].items()
-                ),
-            ),
-        )
-
-    def to_capa(self) -> Dict[str, Any]:
-        capa_meta = {
-            "timestamp": self.timestamp.isoformat(),
-            "version": self.version,
-            "sample": {
-                "md5": self.sample.md5,
-                "sha1": self.sample.sha1,
-                "sha256": self.sample.sha256,
-                "path": self.sample.path,
-            },
-            "analysis": {
-                "format": self.analysis.format,
-                "arch": self.analysis.arch,
-                "os": self.analysis.os,
-                "extractor": self.analysis.extractor,
-                "rules": self.analysis.rules,
-                "base_address": self.analysis.base_address.to_capa(),
-                "layout": {
-                    "functions": {
-                        f.address.to_capa(): {
-                            "matched_basic_blocks": [bb.address.to_capa() for bb in f.matched_basic_blocks]
-                        }
-                        for f in self.analysis.layout.functions
-                    }
-                },
-                "feature_counts": {
-                    "file": self.analysis.feature_counts.file,
-                    "functions": {fc.address.to_capa(): fc.count for fc in self.analysis.feature_counts.functions},
-                },
-                "library_functions": {lf.address.to_capa(): lf.name for lf in self.analysis.library_functions},
-            },
-        }
-        return capa_meta
 
 
 class CompoundStatementType:
     AND = "and"
@@ -642,7 +568,7 @@ class ResultDocument(FrozenModel):
     rules: Dict[str, RuleMatches]
 
     @classmethod
-    def from_capa(cls, meta, rules: RuleSet, capabilities: MatchResults) -> "ResultDocument":
+    def from_capa(cls, meta: Metadata, rules: RuleSet, capabilities: MatchResults) -> "ResultDocument":
         rule_matches: Dict[str, RuleMatches] = {}
         for rule_name, matches in capabilities.items():
             rule = rules[rule_name]
@@ -659,10 +585,9 @@ class ResultDocument(FrozenModel):
             ),
         )
 
-        return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
+        return ResultDocument(meta=meta, rules=rule_matches)
 
-    def to_capa(self) -> Tuple[Dict, Dict]:
-        meta = self.meta.to_capa()
+    def to_capa(self) -> Tuple[Metadata, Dict]:
         capabilities: Dict[
             str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]
         ] = collections.defaultdict(list)
@@ -678,4 +603,4 @@ class ResultDocument(FrozenModel):
                 capabilities[rule_name].append((addr.to_capa(), result))
 
-        return meta, capabilities
+        return self.meta, capabilities
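With the dict conversion methods gone, the round-trip through ResultDocument is lossless; sketched usage:

```python
import capa.render.result_document as rdoc

doc = rdoc.ResultDocument.from_capa(meta, rules, capabilities)

# to_capa() now hands back the typed Metadata unchanged,
# plus capabilities keyed by rule name
meta2, capabilities2 = doc.to_capa()
assert meta2 is doc.meta
```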

View File

@@ -131,8 +131,10 @@ def get_capa_results(args):
     meta = capa.main.collect_metadata([], path, format, os_, [], extractor)
     capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
-    meta["analysis"].update(counts)
-    meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
+
+    meta.analysis.feature_counts = counts["feature_counts"]
+    meta.analysis.library_functions = counts["library_functions"]
+    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
 
     doc = rd.ResultDocument.from_capa(meta, rules, capabilities)

View File

@@ -172,10 +172,13 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
# collect metadata (used only to make rendering more complete)
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor)
meta["analysis"].update(counts)
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
capa_output: Any = False
if output_format == "dictionary":
# ...as python dictionary, simplified as textable but in dictionary
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)

View File

@@ -178,8 +178,10 @@ def main(argv=None):
     meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor)
     capabilities, counts = capa.main.find_capabilities(rules, extractor)
-    meta["analysis"].update(counts)
-    meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
+
+    meta.analysis.feature_counts = counts["feature_counts"]
+    meta.analysis.library_functions = counts["library_functions"]
+    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
 
     if capa.main.has_file_limitation(rules, capabilities):
         # bail if capa encountered file limitation e.g. a packed binary

View File

@@ -282,5 +282,5 @@ def test_rdoc_to_capa():
     rd = rdoc.ResultDocument.parse_file(path)
 
     meta, capabilites = rd.to_capa()
-    assert isinstance(meta, dict)
+    assert isinstance(meta, rdoc.Metadata)
     assert isinstance(capabilites, dict)