wip: result document: deserialize into capa object instances

This commit is contained in:
Willi Ballenthin
2023-04-03 15:04:15 +02:00
parent 43128404be
commit b09e3e69f2

View File

@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import datetime
import collections
from typing import Any, Dict, List, Tuple, Union, Optional
from pydantic import Field, BaseModel
@@ -262,6 +263,55 @@ def node_from_capa(node: Union[capa.engine.Statement, capa.engine.Feature]) -> N
assert_never(node)
def node_to_capa(
node: Node, children: List[Union[capa.engine.Statement, capa.engine.Feature]]
) -> Union[capa.engine.Statement, capa.engine.Feature]:
if isinstance(node, StatementNode):
if isinstance(node.statement, CompoundStatement):
if node.statement.type == CompoundStatementType.AND:
return capa.engine.And(description=node.statement.description, children=children)
elif node.statement.type == CompoundStatementType.OR:
return capa.engine.Or(description=node.statement.description, children=children)
elif node.statement.type == CompoundStatementType.NOT:
return capa.engine.Not(description=node.statement.description, child=children[0])
elif node.statement.type == CompoundStatementType.OPTIONAL:
return capa.engine.Some(description=node.statement.description, count=0, children=children)
else:
assert_never(node.statement.type)
elif isinstance(node.statement, SomeStatement):
return capa.engine.Some(
description=node.statement.description, count=node.statement.count, children=children
)
elif isinstance(node.statement, RangeStatement):
return capa.engine.Range(
description=node.statement.description,
min=node.statement.min,
max=node.statement.max,
child=node.statement.child.to_capa(),
)
elif isinstance(node.statement, SubscopeStatement):
raise NotImplementedError("deserializing subscope statements are not supported")
return capa.engine.Subscope(
description=node.statement.description, scope=node.statement.scope, child=children[0]
)
else:
assert_never(node.statement)
elif isinstance(node, FeatureNode):
return node.feature.to_capa()
else:
assert_never(node)
class Match(FrozenModel):
"""
args:
@@ -394,6 +444,42 @@ class Match(FrozenModel):
captures={capture: tuple(captures[capture]) for capture in captures},
)
def to_capa(self, rules: RuleSet) -> capa.engine.Result:
children = [child.to_capa(rules) for child in self.children]
statement = node_to_capa(self.node, [child.statement for child in children])
if isinstance(self.node, FeatureNode):
feature = self.node.feature
if isinstance(feature, (frzf.SubstringFeature, frzf.RegexFeature)):
matches = {capture: {loc.to_capa() for loc in locs} for capture, locs in self.captures.items()}
if isinstance(feature, frzf.SubstringFeature):
assert isinstance(statement, capa.features.common.Substring)
statement = capa.features.common._MatchedSubstring(statement, matches)
elif isinstance(feature, frzf.RegexFeature):
assert isinstance(statement, capa.features.common.Regex)
statement = capa.features.common._MatchedRegex(statement, matches)
else:
assert_never(feature)
if (
isinstance(self.node, FeatureNode)
and isinstance(self.node.feature, frzf.MatchFeature)
# only add subtree on success,
# because there won't be results for the other rule on failure.
and self.success
):
# TODO: work is needed fixup subscope matches here.
raise NotImplementedError("deserializing subscope matches are not yet supported")
return capa.features.common.Result(
success=self.success,
statement=statement,
locations={loc.to_capa() for loc in self.locations},
children=children,
)
def parse_parts_id(s: str):
id_ = ""
@@ -581,35 +667,16 @@ class ResultDocument(FrozenModel):
def to_capa(self) -> Tuple[Dict, Dict]:
meta = self.meta.to_capa()
capabilities: Dict[str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]] = {}
capabilities: Dict[
str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]
] = collections.defaultdict(list)
rules = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_match.source) for rule_match in self.rules.values()])
for rule_name, rule_match in self.rules.items():
# Parse the YAML source into a Rule instance
rule = capa.rules.Rule.from_yaml(rule_match.source)
# Extract the capabilities from the RuleMatches object
for addr, match in rule_match.matches:
if isinstance(match.node, StatementNode):
if isinstance(match.node.statement, CompoundStatement):
statement = rule.statement
else:
statement = statement_from_capa(match.node.statement)
elif isinstance(match.node, FeatureNode):
statement = match.node.feature.to_capa()
if isinstance(statement, (capa.features.common.String, capa.features.common.Regex)):
statement.matches = match.captures
else:
raise ValueError("Invalid node type")
result: capa.engine.Result = match.to_capa(rules)
result = capa.features.common.Result(
statement=statement,
success=match.success,
locations={loc.to_capa() for loc in match.locations},
children=[],
)
if rule_name not in capabilities:
capabilities[rule_name] = []
capabilities[rule_name].append((addr.to_capa(), result))
return meta, capabilities