mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
explorer: use main.get_rules and simplify cache
This commit is contained in:
@@ -8,26 +8,18 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import itertools
|
||||
import collections
|
||||
from typing import Set, Dict, List, Tuple, Union, Optional
|
||||
|
||||
import capa.engine
|
||||
from capa.rules import Rule, Scope, RuleSet
|
||||
from capa.rules import Scope, RuleSet
|
||||
from capa.engine import FeatureSet, MatchResults
|
||||
from capa.features.address import NO_ADDRESS, Address
|
||||
from capa.ida.plugin.extractor import CapaExplorerFeatureExtractor
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
|
||||
|
||||
|
||||
class CapaExplorerRuleSetCache:
|
||||
def __init__(self, rules: List[Rule]):
|
||||
# capa.rules.Ruleset modifies rules, so we use deepcopy to preserve the original list of rules and our cached list of rules
|
||||
self.rules: List[Rule] = copy.deepcopy(rules)
|
||||
self.ruleset: RuleSet = RuleSet(copy.deepcopy(self.rules))
|
||||
|
||||
|
||||
class CapaRuleGenFeatureCacheNode:
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
@@ -27,7 +27,7 @@ import capa.render.json
|
||||
import capa.features.common
|
||||
import capa.render.result_document
|
||||
import capa.features.extractors.ida.extractor
|
||||
from capa.rules import Rule
|
||||
from capa.rules import Rule, RuleSet
|
||||
from capa.engine import FeatureSet
|
||||
from capa.ida.plugin.icon import QICON
|
||||
from capa.ida.plugin.view import (
|
||||
@@ -36,7 +36,7 @@ from capa.ida.plugin.view import (
|
||||
CapaExplorerRulegenPreview,
|
||||
CapaExplorerRulegenFeatures,
|
||||
)
|
||||
from capa.ida.plugin.cache import CapaRuleGenFeatureCache, CapaExplorerRuleSetCache
|
||||
from capa.ida.plugin.cache import CapaRuleGenFeatureCache
|
||||
from capa.ida.plugin.error import UserCancelledError
|
||||
from capa.ida.plugin.hooks import CapaExplorerIdaHooks
|
||||
from capa.ida.plugin.model import CapaExplorerDataModel
|
||||
@@ -160,7 +160,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
# caches used to speed up capa explorer analysis - these must be init to None
|
||||
self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None
|
||||
self.ruleset_cache: Optional[CapaExplorerRuleSetCache] = None
|
||||
self.ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None
|
||||
self.rulegen_current_function: Optional[FunctionHandle] = None
|
||||
|
||||
@@ -525,8 +525,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
meta["prev_base"] = idaapi.get_imagebase()
|
||||
self.model_data.reset()
|
||||
|
||||
def load_capa_rules(self):
|
||||
"""load capa rules from directory specified by user, either using IDA UI or settings"""
|
||||
def ensure_capa_settings_rule_path(self):
|
||||
try:
|
||||
# resolve rules directory - check self and settings first, then ask user
|
||||
if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")):
|
||||
@@ -567,56 +566,26 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
if not os.path.exists(path):
|
||||
logger.error("rule path %s does not exist or cannot be accessed" % path)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def load_capa_rules(self):
|
||||
"""load capa rules from directory specified by user, either using IDA UI or settings"""
|
||||
if not self.ensure_capa_settings_rule_path():
|
||||
return False
|
||||
|
||||
rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")
|
||||
try:
|
||||
# following code is derived from capa.main.get_rules, we dup it here so we can inject code that allows
|
||||
# user to cancel analysis from IDA UI
|
||||
if not os.path.exists(rule_path):
|
||||
raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)
|
||||
|
||||
rule_paths: List[str] = []
|
||||
if os.path.isfile(rule_path):
|
||||
rule_paths.append(rule_path)
|
||||
elif os.path.isdir(rule_path):
|
||||
for root, dirs, files in os.walk(rule_path):
|
||||
if ".git" in root:
|
||||
# the .github directory contains CI config in capa-rules
|
||||
# this includes some .yml files
|
||||
# these are not rules
|
||||
# additionally, .git has files that are not .yml and generate the warning
|
||||
# skip those too
|
||||
continue
|
||||
for file in files:
|
||||
if not file.endswith(".yml"):
|
||||
if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
|
||||
# expect to see .git* files, readme.md, format.md, and maybe a .git directory
|
||||
# other things maybe are rules, but are mis-named.
|
||||
logger.warning("skipping non-.yml file: %s", file)
|
||||
continue
|
||||
rule_path = os.path.join(root, file)
|
||||
rule_paths.append(rule_path)
|
||||
|
||||
rules: List[Rule] = []
|
||||
total_paths: int = len(rule_paths)
|
||||
for (i, rule_path) in enumerate(rule_paths):
|
||||
update_wait_box(
|
||||
"loading capa rules from %s (%d of %d)"
|
||||
% (settings.user[CAPA_SETTINGS_RULE_PATH], i + 1, total_paths)
|
||||
)
|
||||
def on_load_rule(rule_path, i, total):
|
||||
update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i, total))
|
||||
if ida_kernwin.user_cancelled():
|
||||
raise UserCancelledError("user cancelled")
|
||||
try:
|
||||
rule = capa.rules.Rule.from_yaml_file(rule_path)
|
||||
except capa.rules.InvalidRule:
|
||||
raise
|
||||
else:
|
||||
rule.meta["capa/path"] = rule_path
|
||||
if capa.main.is_nursery_rule_path(rule_path):
|
||||
rule.meta["capa/nursery"] = True
|
||||
rules.append(rule)
|
||||
|
||||
# cache rules and rule set
|
||||
self.ruleset_cache = CapaExplorerRuleSetCache(rules)
|
||||
self.ruleset_cache = capa.main.get_rules([rule_path], disable_progress=True, on_load_rule=on_load_rule)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
@@ -686,6 +655,10 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
# function should handle exceptions and return False
|
||||
if not self.load_capa_rules():
|
||||
return False
|
||||
assert self.ruleset_cache is not None
|
||||
# matching operations may update rule instances,
|
||||
# so we'll work with a local copy of the ruleset.
|
||||
ruleset = copy.deepcopy(self.ruleset_cache)
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
@@ -694,17 +667,10 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
update_wait_box("extracting features")
|
||||
|
||||
try:
|
||||
# just generated above
|
||||
assert self.ruleset_cache is not None
|
||||
|
||||
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
|
||||
capabilities, counts = capa.main.find_capabilities(
|
||||
self.ruleset_cache.ruleset, extractor, disable_progress=True
|
||||
)
|
||||
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(
|
||||
self.ruleset_cache.ruleset, extractor, capabilities
|
||||
)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
@@ -735,7 +701,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
|
||||
|
||||
if capa.main.has_file_limitation(self.ruleset_cache.ruleset, capabilities, is_standalone=False):
|
||||
if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False):
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
|
||||
except Exception as e:
|
||||
logger.error("Failed to check for file limitations (error: %s)", e, exc_info=True)
|
||||
@@ -748,9 +714,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
update_wait_box("rendering results")
|
||||
|
||||
try:
|
||||
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(
|
||||
meta, self.ruleset_cache.ruleset, capabilities
|
||||
)
|
||||
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(meta, ruleset, capabilities)
|
||||
except Exception as e:
|
||||
logger.error("Failed to collect results (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
@@ -759,12 +723,10 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
# either the results are cached and the doc already exists,
|
||||
# or the doc was just created above
|
||||
assert self.resdoc_cache is not None
|
||||
# same with rules cache, either it's cached or it was just loaded
|
||||
assert self.ruleset_cache is not None
|
||||
|
||||
self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked())
|
||||
self.set_view_status_label(
|
||||
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules))
|
||||
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to render results (error: %s)", e, exc_info=True)
|
||||
@@ -809,6 +771,11 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
else:
|
||||
logger.info('Using cached capa rules, click "Reset" to load rules from disk.')
|
||||
|
||||
assert self.ruleset_cache is not None
|
||||
# matching operations may update rule instances,
|
||||
# so we'll work with a local copy of the ruleset.
|
||||
ruleset = copy.deepcopy(self.ruleset_cache)
|
||||
|
||||
# clear feature cache
|
||||
if self.rulegen_feature_cache is not None:
|
||||
self.rulegen_feature_cache = None
|
||||
@@ -868,18 +835,16 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
all_function_features: FeatureSet = collections.defaultdict(set)
|
||||
try:
|
||||
assert self.ruleset_cache is not None
|
||||
|
||||
if self.rulegen_current_function is not None:
|
||||
_, func_matches, bb_matches, insn_matches = self.rulegen_feature_cache.find_code_capabilities(
|
||||
self.ruleset_cache.ruleset, self.rulegen_current_function
|
||||
ruleset, self.rulegen_current_function
|
||||
)
|
||||
all_function_features.update(
|
||||
self.rulegen_feature_cache.get_all_function_features(self.rulegen_current_function)
|
||||
)
|
||||
|
||||
for (name, result) in itertools.chain(func_matches.items(), bb_matches.items(), insn_matches.items()):
|
||||
rule = self.ruleset_cache.ruleset[name]
|
||||
rule = ruleset[name]
|
||||
if rule.is_subscope_rule():
|
||||
continue
|
||||
for (addr, _) in result:
|
||||
@@ -896,13 +861,11 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
all_file_features: FeatureSet = collections.defaultdict(set)
|
||||
try:
|
||||
assert self.ruleset_cache is not None
|
||||
|
||||
_, file_matches = self.rulegen_feature_cache.find_file_capabilities(self.ruleset_cache.ruleset)
|
||||
_, file_matches = self.rulegen_feature_cache.find_file_capabilities(ruleset)
|
||||
all_file_features.update(self.rulegen_feature_cache.get_all_file_features())
|
||||
|
||||
for (name, result) in file_matches.items():
|
||||
rule = self.ruleset_cache.ruleset[name]
|
||||
rule = ruleset[name]
|
||||
if rule.is_subscope_rule():
|
||||
continue
|
||||
for (addr, _) in result:
|
||||
@@ -928,7 +891,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.view_rulegen_features.load_features(all_file_features, all_function_features)
|
||||
|
||||
self.set_view_status_label(
|
||||
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules))
|
||||
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to render views (error: %s)", e, exc_info=True)
|
||||
@@ -1037,7 +1000,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
return
|
||||
|
||||
# we must create a deep copy of rules because any rule matching operations modify the original rule
|
||||
rules = copy.deepcopy(self.ruleset_cache.rules)
|
||||
rules = copy.deepcopy([r for r in self.ruleset_cache.rules.values() if not r.is_subscope_rule()])
|
||||
rules.append(rule)
|
||||
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user