From b35fe6cdb2f85219bfbf0c8eb414f85ee6bb2e7f Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 24 May 2022 13:52:56 -0600 Subject: [PATCH] json, render: work with and serialize addresses --- capa/render/default.py | 2 +- capa/render/result_document.py | 121 ++++++++++++++++++++++++--------- capa/render/verbose.py | 11 +-- capa/render/vverbose.py | 29 +++++--- tests/test_main.py | 9 ++- 5 files changed, 121 insertions(+), 51 deletions(-) diff --git a/capa/render/default.py b/capa/render/default.py index 5c3d4ac0..2772d7e7 100644 --- a/capa/render/default.py +++ b/capa/render/default.py @@ -64,7 +64,7 @@ def find_subrule_matches(doc): matches.add(node["node"]["feature"]["match"]) for rule in rutils.capability_rules(doc): - for node in rule["matches"].values(): + for address, node in rule["matches"]: rec(node) return matches diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 33e083fa..25ced34b 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -6,12 +6,67 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
import copy +from typing import Any, List import capa.engine import capa.render.utils import capa.features.common +import capa.features.address from capa.rules import RuleSet from capa.engine import MatchResults +from capa.helpers import assert_never +from capa.features.address import Address + + +def serialize_address(a: Address) -> Any: + if isinstance(a, capa.features.address.AbsoluteVirtualAddress): + return ("absolute", int(a)) + + elif isinstance(a, capa.features.address.RelativeVirtualAddress): + return ("relative", int(a)) + + elif isinstance(a, capa.features.address.FileOffsetAddress): + return ("file", int(a)) + + elif isinstance(a, capa.features.address.DNTokenAddress): + return ("dn token", a.token) + + elif isinstance(a, capa.features.address.DNTokenOffsetAddress): + return ("dn token offset", a.token, a.offset) + + elif a == capa.features.address.NO_ADDRESS: + return ("no address",) + + elif isinstance(a, capa.features.address.Address): + raise ValueError("don't use an Address instance directly") + + else: + assert_never(a) + + +def deserialize_address(doc: List[Any]) -> Address: + atype = doc[0] + + if atype == "absolute": + return capa.features.address.AbsoluteVirtualAddress(doc[1]) + + elif atype == "relative": + return capa.features.address.RelativeVirtualAddress(doc[1]) + + elif atype == "file": + return capa.features.address.FileOffsetAddress(doc[1]) + + elif atype == "dn token": + return capa.features.address.DNTokenAddress(doc[1]) + + elif atype == "dn token offset": + return capa.features.address.DNTokenOffsetAddress(doc[1], doc[2]) + + elif atype == "no address": + return capa.features.address.NO_ADDRESS + + else: + assert_never(atype) def convert_statement_to_result_document(statement): @@ -74,7 +129,13 @@ def convert_feature_to_result_document(feature): if feature.description: result["description"] = feature.description if feature.name in ("regex", "substring"): - result["matches"] = feature.matches + if feature.matches: + # regex featur 
matches are a dict from the capture group to list of location addresses + result["matches"] = {k: list(map(serialize_address, vs)) for k, vs in feature.matches.items()} + else: + # there were no matches + pass + return result @@ -120,10 +181,10 @@ def convert_match_to_result_document(rules, capabilities, result): # so only add `locations` to feature nodes. if isinstance(result.statement, capa.features.common.Feature): if bool(result.success): - doc["locations"] = result.locations + doc["locations"] = list(map(serialize_address, result.locations)) elif isinstance(result.statement, capa.engine.Range): if bool(result.success): - doc["locations"] = result.locations + doc["locations"] = list(map(serialize_address, result.locations)) # if we have a `match` statement, then we're referencing another rule or namespace. # this could an external rule (written by a human), or @@ -164,7 +225,7 @@ def convert_match_to_result_document(rules, capabilities, result): }, } - for location in doc["locations"]: + for location in result.locations: doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location])) else: # this is a namespace that we're matching @@ -196,7 +257,7 @@ def convert_match_to_result_document(rules, capabilities, result): # this would be a breaking change and require updates to the renderers. # in the meantime, the above might be sufficient. rule_matches = {address: result for (address, result) in capabilities[rule.name]} - for location in doc["locations"]: + for location in result.locations: # doc[locations] contains all matches for the given namespace. # for example, the feature might be `match: anti-analysis/packer` # which matches against "generic unpacker" and "UPX". @@ -280,30 +341,27 @@ def convert_capabilities_to_result_document(meta, rules: RuleSet, capabilities: to render as text. see examples of substructures in above routines. 
- - schema: - - ```json - { - "meta": {...}, - "rules: { - $rule-name: { - "meta": {...copied from rule.meta...}, - "matches: { - $address: {...match details...}, - ... - } - }, - ... - } - } - ``` - - Args: - meta (Dict[str, Any]): - rules (RuleSet): - capabilities (Dict[str, List[Tuple[int, Result]]]): """ + meta["analysis"]["base_address"] = serialize_address(meta["analysis"]["base_address"]) + + meta["analysis"]["feature_counts"]["functions"] = [ + {"address": serialize_address(address), "count": count} + for address, count in meta["analysis"]["feature_counts"]["functions"].items() + ] + + meta["analysis"]["library_functions"] = [ + {"address": serialize_address(address), "name": name} + for address, name in meta["analysis"]["library_functions"].items() + ] + + meta["analysis"]["layout"]["functions"] = [ + { + "address": serialize_address(faddr), + "matched_basic_blocks": list({"address": serialize_address(bb)} for bb in f["matched_basic_blocks"]), + } + for faddr, f in meta["analysis"]["layout"]["functions"].items() + ] + doc = { "meta": meta, "rules": {}, @@ -320,9 +378,10 @@ def convert_capabilities_to_result_document(meta, rules: RuleSet, capabilities: doc["rules"][rule_name] = { "meta": rule_meta, "source": rule.definition, - "matches": { - addr: convert_match_to_result_document(rules, capabilities, match) for (addr, match) in matches - }, + "matches": [ + [serialize_address(addr), convert_match_to_result_document(rules, capabilities, match)] + for (addr, match) in matches + ], } return doc diff --git a/capa/render/verbose.py b/capa/render/verbose.py index 80d5cada..8a0e5d43 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -28,6 +28,7 @@ import dnfile.mdtable import capa.rules import capa.render.utils as rutils import capa.render.result_document +import capa.render.result_document as rd from capa.rules import RuleSet from capa.engine import MatchResults from capa.features.address import ( @@ -90,14 +91,14 @@ def render_meta(ostream, 
doc): ("format", doc["meta"]["analysis"]["format"]), ("arch", doc["meta"]["analysis"]["arch"]), ("extractor", doc["meta"]["analysis"]["extractor"]), - ("base address", hex(doc["meta"]["analysis"]["base_address"])), + ("base address", format_address(rd.deserialize_address(doc["meta"]["analysis"]["base_address"]))), ("rules", "\n".join(doc["meta"]["analysis"]["rules"])), ("function count", len(doc["meta"]["analysis"]["feature_counts"]["functions"])), ("library function count", len(doc["meta"]["analysis"]["library_functions"])), ( "total feature count", doc["meta"]["analysis"]["feature_counts"]["file"] - + sum(doc["meta"]["analysis"]["feature_counts"]["functions"].values()), + + sum(map(lambda f: f["count"], doc["meta"]["analysis"]["feature_counts"]["functions"])), ), ] @@ -137,8 +138,8 @@ def render_rules(ostream, doc): rows.append((key, v)) if rule["meta"]["scope"] != capa.rules.FILE_SCOPE: - locations = doc["rules"][rule["meta"]["name"]]["matches"].keys() - rows.append(("matches", "\n".join(map(format_address, locations)))) + locations = list(map(lambda m: m[0], doc["rules"][rule["meta"]["name"]]["matches"])) + rows.append(("matches", "\n".join(map(lambda d: format_address(rd.deserialize_address(d)), locations)))) ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) ostream.write("\n") @@ -160,5 +161,5 @@ def render_verbose(doc): def render(meta, rules: RuleSet, capabilities: MatchResults) -> str: - doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) + doc = rd.convert_capabilities_to_result_document(meta, rules, capabilities) return render_verbose(doc) diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index 3a49c76d..81faf5f2 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -12,9 +12,10 @@ import capa.rules import capa.render.utils as rutils import capa.render.verbose import capa.features.common -import capa.render.result_document +import capa.render.result_document as rd from 
capa.rules import RuleSet from capa.engine import MatchResults +from capa.features.freeze import deserialize_address def render_locations(ostream, match): @@ -26,16 +27,16 @@ def render_locations(ostream, match): locations = list(sorted(match.get("locations", []))) if len(locations) == 1: ostream.write(" @ ") - ostream.write(v.format_address(locations[0])) + ostream.write(v.format_address(rd.deserialize_address(locations[0]))) elif len(locations) > 1: ostream.write(" @ ") if len(locations) > 4: # don't display too many locations, because it becomes very noisy. # probably only the first handful of locations will be useful for inspection. - ostream.write(", ".join(map(v.format_address, locations[0:4]))) + ostream.write(", ".join(map(lambda d: v.format_address(rd.deserialize_address(d)), locations[0:4]))) ostream.write(", and %d more..." % (len(locations) - 4)) else: - ostream.write(", ".join(map(v.format_address, locations))) + ostream.write(", ".join(map(lambda d: v.format_address(rd.deserialize_address(d)), locations))) def render_statement(ostream, match, statement, indent=0): @@ -211,9 +212,12 @@ def render_rules(ostream, doc): api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895 """ functions_by_bb = {} - for function, info in doc["meta"]["analysis"]["layout"]["functions"].items(): - for bb in info["matched_basic_blocks"]: - functions_by_bb[bb] = function + for finfo in doc["meta"]["analysis"]["layout"]["functions"]: + faddress = rd.deserialize_address(finfo["address"]) + + for bb in finfo["matched_basic_blocks"]: + bbaddress = rd.deserialize_address(bb["address"]) + functions_by_bb[bbaddress] = faddress had_match = False @@ -264,16 +268,19 @@ def render_rules(ostream, doc): ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) if rule["meta"]["scope"] == capa.rules.FILE_SCOPE: - matches = list(doc["rules"][rule["meta"]["name"]]["matches"].values()) + matches = doc["rules"][rule["meta"]["name"]]["matches"] if len(matches) != 1: # 
i think there should only ever be one match per file-scope rule, # because we do the file-scope evaluation a single time. # but i'm not 100% sure if this is/will always be true. # so, lets be explicit about our assumptions and raise an exception if they fail. raise RuntimeError("unexpected file scope match count: %d" % (len(matches))) - render_match(ostream, matches[0], indent=0) + first_address, first_match = matches[0] + render_match(ostream, first_match, indent=0) else: - for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"].items()): + for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"]): + location = rd.deserialize_address(location) + ostream.write(rule["meta"]["scope"]) ostream.write(" @ ") ostream.write(capa.render.verbose.format_address(location)) @@ -302,5 +309,5 @@ def render_vverbose(doc): def render(meta, rules: RuleSet, capabilities: MatchResults) -> str: - doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) + doc = rd.convert_capabilities_to_result_document(meta, rules, capabilities) return render_vverbose(doc) diff --git a/tests/test_main.py b/tests/test_main.py index a4ab0551..3053f967 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -438,6 +438,9 @@ def test_json_meta(capsys): assert capa.main.main([path, "-j"]) == 0 std = capsys.readouterr() std_json = json.loads(std.out) - # remember: json can't have integer keys :-( - assert str(0x10001010) in std_json["meta"]["analysis"]["layout"]["functions"] - assert 0x10001179 in std_json["meta"]["analysis"]["layout"]["functions"][str(0x10001010)]["matched_basic_blocks"] + + assert ["absolute", 0x10001010] in map(lambda f: f["address"], std_json["meta"]["analysis"]["layout"]["functions"]) + # each layout entry is a dict with "address" and "matched_basic_blocks" keys + for finfo in std_json["meta"]["analysis"]["layout"]["functions"]: + if finfo["address"] == ["absolute", 0x10001010]: + assert {"address": ["absolute", 0x10001179]} in finfo["matched_basic_blocks"]