Merge pull request #1351 from mandiant/wb-mr-proto

WIP: proto translation
Willi Ballenthin committed 2023-03-22 09:44:59 +01:00 (committed by GitHub)
9 changed files with 2017 additions and 1115 deletions


@@ -3,8 +3,7 @@
## master (unreleased)
### New Features
- add protobuf format for result documents #1219 @williballenthin
- add protobuf format for result documents #1219 @williballenthin
### Breaking Changes


@@ -44,7 +44,7 @@ def is_runtime_ida():
return True
def assert_never(value: NoReturn) -> NoReturn:
def assert_never(value) -> NoReturn:
assert False, f"Unhandled value: {value} ({type(value).__name__})"


@@ -1,444 +0,0 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import sys
from typing import Dict, Union
from dataclasses import dataclass
import pydantic
import capa.render
import capa.render.utils
import capa.features.freeze
import capa.render.result_document
import capa.features.freeze.features
from capa.render.utils import StringIO
def emit_proto_enum(out: StringIO, enum):
# like: AddressType
title = enum["title"]
# like: ADDRESSTYPE
prefix = title.upper()
def render_value(value):
# like: ADDRESSTYPE_ABSOLUTE
return "%s_%s" % (prefix, value.upper().replace(" ", "_"))
# like:
#
# enum AddressType {
# ADDRESSTYPE_UNSPECIFIED = 0;
# ADDRESSTYPE_ABSOLUTE = 1;
# ADDRESSTYPE_RELATIVE = 2;
# ...
# }
out.writeln(f"enum {title} {{")
out.writeln(f' {render_value("unspecified")} = 0;')
for i, value in enumerate(enum["enum"]):
out.writeln(f" {render_value(value)} = {i + 1};")
out.writeln(f"}}")
out.writeln("")
def is_ref(prop):
return "$ref" in prop
def get_ref_type_name(prop):
# from: {"$ref": "#/definitions/Scope"}},
# to: "Scope"
assert is_ref(prop)
assert prop["$ref"].startswith("#/definitions/")
return prop["$ref"][len("#/definitions/") :]
def is_primitive_type(prop):
# things like: string, integer, bool, etc.
return "type" in prop and not prop["type"] == "object"
def is_custom_type(prop):
# struct-like things defined in the schema, like Features, etc.
return "type" in prop and prop["type"] == "object" and "additionalProperties" not in prop
def get_custom_type_name(prop):
return prop["title"]
def is_tuple(prop):
# a tuple is an array with a fixed size.
# the types of the elements can vary.
# we'll emit a custom message type for each tuple, like Pair_Address_Match.
#
# like:
#
# {"items": [{"$ref": "#/definitions/Address"},
# {"$ref": "#/definitions/Match"}],
# "maxItems": 2,
# "minItems": 2,
# "type": "array"},
if "type" not in prop:
return False
if prop["type"] != "array":
return False
if "maxItems" not in prop or "minItems" not in prop:
return False
if prop["maxItems"] != prop["minItems"]:
# tuples have a fixed size
return False
return True
def get_tuple_type_name(prop):
assert is_tuple(prop)
if prop["maxItems"] == 2:
base = "Pair"
else:
base = "Tuple"
# this won't work for nested tuples, but good enough for here.
return base + "_" + "_".join(get_type_name(item) for item in prop["items"])
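# e.g. the (Address, Match) pair shown above yields the message name "Pair_Address_Match",
# and a hypothetical fixed-size triple of integers would yield "Tuple_Integer_Integer_Integer".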
def is_array(prop):
# an array is a sequence of elements of the same type.
# typically we can use a repeated field for this.
# note: there's a special case within maps, where the array elements are a custom wrapper type.
#
# like:
#
# {"items": {"type": "string"},
# "title": "Parts",
# "type": "array"},
if "type" not in prop:
return False
if prop["type"] != "array":
return False
if "maxItems" in prop and "minItems" in prop and prop["maxItems"] == prop["minItems"]:
# tuples have a fixed size, arrays are variable
return False
if not isinstance(prop["items"], dict):
# array elements have a fixed type
return False
return True
def is_map(prop):
# a map maps from string key to a fixed type.
# the value type cannot be repeated, so we'll emit a custom wrapper type.
#
# like:
#
# {"additionalProperties": {"items": {"$ref": "#/definitions/Address"},
# "type": "array"},
# "title": "Captures",
# "type": "object"},
return "type" in prop and prop["type"] == "object" and "additionalProperties" in prop
def get_primitive_type_name(prop):
assert is_primitive_type(prop)
if prop["type"] == "string":
return "string"
elif prop["type"] == "boolean":
return "bool"
elif prop["type"] == "integer":
# this integer has arbitrary range.
# but proto supports only i64 and u64.
# so we hook this specially, including within the translator.
return "Integer"
elif prop["type"] == "number":
# number: int | float
# we hook this specially
return "Number"
elif is_tuple(prop):
return get_tuple_type_name(prop)
elif is_array(prop):
aitem = prop["items"]
if is_primitive_type(aitem):
atype = get_primitive_type_name(prop["items"])
elif is_ref(aitem):
atype = get_ref_type_name(aitem)
elif is_custom_type(aitem):
atype = get_custom_type_name(aitem)
else:
raise NotImplementedError(aitem)
return f"repeated {atype}"
else:
raise NotImplementedError(prop["type"])
def get_type_name(prop):
if is_primitive_type(prop):
return get_primitive_type_name(prop)
elif is_custom_type(prop):
return get_custom_type_name(prop)
elif is_ref(prop):
return get_ref_type_name(prop)
else:
raise NotImplementedError(prop)
def is_union(prop):
# a union is a field that can be one of several types.
return "anyOf" in prop
def sanitize_prop_name(name):
# like: "analysis-conclusion" -> "analysis_conclusion"
# like: "att&ck" -> "attack"
# like: "capa/subscope" -> "capa-subscope"
# like: "function name" -> "function-name"
return name.replace("-", "_").replace("&", "a").replace("/", "_").replace(" ", "_")
def _find_capa_class(name):
# try to find the capa class that corresponds to the given name.
# we use this to find the class that defines the property order.
try:
return getattr(capa.render.result_document, name)
except AttributeError:
pass
try:
return getattr(capa.features.freeze, name)
except AttributeError:
pass
try:
return getattr(capa.features.freeze.features, name)
except AttributeError:
pass
raise NotImplementedError(name)
def _enum_properties(message):
"""enumerate the properties of the message defined, ordered by class declaration"""
# this is just for convenience.
# the order of properties provided by the class. guaranteed.
property_order = list(_find_capa_class(message["title"]).__signature__.parameters.keys())
# order of properties provided by pydantic. not guaranteed. the fallback.
# used when we can't figure out an alias, such as capa/subscope -> is_subscope.
properties = list(message["properties"].keys())
def get_property_index(name):
try:
# prefer the order of properties provided by the class.
return property_order.index(sanitize_prop_name(name))
except ValueError:
# fallback to whatever pydantic extracts.
return len(message["properties"]) + properties.index(name)
return sorted(message["properties"].items(), key=lambda p: get_property_index(p[0]))
@dataclass
class DeferredArrayType:
name: str
item: dict
@dataclass
class DeferredTupleType:
name: str
count: int
items: dict
def emit_proto_message(out: StringIO, deferred_types: Dict, message):
# like: Address
title = message["title"]
out.writeln(f"message {title} {{")
counter = iter(range(1, sys.maxsize))
for raw_name, prop in _enum_properties(message):
# we use a counter like this so that
# union/oneof fields can increment the counter.
i = next(counter)
name = sanitize_prop_name(raw_name)
if is_ref(prop):
ptype = get_ref_type_name(prop)
out.writeln(f" {ptype} {name} = {i};")
elif is_primitive_type(prop):
ptype = get_primitive_type_name(prop)
out.writeln(f" {ptype} {name} = {i};")
if is_tuple(prop):
deferred_types[ptype] = DeferredTupleType(ptype, prop["minItems"], prop["items"])
elif is_array(prop):
aitem = prop["items"]
if is_tuple(aitem):
atype = get_tuple_type_name(aitem)
deferred_types[atype] = DeferredTupleType(atype, aitem["minItems"], aitem["items"])
elif is_custom_type(prop):
ptype = get_custom_type_name(prop)
out.writeln(f" {ptype} {name} = {i};")
elif is_union(prop):
out.writeln(f" oneof {name} {{")
for j, of in enumerate(prop["anyOf"]):
if is_ref(of):
ptype = get_ref_type_name(of)
out.writeln(f" {ptype} v{j} = {i};")
elif is_primitive_type(of):
ptype = get_primitive_type_name(of)
out.writeln(f" {ptype} v{j} = {i};")
if is_tuple(of):
deferred_types[ptype] = DeferredTupleType(ptype, of["minItems"], of["items"])
# pydantic doesn't seem to encode None option
# fortunately, neither does protobuf.
# still seems weird not to be explicit.
else:
raise NotImplementedError(of)
i = next(counter)
out.writeln(f" }};")
elif is_map(prop):
if is_array(prop["additionalProperties"]):
# map values cannot be repeated, see:
# https://stackoverflow.com/a/41552990/87207
#
# so create a wrapper type around the repeated values.
# like: message Array_Integer { repeated int32 values = 1; }
#
# no:
#
# map <string, repeated int32> things = 1;
#
# yes:
#
# map <string, Array_Integer> things = 1;
#
# we could do this for every array, like Array_Integer and Array_Address,
# but it's less idiomatic and noisier.
# so we only create these types when we need them.
item_def = prop["additionalProperties"]["items"]
vtype = "Array_" + get_type_name(item_def)
# register this type to be emitted once we're done with the
# top level custom types in the schema.
deferred_types[vtype] = DeferredArrayType(vtype, item_def)
else:
vtype = get_type_name(prop["additionalProperties"])
out.writeln(f" map <string, {vtype}> {name} = {i};")
else:
raise ValueError("unexpected type: %s" % prop)
out.writeln(f"}}")
out.writeln("")
def emit_proto_entry(out: StringIO, deferred_types: Dict, schema, name):
if not name.startswith("#/definitions/"):
raise ValueError("unexpected name: %s" % name)
title = name[len("#/definitions/") :]
definition = schema["definitions"][title]
if definition["title"] != title:
raise ValueError("title mismatch: %s" % definition["title"])
if definition["type"] == "string" and "enum" in definition:
emit_proto_enum(out, definition)
elif definition["type"] == "object":
emit_proto_message(out, deferred_types, definition)
else:
raise NotImplementedError(definition["type"])
def generate_proto_from_pydantic(schema):
out: StringIO = capa.render.utils.StringIO()
out.writeln("// Generated by the capa.render.proto translator. DO NOT EDIT!")
out.writeln('syntax = "proto3";')
out.writeln("")
deferred_types: Dict[str, Union[DeferredArrayType, DeferredTupleType]] = dict()
for name in sorted(schema["definitions"].keys()):
emit_proto_entry(out, deferred_types, schema, "#/definitions/" + name)
for name, deferred_type in sorted(deferred_types.items()):
if isinstance(deferred_type, DeferredArrayType):
vtype = get_type_name(deferred_type.item)
out.writeln(f"message {name} {{ repeated {vtype} values = 1; }}\n")
elif isinstance(deferred_type, DeferredTupleType):
out.writeln(f"message {name} {{")
for i, item in enumerate(deferred_type.items):
vtype = get_type_name(item)
out.writeln(f" {vtype} v{i} = {i + 1};")
out.writeln(f"}}\n")
# these are additional primitive types that we'll use throughout.
out.writeln("message Integer { oneof value { uint64 u = 1; int64 i = 2; } }\n")
out.writeln("message Number { oneof value { uint64 u = 1; int64 i = 2; double f = 3; } }\n")
return out.getvalue()
def generate_proto() -> str:
"""
generate a protobuf v3 schema for the ResultDocument format.
we use introspection of the pydantic schema to generate this.
note: we *cannot* reuse the generated proto from version to version of capa,
because this translator does not guarantee stable field ordering/numbering.
that is, if we add a new property to any of the pydantic models,
the proto field numbers may change, and any clients using the proto will break.
instead, we should use this method to generate the proto,
probably once per major version,
and then commit the proto to the repo.
"""
return generate_proto_from_pydantic(pydantic.schema_of(capa.render.result_document.ResultDocument))
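Before its removal, this generator was exercised roughly as in the (also removed) test_generate_proto test: render the proto text, write it to disk, and compile it with protoc. A minimal sketch of that flow, assuming protoc and the mypy-protobuf plugin are installed:

import pathlib
import subprocess

import capa.render.proto  # the generator module removed in this commit

out_dir = pathlib.Path("build")
out_dir.mkdir(parents=True, exist_ok=True)

# render the proto3 schema derived from the pydantic models
proto_path = out_dir / "capa.proto"
proto_path.write_text(capa.render.proto.generate_proto())

# compile it into capa_pb2.py plus a mypy stub
subprocess.run(
    ["protoc", "-I=" + str(out_dir), "--python_out=" + str(out_dir), "--mypy_out=" + str(out_dir), str(proto_path)],
    check=True,
)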


@@ -1,17 +1,16 @@
// Generated by the capa.render.proto translator. DO NOT EDIT!
syntax = "proto3";
message APIFeature {
string type = 1;
string api = 2;
string description = 3;
optional string description = 3;
}
message Address {
AddressType type = 1;
oneof value {
Integer v0 = 2;
Pair_Integer_Integer v1 = 3;
Integer v = 2; // TODO rename value?
Token_Offset token_offset = 3;
};
}
@@ -40,7 +39,7 @@ message Analysis {
message ArchFeature {
string type = 1;
string arch = 2;
string description = 3;
optional string description = 3;
}
message AttackSpec {
@@ -53,7 +52,7 @@ message AttackSpec {
message BasicBlockFeature {
string type = 1;
string description = 2;
optional string description = 2;
}
message BasicBlockLayout {
@@ -63,75 +62,86 @@ message BasicBlockLayout {
message BytesFeature {
string type = 1;
string bytes = 2;
string description = 3;
optional string description = 3;
}
message CharacteristicFeature {
string type = 1;
string characteristic = 2;
string description = 3;
optional string description = 3;
}
message ClassFeature {
string type = 1;
string description = 2;
string class = 3;
string class_ = 2; // class is protected Python keyword
optional string description = 3;
}
message CompoundStatement {
string type = 1;
string description = 2;
optional string description = 2;
}
message ExportFeature {
string type = 1;
string export = 2;
string description = 3;
optional string description = 3;
}
message FeatureCounts {
Integer file = 1;
uint64 file = 1;
repeated FunctionFeatureCount functions = 2;
}
message FeatureNode {
string type = 1;
/*
TODO results in
"feature": {
"type": "feature",
"api": { <---- ugh, but again this is how proto works and we can translate back using custom code?!
"type": "api",
"api": "ws2_32.recv",
"description": ""
}
},
*/
oneof feature {
OSFeature v0 = 1;
ArchFeature v1 = 2;
FormatFeature v2 = 3;
MatchFeature v3 = 4;
CharacteristicFeature v4 = 5;
ExportFeature v5 = 6;
ImportFeature v6 = 7;
SectionFeature v7 = 8;
FunctionNameFeature v8 = 9;
SubstringFeature v9 = 10;
RegexFeature v10 = 11;
StringFeature v11 = 12;
ClassFeature v12 = 13;
NamespaceFeature v13 = 14;
APIFeature v14 = 15;
PropertyFeature v15 = 16;
NumberFeature v16 = 17;
BytesFeature v17 = 18;
OffsetFeature v18 = 19;
MnemonicFeature v19 = 20;
OperandNumberFeature v20 = 21;
OperandOffsetFeature v21 = 22;
BasicBlockFeature v22 = 23;
OSFeature os = 2;
ArchFeature arch = 3;
FormatFeature format = 4;
MatchFeature match = 5;
CharacteristicFeature characteristic = 6;
ExportFeature export = 7;
ImportFeature import_ = 8; // import is Python keyword
SectionFeature section = 9;
FunctionNameFeature function_name = 10;
SubstringFeature substring = 11;
RegexFeature regex = 12;
StringFeature string = 13;
ClassFeature class_ = 14;
NamespaceFeature namespace = 15;
APIFeature api = 16;
PropertyFeature property = 17;
NumberFeature number = 18;
BytesFeature bytes = 19;
OffsetFeature offset = 20;
MnemonicFeature mnemonic = 21;
OperandNumberFeature operand_number = 22;
OperandOffsetFeature operand_offset = 23;
BasicBlockFeature basic_block = 24;
};
string type = 25;
}
message FormatFeature {
string type = 1;
string format = 2;
string description = 3;
optional string description = 3;
}
message FunctionFeatureCount {
Address address = 1;
Integer count = 2;
uint64 count = 2;
}
message FunctionLayout {
@@ -142,13 +152,13 @@ message FunctionLayout {
message FunctionNameFeature {
string type = 1;
string function_name = 2;
string description = 3;
optional string description = 3;
}
message ImportFeature {
string type = 1;
string description = 2;
string import = 3;
string import_ = 2;
optional string description = 3;
}
message Layout {
@@ -179,22 +189,22 @@ message MaecMetadata {
message Match {
bool success = 1;
oneof node {
StatementNode v0 = 2;
FeatureNode v1 = 3;
StatementNode statement = 2;
FeatureNode feature = 3;
};
repeated Match children = 5;
repeated Address locations = 6;
map <string, Array_Address> captures = 7;
map <string, Addresses> captures = 7;
}
message MatchFeature {
string type = 1;
string match = 2;
string description = 3;
optional string description = 3;
}
message Metadata {
string timestamp = 1;
string timestamp = 1; // google.protobuf.timestamp_pb2.Timestamp also would work, but seems more of a headache
string version = 2;
repeated string argv = 3;
Sample sample = 4;
@@ -204,93 +214,69 @@ message Metadata {
message MnemonicFeature {
string type = 1;
string mnemonic = 2;
string description = 3;
optional string description = 3;
}
message NamespaceFeature {
string type = 1;
string namespace = 2;
string description = 3;
optional string description = 3;
}
message NumberFeature {
string type = 1;
oneof number {
Integer v0 = 2;
Number v1 = 3;
};
string description = 5;
Number number = 2; // TODO can/should this be negative?
optional string description = 5;
}
message OSFeature {
string type = 1;
string os = 2;
string description = 3;
optional string description = 3;
}
message OffsetFeature {
string type = 1;
Integer offset = 2;
string description = 3;
Integer offset = 2; // offset can be negative
optional string description = 3;
}
message OperandNumberFeature {
string type = 1;
Integer index = 2;
Integer operand_number = 3;
string description = 4;
uint32 index = 2;
Integer operand_number = 3; // TODO can/should this be negative?
optional string description = 4;
}
message OperandOffsetFeature {
string type = 1;
Integer index = 2;
uint32 index = 2;
Integer operand_offset = 3;
string description = 4;
optional string description = 4;
}
message PropertyFeature {
string type = 1;
string access = 2;
optional string access = 2;
string property = 3;
string description = 4;
optional string description = 4;
}
message RangeStatement {
string description = 1;
Integer min = 2;
Integer max = 3;
oneof child {
OSFeature v0 = 4;
ArchFeature v1 = 5;
FormatFeature v2 = 6;
MatchFeature v3 = 7;
CharacteristicFeature v4 = 8;
ExportFeature v5 = 9;
ImportFeature v6 = 10;
SectionFeature v7 = 11;
FunctionNameFeature v8 = 12;
SubstringFeature v9 = 13;
RegexFeature v10 = 14;
StringFeature v11 = 15;
ClassFeature v12 = 16;
NamespaceFeature v13 = 17;
APIFeature v14 = 18;
PropertyFeature v15 = 19;
NumberFeature v16 = 20;
BytesFeature v17 = 21;
OffsetFeature v18 = 22;
MnemonicFeature v19 = 23;
OperandNumberFeature v20 = 24;
OperandOffsetFeature v21 = 25;
BasicBlockFeature v22 = 26;
};
string type = 28;
string type = 1;
optional string description = 2;
uint64 min = 3;
uint64 max = 4;
// reuse FeatureNode here to avoid duplicating the full feature list (OSFeature, ArchFeature, ...) again.
// FeatureNode has an extra `type` field that is not present in the pydantic definition;
// we can set it to "" to effectively omit it.
FeatureNode child = 5;
}
message RegexFeature {
string type = 1;
string regex = 2;
string description = 3;
optional string description = 3;
}
message ResultDocument {
@@ -308,7 +294,7 @@ message RuleMetadata {
string name = 1;
string namespace = 2;
repeated string authors = 3;
Scope scope = 4;
Scope scope = 4; // TODO string scope -> easier translation to proto and from proto to json?!
repeated AttackSpec attack = 5;
repeated MBCSpec mbc = 6;
repeated string references = 7;
@@ -316,7 +302,7 @@ message RuleMetadata {
string description = 9;
bool lib = 10;
MaecMetadata maec = 11;
bool capa_subscope = 12;
bool is_subscope_rule = 12;
}
message Sample {
@@ -327,7 +313,8 @@ message Sample {
}
enum Scope {
SCOPE_UNSPECIFIED = 0;
SCOPE_UNSPECIFIED = 0; // TODO do differently so json conversion works, currently gives `"scope": "SCOPE_FUNCTION"` which pydantic cannot parse
// could just make string?! and assert in code that it's one of supported values?!
SCOPE_FILE = 1;
SCOPE_FUNCTION = 2;
SCOPE_BASIC_BLOCK = 3;
@@ -337,56 +324,56 @@ enum Scope {
message SectionFeature {
string type = 1;
string section = 2;
string description = 3;
optional string description = 3;
}
message SomeStatement {
string description = 1;
Integer count = 2;
string type = 3;
string type = 1;
optional string description = 2;
uint32 count = 3;
}
message StatementNode {
oneof statement {
RangeStatement v0 = 1;
SomeStatement v1 = 2;
SubscopeStatement v2 = 3;
CompoundStatement v3 = 4;
string type = 1;
oneof statement { // TODO don't specify these and just set type?! well I guess this is how proto is supposed to work...
// so for json conversion we'll also need a translation function (at least for testing)
RangeStatement range = 2;
SomeStatement some = 3;
SubscopeStatement subscope = 4;
CompoundStatement compound = 5;
};
string type = 6;
}
message StringFeature {
string type = 1;
string string = 2;
string description = 3;
optional string description = 3;
}
message SubscopeStatement {
string description = 1;
Scope scope = 2;
string type = 3;
string type = 1;
optional string description = 2;
Scope scope = 3;
}
message SubstringFeature {
string type = 1;
string substring = 2;
string description = 3;
optional string description = 3;
}
message Array_Address { repeated Address values = 1; }
message Addresses { repeated Address address = 1; }
message Pair_Address_Match {
Address v0 = 1;
Match v1 = 2;
Address address = 1;
Match match = 2;
}
message Pair_Integer_Integer {
Integer v0 = 1;
Integer v1 = 2;
message Token_Offset {
Integer token = 1;
uint64 offset = 2; // offset is always >= 0
}
message Integer { oneof value { uint64 u = 1; int64 i = 2; } }
message Number { oneof value { uint64 u = 1; int64 i = 2; double f = 3; } }
message Integer { oneof value { uint64 u = 1; sint64 i = 2; } } // unsigned or signed int
message Number { oneof value { uint64 u = 1; sint64 i = 2; double f = 3; } }

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large

capa/render/proto/proto.py (new file, 459 lines)

@@ -0,0 +1,459 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
Convert capa results to protobuf format.
The functionality here is similar to the various *from_capa functions, e.g. ResultDocument.from_capa() or
feature_from_capa.
For a few classes we can rely on the protobuf JSON parser (e.g. RuleMetadata).
For most classes (e.g. RuleMatches) the conversion is trickier, because we use types protobuf doesn't support
natively (e.g. tuples), several classes with unions, and more complex layouts. So it's more straightforward
to convert explicitly than to massage the data until the protobuf JSON parser accepts it.
Of note, the 3 in `syntax = "proto3"` has nothing to do with the 2 in capa_pb2.py;
see details in https://github.com/grpc/grpc/issues/15444#issuecomment-396442980.
First compile the protobuf to generate an API file and a mypy stub file
$ protoc.exe --python_out=. --mypy_out=. <path_to_proto> (e.g. capa/render/proto/capa.proto)
Alternatively, --pyi_out=. can be used to generate a Python Interface file that supports development
"""
import sys
import json
import argparse
from typing import Dict, Union
import google.protobuf.json_format
from google.protobuf.json_format import MessageToJson
import capa.rules
import capa.features.freeze as frz
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd
import capa.features.freeze.features as frzf
from capa.helpers import assert_never
from capa.features.freeze import AddressType
def dict_tuple_to_list_values(d: Dict) -> Dict:
o = dict()
for k, v in d.items():
if isinstance(v, tuple):
o[k] = list(v)
else:
o[k] = v
return o
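# e.g. {"authors": ("alice", "bob"), "name": "foo"} -> {"authors": ["alice", "bob"], "name": "foo"},
# which google.protobuf.json_format.ParseDict then accepts for repeated fields.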
def int_to_pb2(v: int) -> capa_pb2.Integer:
if v < -2_147_483_648:
raise ValueError(f"value underflow: {v}")
if v > 0xFFFFFFFFFFFFFFFF:
raise ValueError(f"value overflow: {v}")
if v < 0:
return capa_pb2.Integer(i=v)
else:
return capa_pb2.Integer(u=v)
def number_to_pb2(v: Union[int, float]) -> capa_pb2.Number:
if isinstance(v, float):
return capa_pb2.Number(f=v)
elif isinstance(v, int):
i = int_to_pb2(v)
if v < 0:
return capa_pb2.Number(i=i.i)
else:
return capa_pb2.Number(u=i.u)
else:
assert_never(v)
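# the mapping picks the unsigned branch for non-negative values and the signed branch otherwise, e.g.:
#
#   int_to_pb2(0x10)   -> Integer(u=0x10)
#   int_to_pb2(-1)     -> Integer(i=-1)
#   number_to_pb2(-1)  -> Number(i=-1)
#   number_to_pb2(1.5) -> Number(f=1.5)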
def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address:
if addr.type is AddressType.ABSOLUTE:
assert isinstance(addr.value, int)
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_ABSOLUTE, v=int_to_pb2(addr.value))
elif addr.type is AddressType.RELATIVE:
assert isinstance(addr.value, int)
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_RELATIVE, v=int_to_pb2(addr.value))
elif addr.type is AddressType.FILE:
assert isinstance(addr.value, int)
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_FILE, v=int_to_pb2(addr.value))
elif addr.type is AddressType.DN_TOKEN:
assert isinstance(addr.value, int)
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN, v=int_to_pb2(addr.value))
elif addr.type is AddressType.DN_TOKEN_OFFSET:
assert isinstance(addr.value, tuple)
token, offset = addr.value
assert isinstance(token, int)
assert isinstance(offset, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN_OFFSET,
token_offset=capa_pb2.Token_Offset(token=int_to_pb2(token), offset=offset),
)
elif addr.type is AddressType.NO_ADDRESS:
# value == None, so only set type
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS)
else:
assert_never(addr)
def scope_to_pb2(scope: capa.rules.Scope) -> capa_pb2.Scope.ValueType:
if scope == capa.rules.Scope.FILE:
return capa_pb2.Scope.SCOPE_FILE
elif scope == capa.rules.Scope.FUNCTION:
return capa_pb2.Scope.SCOPE_FUNCTION
elif scope == capa.rules.Scope.BASIC_BLOCK:
return capa_pb2.Scope.SCOPE_BASIC_BLOCK
elif scope == capa.rules.Scope.INSTRUCTION:
return capa_pb2.Scope.SCOPE_INSTRUCTION
else:
assert_never(scope)
def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
return capa_pb2.Metadata(
timestamp=str(meta.timestamp),
version=meta.version,
argv=meta.argv,
sample=google.protobuf.json_format.ParseDict(meta.sample.dict(), capa_pb2.Sample()),
analysis=capa_pb2.Analysis(
format=meta.analysis.format,
arch=meta.analysis.arch,
os=meta.analysis.os,
extractor=meta.analysis.extractor,
rules=meta.analysis.rules,
base_address=addr_to_pb2(meta.analysis.base_address),
layout=capa_pb2.Layout(
functions=[
capa_pb2.FunctionLayout(
address=addr_to_pb2(f.address),
matched_basic_blocks=[
capa_pb2.BasicBlockLayout(address=addr_to_pb2(bb.address)) for bb in f.matched_basic_blocks
],
)
for f in meta.analysis.layout.functions
]
),
feature_counts=capa_pb2.FeatureCounts(
file=meta.analysis.feature_counts.file,
functions=[
capa_pb2.FunctionFeatureCount(address=addr_to_pb2(f.address), count=f.count)
for f in meta.analysis.feature_counts.functions
],
),
library_functions=[
capa_pb2.LibraryFunction(address=addr_to_pb2(lf.address), name=lf.name)
for lf in meta.analysis.library_functions
],
),
)
def statement_to_pb2(statement: rd.Statement) -> capa_pb2.StatementNode:
if isinstance(statement, rd.RangeStatement):
return capa_pb2.StatementNode(
range=capa_pb2.RangeStatement(
type="range",
description=statement.description,
min=statement.min,
max=statement.max,
child=feature_to_pb2(statement.child),
),
type="statement",
)
elif isinstance(statement, rd.SomeStatement):
return capa_pb2.StatementNode(
some=capa_pb2.SomeStatement(type=statement.type, description=statement.description, count=statement.count),
type="statement",
)
elif isinstance(statement, rd.SubscopeStatement):
return capa_pb2.StatementNode(
subscope=capa_pb2.SubscopeStatement(
type=statement.type,
description=statement.description,
scope=scope_to_pb2(statement.scope),
),
type="statement",
)
elif isinstance(statement, rd.CompoundStatement):
return capa_pb2.StatementNode(
compound=capa_pb2.CompoundStatement(type=statement.type, description=statement.description),
type="statement",
)
else:
assert_never(statement)
def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode:
if isinstance(f, frzf.OSFeature):
return capa_pb2.FeatureNode(
type="feature", os=capa_pb2.OSFeature(type=f.type, os=f.os, description=f.description)
)
elif isinstance(f, frzf.ArchFeature):
return capa_pb2.FeatureNode(
type="feature", arch=capa_pb2.ArchFeature(type=f.type, arch=f.arch, description=f.description)
)
elif isinstance(f, frzf.FormatFeature):
return capa_pb2.FeatureNode(
type="feature", format=capa_pb2.FormatFeature(type=f.type, format=f.format, description=f.description)
)
elif isinstance(f, frzf.MatchFeature):
return capa_pb2.FeatureNode(
type="feature",
match=capa_pb2.MatchFeature(
type=f.type,
match=f.match,
description=f.description,
),
)
elif isinstance(f, frzf.CharacteristicFeature):
return capa_pb2.FeatureNode(
type="feature",
characteristic=capa_pb2.CharacteristicFeature(
type=f.type, characteristic=f.characteristic, description=f.description
),
)
elif isinstance(f, frzf.ExportFeature):
return capa_pb2.FeatureNode(
type="feature", export=capa_pb2.ExportFeature(type=f.type, export=f.export, description=f.description)
)
elif isinstance(f, frzf.ImportFeature):
return capa_pb2.FeatureNode(
type="feature", import_=capa_pb2.ImportFeature(type=f.type, import_=f.import_, description=f.description)
)
elif isinstance(f, frzf.SectionFeature):
return capa_pb2.FeatureNode(
type="feature", section=capa_pb2.SectionFeature(type=f.type, section=f.section, description=f.description)
)
elif isinstance(f, frzf.FunctionNameFeature):
return capa_pb2.FeatureNode(
type="function name",
function_name=capa_pb2.FunctionNameFeature(
type=f.type, function_name=f.function_name, description=f.description
),
)
elif isinstance(f, frzf.SubstringFeature):
return capa_pb2.FeatureNode(
type="feature",
substring=capa_pb2.SubstringFeature(type=f.type, substring=f.substring, description=f.description),
)
elif isinstance(f, frzf.RegexFeature):
return capa_pb2.FeatureNode(
type="feature", regex=capa_pb2.RegexFeature(type=f.type, regex=f.regex, description=f.description)
)
elif isinstance(f, frzf.StringFeature):
return capa_pb2.FeatureNode(
type="feature",
string=capa_pb2.StringFeature(
type=f.type,
string=f.string,
description=f.description,
),
)
elif isinstance(f, frzf.ClassFeature):
return capa_pb2.FeatureNode(
type="feature", class_=capa_pb2.ClassFeature(type=f.type, class_=f.class_, description=f.description)
)
elif isinstance(f, frzf.NamespaceFeature):
return capa_pb2.FeatureNode(
type="feature",
namespace=capa_pb2.NamespaceFeature(type=f.type, namespace=f.namespace, description=f.description),
)
elif isinstance(f, frzf.APIFeature):
return capa_pb2.FeatureNode(
type="feature", api=capa_pb2.APIFeature(type=f.type, api=f.api, description=f.description)
)
elif isinstance(f, frzf.PropertyFeature):
return capa_pb2.FeatureNode(
type="feature",
property=capa_pb2.PropertyFeature(
type=f.type, access=f.access, property=f.property, description=f.description
),
)
elif isinstance(f, frzf.NumberFeature):
return capa_pb2.FeatureNode(
type="feature",
number=capa_pb2.NumberFeature(type=f.type, number=number_to_pb2(f.number), description=f.description),
)
elif isinstance(f, frzf.BytesFeature):
return capa_pb2.FeatureNode(
type="feature", bytes=capa_pb2.BytesFeature(type=f.type, bytes=f.bytes, description=f.description)
)
elif isinstance(f, frzf.OffsetFeature):
return capa_pb2.FeatureNode(
type="feature",
offset=capa_pb2.OffsetFeature(type=f.type, offset=int_to_pb2(f.offset), description=f.description),
)
elif isinstance(f, frzf.MnemonicFeature):
return capa_pb2.FeatureNode(
type="feature",
mnemonic=capa_pb2.MnemonicFeature(type=f.type, mnemonic=f.mnemonic, description=f.description),
)
elif isinstance(f, frzf.OperandNumberFeature):
return capa_pb2.FeatureNode(
type="feature",
operand_number=capa_pb2.OperandNumberFeature(
type=f.type, index=f.index, operand_number=int_to_pb2(f.operand_number), description=f.description
),
)
elif isinstance(f, frzf.OperandOffsetFeature):
return capa_pb2.FeatureNode(
type="feature",
operand_offset=capa_pb2.OperandOffsetFeature(
type=f.type, index=f.index, operand_offset=int_to_pb2(f.operand_offset), description=f.description
),
)
elif isinstance(f, frzf.BasicBlockFeature):
return capa_pb2.FeatureNode(
type="feature", basic_block=capa_pb2.BasicBlockFeature(type=f.type, description=f.description)
)
else:
assert_never(f)
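# note: exactly one branch of the FeatureNode oneof is populated above; consumers can recover it
# generically via the oneof name, e.g. (a small sketch):
#
#   node = feature_to_pb2(frzf.APIFeature(type="api", api="ws2_32.recv"))
#   inner = getattr(node, node.WhichOneof("feature"))  # -> the capa_pb2.APIFeature message
#   assert inner.api == "ws2_32.recv"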
def node_to_pb2(node: rd.Node) -> Union[capa_pb2.FeatureNode, capa_pb2.StatementNode]:
if isinstance(node, rd.StatementNode):
return statement_to_pb2(node.statement)
elif isinstance(node, rd.FeatureNode):
return feature_to_pb2(node.feature)
else:
assert_never(node)
def match_to_pb2(match: rd.Match) -> capa_pb2.Match:
node = node_to_pb2(match.node)
children = list(map(match_to_pb2, match.children))
locations = list(map(addr_to_pb2, match.locations))
if isinstance(node, capa_pb2.StatementNode):
return capa_pb2.Match(
success=match.success,
statement=node,
children=children,
locations=locations,
captures={},
)
elif isinstance(node, capa_pb2.FeatureNode):
return capa_pb2.Match(
success=match.success,
feature=node,
children=children,
locations=locations,
captures={
capture: capa_pb2.Addresses(address=list(map(addr_to_pb2, locs)))
for capture, locs in match.captures.items()
},
)
else:
assert_never(match)
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
# after a few manual type conversions we can rely on the protobuf JSON parser for RuleMetadata;
# the conversions include tuple -> list and rd enum -> proto enum
meta = dict_tuple_to_list_values(rule_metadata.dict())
meta["scope"] = scope_to_pb2(meta["scope"])
meta["attack"] = list(map(dict_tuple_to_list_values, meta.get("attack", [])))
meta["mbc"] = list(map(dict_tuple_to_list_values, meta.get("mbc", [])))
return google.protobuf.json_format.ParseDict(meta, capa_pb2.RuleMetadata())
def doc_to_pb2(doc: rd.ResultDocument) -> capa_pb2.ResultDocument:
rule_matches: Dict[str, capa_pb2.RuleMatches] = {}
for rule_name, matches in doc.rules.items():
m = capa_pb2.RuleMatches(
meta=rule_metadata_to_pb2(matches.meta),
source=matches.source,
matches=[
capa_pb2.Pair_Address_Match(address=addr_to_pb2(addr), match=match_to_pb2(match))
for addr, match in matches.matches
],
)
rule_matches[rule_name] = m
r = capa_pb2.ResultDocument(meta=metadata_to_pb2(doc.meta), rules=rule_matches)
return r
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="convert JSON result document to protobuf")
parser.add_argument("json_input", help="path to JSON result document to convert")
parser.add_argument("-j", "--json", action="store_true", help="emit JSON conversion of protobuf instead of text")
args = parser.parse_args(args=argv)
with open(args.json_input, "r", encoding="utf-8") as f:
fdata = f.read()
doc = rd.ResultDocument.parse_obj(json.loads(fdata))
proto_doc = doc_to_pb2(doc)
if args.json:
# TODO use ensure_ascii?
# including_default_value_fields -> so we get empty/unset fields
# see https://googleapis.dev/python/protobuf/latest/google/protobuf/json_format.html
json_obj = MessageToJson(
proto_doc, sort_keys=True, preserving_proto_field_name=True, including_default_value_fields=True
)
print(json_obj)
else:
print(proto_doc)
# TODO test?
# doc2 = rd.ResultDocument.parse_obj(json.loads(json_obj))
# doc2 = rd.ResultDocument.construct(json.loads(json_obj))
# assert doc == doc2
if __name__ == "__main__":
main()
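For library (rather than command-line) use, a minimal round-trip sketch, assuming a capa result document JSON on disk (the path below is hypothetical) and the compiled capa_pb2 module:

import json

import capa.render.proto.proto as proto
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd

with open("results.json", "r", encoding="utf-8") as f:  # hypothetical path
    doc = rd.ResultDocument.parse_obj(json.load(f))

pb = proto.doc_to_pb2(doc)

# the protobuf wire format round-trips cleanly and is typically much more compact than the JSON document
buf = pb.SerializeToString()
decoded = capa_pb2.ResultDocument()
decoded.ParseFromString(buf)
assert decoded.meta.version == pb.meta.version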


@@ -1103,11 +1103,39 @@ def _039a6_dotnetfile_extractor():
return get_dnfile_extractor(get_data_path_by_name("_039a6"))
@pytest.fixture
def pma0101_rd():
path = os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll.json")
def get_result_doc(path):
with open(path, "rb") as f:
buf = f.read()
src = buf.decode("utf-8")
return capa.render.result_document.ResultDocument.parse_raw(src)
@pytest.fixture
def pma0101_rd():
# TODO move to rd subdir
return get_result_doc(os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll_.json"))
@pytest.fixture
def dotnet_1c444e_rd():
return get_result_doc(os.path.join(CD, "data", "dotnet", "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json"))
@pytest.fixture
def a3f3bbc_rd():
return get_result_doc(os.path.join(CD, "data", "3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json"))
@pytest.fixture
def al_khaserx86_rd():
return get_result_doc(os.path.join(CD, "data", "al-khaser_x86.exe_.json"))
@pytest.fixture
def al_khaserx64_rd():
return get_result_doc(os.path.join(CD, "data", "al-khaser_x64.exe_.json"))
@pytest.fixture
def a076114_rd():
return get_result_doc(os.path.join(CD, "data", "0761142efbda6c4b1e801223de723578.dll_.json"))


@@ -1,4 +1,4 @@
# Copyright (C) 2023 FireEye, Inc. All Rights Reserved.
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
@@ -8,58 +8,307 @@
import json
import pathlib
import subprocess
from typing import Any
import pydantic
from fixtures import *
import capa.rules
import capa.render
import capa.render.proto
import capa.render.utils
import capa.features.freeze
import capa.render.proto.capa_pb2
import capa.render.result_document
import capa.features.address
import capa.render.proto.proto
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd
import capa.features.freeze.features
from capa.render.result_document import ResultDocument
# TODO test_proto_to_rd?
def test_generate_proto(tmp_path: pathlib.Path):
tmp_path.mkdir(exist_ok=True, parents=True)
proto_path = tmp_path / "capa.proto"
json_path = tmp_path / "capa.json"
@pytest.mark.parametrize(
"rd_file",
[
pytest.param("a3f3bbc_rd"),
pytest.param("al_khaserx86_rd"),
pytest.param("al_khaserx64_rd"),
pytest.param("a076114_rd"),
pytest.param("pma0101_rd"),
pytest.param("dotnet_1c444e_rd"),
],
)
def test_doc_to_pb2(request, rd_file):
src: rd.ResultDocument = request.getfixturevalue(rd_file)
dst = capa.render.proto.proto.doc_to_pb2(src)
schema = pydantic.schema_of(capa.render.result_document.ResultDocument)
json_path.write_text(json.dumps(schema, indent=4))
assert_meta(src.meta, dst.meta)
proto = capa.render.proto.generate_proto()
for rule_name, matches in src.rules.items():
assert rule_name in dst.rules
print("=====================================")
print(proto_path)
print("-------------------------------------")
for i, line in enumerate(proto.split("\n")):
print(f" {i} | {line}")
print("=====================================")
proto_path.write_text(proto)
m: capa_pb2.RuleMetadata = dst.rules[rule_name].meta
assert matches.meta.name == m.name
assert cmp_optional(matches.meta.namespace, m.namespace)
assert list(matches.meta.authors) == m.authors
assert capa.render.proto.proto.scope_to_pb2(matches.meta.scope) == m.scope
subprocess.run(
[
"protoc",
"-I=" + str(tmp_path),
"--python_out=" + str(tmp_path),
"--mypy_out=" + str(tmp_path),
str(proto_path),
],
check=True,
)
assert len(matches.meta.attack) == len(m.attack)
for rd_attack, proto_attack in zip(matches.meta.attack, m.attack):
assert list(rd_attack.parts) == proto_attack.parts
assert rd_attack.tactic == proto_attack.tactic
assert rd_attack.technique == proto_attack.technique
assert rd_attack.subtechnique == proto_attack.subtechnique
pb = tmp_path / "capa_pb2.py"
print(pb.read_text())
print("=====================================")
assert len(matches.meta.mbc) == len(m.mbc)
for rd_mbc, proto_mbc in zip(matches.meta.mbc, m.mbc):
assert list(rd_mbc.parts) == proto_mbc.parts
assert rd_mbc.objective == proto_mbc.objective
assert rd_mbc.behavior == proto_mbc.behavior
assert rd_mbc.method == proto_mbc.method
assert rd_mbc.id == proto_mbc.id
assert list(matches.meta.references) == m.references
assert list(matches.meta.examples) == m.examples
assert matches.meta.description == m.description
assert matches.meta.lib == m.lib
assert matches.meta.is_subscope_rule == m.is_subscope_rule
assert cmp_optional(matches.meta.maec.analysis_conclusion, m.maec.analysis_conclusion)
assert cmp_optional(matches.meta.maec.analysis_conclusion_ov, m.maec.analysis_conclusion_ov)
assert cmp_optional(matches.meta.maec.malware_family, m.maec.malware_family)
assert cmp_optional(matches.meta.maec.malware_category, m.maec.malware_category)
assert cmp_optional(matches.meta.maec.malware_category_ov, m.maec.malware_category_ov)
assert matches.source == dst.rules[rule_name].source
assert len(matches.matches) == len(dst.rules[rule_name].matches)
for (addr, match), proto_match in zip(matches.matches, dst.rules[rule_name].matches):
assert capa.render.proto.proto.addr_to_pb2(addr) == proto_match.address
assert_match(match, proto_match.match)
def test_translate_to_proto(pma0101_rd: ResultDocument):
src = pma0101_rd
def test_addr_to_pb2():
a1 = capa.features.freeze.Address.from_capa(capa.features.address.AbsoluteVirtualAddress(0x400000))
a = capa.render.proto.proto.addr_to_pb2(a1)
assert a.type == capa_pb2.ADDRESSTYPE_ABSOLUTE
assert a.v.u == 0x400000
dst = capa.render.proto.capa_pb2.ResultDocument()
a2 = capa.features.freeze.Address.from_capa(capa.features.address.RelativeVirtualAddress(0x100))
a = capa.render.proto.proto.addr_to_pb2(a2)
assert a.type == capa_pb2.ADDRESSTYPE_RELATIVE
assert a.v.u == 0x100
assert True
a3 = capa.features.freeze.Address.from_capa(capa.features.address.FileOffsetAddress(0x200))
a = capa.render.proto.proto.addr_to_pb2(a3)
assert a.type == capa_pb2.ADDRESSTYPE_FILE
assert a.v.u == 0x200
a4 = capa.features.freeze.Address.from_capa(capa.features.address.DNTokenAddress(0x123456))
a = capa.render.proto.proto.addr_to_pb2(a4)
assert a.type == capa_pb2.ADDRESSTYPE_DN_TOKEN
assert a.v.u == 0x123456
a5 = capa.features.freeze.Address.from_capa(capa.features.address.DNTokenOffsetAddress(0x123456, 0x10))
a = capa.render.proto.proto.addr_to_pb2(a5)
assert a.type == capa_pb2.ADDRESSTYPE_DN_TOKEN_OFFSET
assert a.token_offset.token.u == 0x123456
assert a.token_offset.offset == 0x10
a6 = capa.features.freeze.Address.from_capa(capa.features.address._NoAddress())
a = capa.render.proto.proto.addr_to_pb2(a6)
assert a.type == capa_pb2.ADDRESSTYPE_NO_ADDRESS
def test_scope_to_pb2():
assert capa.render.proto.proto.scope_to_pb2(capa.rules.Scope(capa.rules.FILE_SCOPE)) == capa_pb2.SCOPE_FILE
assert capa.render.proto.proto.scope_to_pb2(capa.rules.Scope(capa.rules.FUNCTION_SCOPE)) == capa_pb2.SCOPE_FUNCTION
assert capa.render.proto.proto.scope_to_pb2(capa.rules.Scope(capa.rules.BASIC_BLOCK_SCOPE)) == capa_pb2.SCOPE_BASIC_BLOCK
assert capa.render.proto.proto.scope_to_pb2(capa.rules.Scope(capa.rules.INSTRUCTION_SCOPE)) == capa_pb2.SCOPE_INSTRUCTION
def cmp_optional(a: Any, b: Any) -> bool:
# proto optional value gets deserialized to "" instead of None (used by pydantic)
a = a if a is not None else ""
return a == b
def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
assert str(meta.timestamp) == dst.timestamp
assert meta.version == dst.version
if meta.argv is None:
assert [] == dst.argv
else:
assert list(meta.argv) == dst.argv
assert meta.sample.md5 == dst.sample.md5
assert meta.sample.sha1 == dst.sample.sha1
assert meta.sample.sha256 == dst.sample.sha256
assert meta.sample.path == dst.sample.path
assert meta.analysis.format == dst.analysis.format
assert meta.analysis.arch == dst.analysis.arch
assert meta.analysis.os == dst.analysis.os
assert meta.analysis.extractor == dst.analysis.extractor
assert list(meta.analysis.rules) == dst.analysis.rules
assert capa.render.proto.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address
assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions)
for rd_f, proto_f in zip(meta.analysis.layout.functions, dst.analysis.layout.functions):
assert capa.render.proto.proto.addr_to_pb2(rd_f.address) == proto_f.address
assert len(rd_f.matched_basic_blocks) == len(proto_f.matched_basic_blocks)
for rd_bb, proto_bb in zip(rd_f.matched_basic_blocks, proto_f.matched_basic_blocks):
assert capa.render.proto.proto.addr_to_pb2(rd_bb.address) == proto_bb.address
assert meta.analysis.feature_counts.file == dst.analysis.feature_counts.file
assert len(meta.analysis.feature_counts.functions) == len(dst.analysis.feature_counts.functions)
for rd_cf, proto_cf in zip(meta.analysis.feature_counts.functions, dst.analysis.feature_counts.functions):
assert capa.render.proto.proto.addr_to_pb2(rd_cf.address) == proto_cf.address
assert rd_cf.count == proto_cf.count
assert len(meta.analysis.library_functions) == len(dst.analysis.library_functions)
for rd_lf, proto_lf in zip(meta.analysis.library_functions, dst.analysis.library_functions):
assert capa.render.proto.proto.addr_to_pb2(rd_lf.address) == proto_lf.address
assert rd_lf.name == proto_lf.name
def assert_match(ma: rd.Match, mb: capa_pb2.Match):
assert ma.success == mb.success
# node
if isinstance(ma.node, rd.StatementNode):
assert_statement(ma.node, mb.statement)
elif isinstance(ma.node, rd.FeatureNode):
assert ma.node.type == mb.feature.type
assert_feature(ma.node.feature, mb.feature)
# children
assert len(ma.children) == len(mb.children)
for ca, cb in zip(ma.children, mb.children):
assert_match(ca, cb)
# locations
assert list(map(capa.render.proto.proto.addr_to_pb2, ma.locations)) == mb.locations
# captures
assert len(ma.captures) == len(mb.captures)
for capture, locs in ma.captures.items():
assert capture in mb.captures
assert list(map(capa.render.proto.proto.addr_to_pb2, locs)) == mb.captures[capture].address
def assert_feature(fa, fb):
# get field that has been set, e.g., os or api, to access inner fields
fb = getattr(fb, fb.WhichOneof("feature"))
assert fa.type == fb.type
assert cmp_optional(fa.description, fb.description)
if isinstance(fa, capa.features.freeze.features.OSFeature):
assert fa.os == fb.os
elif isinstance(fa, capa.features.freeze.features.ArchFeature):
assert fa.arch == fb.arch
elif isinstance(fa, capa.features.freeze.features.FormatFeature):
assert fa.format == fb.format
elif isinstance(fa, capa.features.freeze.features.MatchFeature):
assert fa.match == fb.match
elif isinstance(fa, capa.features.freeze.features.CharacteristicFeature):
assert fa.characteristic == fb.characteristic
elif isinstance(fa, capa.features.freeze.features.ExportFeature):
assert fa.export == fb.export
elif isinstance(fa, capa.features.freeze.features.ImportFeature):
assert fa.import_ == fb.import_ # or could use getattr
elif isinstance(fa, capa.features.freeze.features.SectionFeature):
assert fa.section == fb.section
elif isinstance(fa, capa.features.freeze.features.FunctionNameFeature):
assert fa.function_name == fb.function_name
elif isinstance(fa, capa.features.freeze.features.SubstringFeature):
assert fa.substring == fb.substring
elif isinstance(fa, capa.features.freeze.features.RegexFeature):
assert fa.regex == fb.regex
elif isinstance(fa, capa.features.freeze.features.StringFeature):
assert fa.string == fb.string
elif isinstance(fa, capa.features.freeze.features.ClassFeature):
assert fa.class_ == fb.class_
elif isinstance(fa, capa.features.freeze.features.NamespaceFeature):
assert fa.namespace == fb.namespace
elif isinstance(fa, capa.features.freeze.features.BasicBlockFeature):
pass
elif isinstance(fa, capa.features.freeze.features.APIFeature):
assert fa.api == fb.api
elif isinstance(fa, capa.features.freeze.features.PropertyFeature):
assert fa.property == fb.property
assert fa.access == fb.access
elif isinstance(fa, capa.features.freeze.features.NumberFeature):
# get number value of set field
n = getattr(fb.number, fb.number.WhichOneof("value"))
assert fa.number == n
elif isinstance(fa, capa.features.freeze.features.BytesFeature):
assert fa.bytes == fb.bytes
elif isinstance(fa, capa.features.freeze.features.OffsetFeature):
assert fa.offset == getattr(fb.offset, fb.offset.WhichOneof("value"))
elif isinstance(fa, capa.features.freeze.features.MnemonicFeature):
assert fa.mnemonic == fb.mnemonic
elif isinstance(fa, capa.features.freeze.features.OperandNumberFeature):
assert fa.index == fb.index
assert fa.operand_number == getattr(fb.operand_number, fb.operand_number.WhichOneof("value"))
elif isinstance(fa, capa.features.freeze.features.OperandOffsetFeature):
assert fa.index == fb.index
assert fa.operand_offset == getattr(fb.operand_offset, fb.operand_offset.WhichOneof("value"))
else:
raise NotImplementedError(f"unhandled feature: {type(fa)}: {fa}")
def assert_statement(a: rd.StatementNode, b: capa_pb2.StatementNode):
assert a.type == b.type
sa = a.statement
sb = getattr(b, str(b.WhichOneof("statement")))
assert sa.type == sb.type
assert cmp_optional(sa.description, sb.description)
if isinstance(sa, rd.RangeStatement):
assert isinstance(sb, capa_pb2.RangeStatement)
assert sa.min == sb.min
assert sa.max == sb.max
assert_feature(sa.child, sb.child)
elif isinstance(sa, rd.SomeStatement):
assert sa.count == sb.count
elif isinstance(sa, rd.SubscopeStatement):
assert capa.render.proto.proto.scope_to_pb2(sa.scope) == sb.scope
elif isinstance(sa, rd.CompoundStatement):
# only has type and description tested above
pass
else:
# unhandled statement
assert False