From d0345fcd539039da61c808f0819ced6260f54d28 Mon Sep 17 00:00:00 2001 From: William Ballenthin Date: Fri, 26 Jun 2020 22:54:13 -0600 Subject: [PATCH] render: start to implement common result document format --- capa/main.py | 12 ++- capa/render/__init__.py | 222 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 231 insertions(+), 3 deletions(-) create mode 100644 capa/render/__init__.py diff --git a/capa/main.py b/capa/main.py index 63d13cfb..f7d2dc6e 100644 --- a/capa/main.py +++ b/capa/main.py @@ -13,6 +13,7 @@ import argparse import capa.rules import capa.engine +import capa.render import capa.features import capa.features.freeze import capa.features.extractors @@ -110,6 +111,7 @@ def find_capabilities(ruleset, extractor, disable_progress=None): matches.update(all_bb_matches) matches.update(all_function_matches) matches.update(all_file_matches) + return matches @@ -635,6 +637,8 @@ def main(argv=None): help='Path to rule file or directory, use embedded rules by default') parser.add_argument('-t', '--tag', type=str, help='Filter on rule meta field values') + parser.add_argument('--json', action='store_true', + help='Emit JSON instead of text') parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') parser.add_argument('-vv', '--vverbose', action='store_true', @@ -735,12 +739,14 @@ def main(argv=None): if not (args.verbose or args.vverbose): return -1 + if args.json: + print(capa.render.render_json(rules, capabilities)) if args.vverbose: - render_capabilities_vverbose(rules, capabilities) + print(capa.render.render_vverbose(rules, capabilities)) elif args.verbose: - render_capabilities_verbose(rules, capabilities) + print(capa.render.render_verbose(rules, capabilities)) else: - render_capabilities_default(rules, capabilities) + print(capa.render.render_default(rules, capabilities)) logger.info('done.') diff --git a/capa/render/__init__.py b/capa/render/__init__.py new file mode 100644 index 00000000..a4ed629e --- /dev/null 
+++ b/capa/render/__init__.py
@@ -0,0 +1,240 @@
+import json
+
+import capa.engine
+import capa.features
+
+
+def convert_statement_to_result_document(rules, statement):
+    """
+    convert the given logic statement into its result document representation.
+
+    args:
+      rules (RuleSet):
+      statement (Statement):
+
+    returns: Dict[str, Any]
+
+    raises:
+      RuntimeError: when the statement type is not recognized.
+    """
+    if isinstance(statement, capa.engine.And):
+        return {
+            'type': 'and',
+        }
+    elif isinstance(statement, capa.engine.Or):
+        return {
+            'type': 'or',
+        }
+    elif isinstance(statement, capa.engine.Not):
+        return {
+            'type': 'not',
+        }
+    elif isinstance(statement, capa.engine.Some) and statement.count == 0:
+        # a `Some` that requires zero matches is reported as `optional`
+        return {
+            'type': 'optional',
+        }
+    elif isinstance(statement, capa.engine.Some) and statement.count > 0:
+        return {
+            'type': 'some',
+            'count': statement.count,
+        }
+    elif isinstance(statement, capa.engine.Range):
+        return {
+            'type': 'range',
+            'min': statement.min,
+            'max': statement.max,
+        }
+    elif isinstance(statement, capa.engine.Regex):
+        return {
+            'type': 'regex',
+            'pattern': statement.pattern,
+        }
+    elif isinstance(statement, capa.engine.Subscope):
+        return {
+            'type': 'subscope',
+            'scope': statement.scope,
+        }
+    else:
+        raise RuntimeError("unexpected match statement type: " + str(statement))
+
+
+def convert_feature_to_result_document(rules, feature):
+    """
+    convert the given feature into its result document representation.
+
+    args:
+      rules (RuleSet):
+      feature (Feature):
+
+    returns: Dict[str, Any]
+    """
+    name, value = feature.freeze_serialize()
+
+    name = name.lower()
+    if name == 'matchedrule':
+        name = 'match'
+
+    # unwrap a single-element list so the document holds the bare value
+    if isinstance(value, list) and len(value) == 1:
+        value = value[0]
+
+    if name == 'match':
+        rule_name = value
+        rule = rules[rule_name]
+        if rule.meta.get('capa/subscope-rule'):
+            # report a match against a subscope rule under that rule's scope
+            name = rule.meta['scope']
+            # TODO: link this logic together, when present
+
+    return {
+        'type': name,
+        name: value,
+    }
+
+
+def convert_node_to_result_document(rules, node):
+    """
+    convert the given statement or feature into a tagged result document node.
+
+    args:
+      rules (RuleSet):
+      node (Statement|Feature):
+
+    returns: Dict[str, Any]
+
+    raises:
+      RuntimeError: when the node is neither a Statement nor a Feature.
+    """
+    if isinstance(node, capa.engine.Statement):
+        return {
+            'type': 'statement',
+            'statement': convert_statement_to_result_document(rules, node),
+        }
+    elif isinstance(node, capa.features.Feature):
+        return {
+            'type': 'feature',
+            'feature': convert_feature_to_result_document(rules, node),
+        }
+    else:
+        raise RuntimeError("unexpected match node type")
+
+
+def convert_match_to_result_document(rules, result):
+    """
+    convert the given rule set and Result instance into a common, Python-native data structure.
+    this will become part of the "result document" format that can be emitted to JSON.
+
+    args:
+      rules (RuleSet):
+      result (Result):
+
+    returns: Dict[str, Any]
+    """
+    doc = {
+        'success': bool(result.success),
+        'node': convert_node_to_result_document(rules, result.statement),
+        'children': [
+            convert_match_to_result_document(rules, child)
+            for child in result.children
+        ],
+    }
+
+    # locations are only reported for features that actually matched
+    if isinstance(result.statement, capa.features.Feature) and result.success:
+        doc['locations'] = result.locations
+
+    # TODO: can a feature ever have children? suspect so with `match`?
+
+    return doc
+
+
+def convert_capabilities_to_result_document(rules, capabilities):
+    """
+    convert the given rule set and capabilities result to a common, Python-native data structure.
+    this format can be directly emitted to JSON, or passed to the other `render_*` routines
+    to render as text.
+
+    TODO: document the structure and provide examples
+
+    schema:
+
+    ```json
+    {
+      $rule-name: {
+        "meta": {...copied from rule.meta...},
+        "matches": {
+          $address: {...TODO: match details...},
+          ...
+        }
+      },
+      ...
+    }
+    ```
+
+    args:
+      rules (RuleSet):
+      capabilities (Dict[str, List[Tuple[int, Result]]]):
+
+    returns: Dict[str, Any]
+    """
+    doc = {}
+
+    for rule_name, matches in capabilities.items():
+        rule = rules[rule_name]
+
+        # subscope rules are skipped: they get no top-level entry
+        if rule.meta.get('capa/subscope-rule'):
+            continue
+
+        doc[rule_name] = {
+            'meta': dict(rule.meta),
+            'matches': {
+                addr: convert_match_to_result_document(rules, match)
+                for (addr, match) in matches
+            },
+        }
+
+    return doc
+
+
+def render_vverbose(rules, capabilities):
+    # TODO: render `doc` as text; currently a stub that returns an empty string
+    doc = convert_capabilities_to_result_document(rules, capabilities)
+    return ''
+
+
+def render_verbose(rules, capabilities):
+    # TODO: render `doc` as text; currently a stub that returns an empty string
+    doc = convert_capabilities_to_result_document(rules, capabilities)
+    return ''
+
+
+def render_default(rules, capabilities):
+    # TODO: render `doc` as text; currently a stub that returns an empty string
+    doc = convert_capabilities_to_result_document(rules, capabilities)
+    return ''
+
+
+class CapaJsonObjectEncoder(json.JSONEncoder):
+    """JSON encoder that additionally knows how to serialize sets."""
+
+    def default(self, obj):
+        # `default` is only invoked for objects the base encoder cannot
+        # serialize, so natively supported types never need handling here.
+        if isinstance(obj, set):
+            # emit sets as sorted lists so that the output is deterministic
+            return sorted(obj)
+
+        # anything else is unsupported: the base implementation raises TypeError
+        return json.JSONEncoder.default(self, obj)
+
+
+def render_json(rules, capabilities):
+    """render the capabilities as a JSON result document string."""
+    return json.dumps(
+        convert_capabilities_to_result_document(rules, capabilities),
+        cls=CapaJsonObjectEncoder,
+        sort_keys=True,
+        indent=4,
+    )