explorer: refactor rule generator caching and matching (#1251)

* explorer: refactor rule generator caching and matching

* fix #1246

* fix #1159
This commit is contained in:
Mike Hunhoff
2023-01-04 08:50:52 -07:00
committed by GitHub
parent 94a712b820
commit a286e066d1
6 changed files with 475 additions and 250 deletions

View File

@@ -78,6 +78,7 @@
- extractor: add support for COFF files and extern functions #1223 @mike-hunhoff
- doc: improve error messaging and documentation related to capa rule set #1249 @mike-hunhoff
- fix: assume 32-bit displacement for offsets #1250 @mike-hunhoff
- generator: refactor caching and matching #1251 @mike-hunhoff
### Development

228
capa/ida/plugin/cache.py Normal file
View File

@@ -0,0 +1,228 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from __future__ import annotations
import copy
import itertools
import collections
from typing import Set, Dict, List, Tuple, Union, Optional
import capa.engine
from capa.rules import Rule, Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS, Address
from capa.ida.plugin.extractor import CapaExplorerFeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
class CapaExplorerRuleSetCache:
def __init__(self, rules: List[Rule]):
# capa.rules.Ruleset modifies rules, so we use deepcopy to preserve the original list of rules and our cached list of rules
self.rules: List[Rule] = copy.deepcopy(rules)
self.ruleset: RuleSet = RuleSet(copy.deepcopy(self.rules))
class CapaRuleGenFeatureCacheNode:
def __init__(
self,
inner: Optional[Union[FunctionHandle, BBHandle, InsnHandle]],
parent: Optional[CapaRuleGenFeatureCacheNode],
):
self.inner: Optional[Union[FunctionHandle, BBHandle, InsnHandle]] = inner
self.address = NO_ADDRESS if self.inner is None else self.inner.address
self.parent: Optional[CapaRuleGenFeatureCacheNode] = parent
if self.parent is not None:
self.parent.children.add(self)
self.features: FeatureSet = collections.defaultdict(set)
self.children: Set[CapaRuleGenFeatureCacheNode] = set()
def __hash__(self):
# TODO: unique enough?
return hash((self.address,))
def __eq__(self, other):
if not isinstance(other, type(self)):
return NotImplemented
# TODO: unique enough?
return self.address == other.address
class CapaRuleGenFeatureCache:
def __init__(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor):
self.global_features: FeatureSet = collections.defaultdict(set)
self.file_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(None, None)
self.func_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
self.bb_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
self.insn_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
self._find_global_features(extractor)
self._find_file_features(extractor)
self._find_function_and_below_features(fh_list, extractor)
def _find_global_features(self, extractor: CapaExplorerFeatureExtractor):
for (feature, addr) in extractor.extract_global_features():
# not all global features may have virtual addresses.
# if not, then at least ensure the feature shows up in the index.
# the set of addresses will still be empty.
if addr is not None:
self.global_features[feature].add(addr)
else:
if feature not in self.global_features:
self.global_features[feature] = set()
def _find_file_features(self, extractor: CapaExplorerFeatureExtractor):
# not all file features may have virtual addresses.
# if not, then at least ensure the feature shows up in the index.
# the set of addresses will still be empty.
for (feature, addr) in extractor.extract_file_features():
if addr is not None:
self.file_node.features[feature].add(addr)
else:
if feature not in self.file_node.features:
self.file_node.features[feature] = set()
def _find_function_and_below_features(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor):
for fh in fh_list:
f_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(fh, self.file_node)
# extract basic block and below features
for bbh in extractor.get_basic_blocks(fh):
bb_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(bbh, f_node)
# extract instruction features
for ih in extractor.get_instructions(fh, bbh):
inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node)
for (feature, addr) in extractor.extract_insn_features(fh, bbh, ih):
inode.features[feature].add(addr)
self.insn_nodes[inode.address] = inode
# extract basic block features
for (feature, addr) in extractor.extract_basic_block_features(fh, bbh):
bb_node.features[feature].add(addr)
# store basic block features in cache and function parent
self.bb_nodes[bb_node.address] = bb_node
# extract function features
for (feature, addr) in extractor.extract_function_features(fh):
f_node.features[feature].add(addr)
self.func_nodes[f_node.address] = f_node
def _find_instruction_capabilities(
self, ruleset: RuleSet, insn: CapaRuleGenFeatureCacheNode
) -> Tuple[FeatureSet, MatchResults]:
features: FeatureSet = collections.defaultdict(set)
for (feature, locs) in itertools.chain(insn.features.items(), self.global_features.items()):
features[feature].update(locs)
_, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address)
for (name, result) in matches.items():
rule = ruleset[name]
for (addr, _) in result:
capa.engine.index_rule_matches(features, rule, [addr])
return features, matches
def _find_basic_block_capabilities(
self, ruleset: RuleSet, bb: CapaRuleGenFeatureCacheNode
) -> Tuple[FeatureSet, MatchResults, MatchResults]:
features: FeatureSet = collections.defaultdict(set)
insn_matches: MatchResults = collections.defaultdict(list)
for insn in bb.children:
ifeatures, imatches = self._find_instruction_capabilities(ruleset, insn)
for (feature, locs) in ifeatures.items():
features[feature].update(locs)
for (name, result) in imatches.items():
insn_matches[name].extend(result)
for (feature, locs) in itertools.chain(bb.features.items(), self.global_features.items()):
features[feature].update(locs)
_, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address)
for (name, result) in matches.items():
rule = ruleset[name]
for (loc, _) in result:
capa.engine.index_rule_matches(features, rule, [loc])
return features, matches, insn_matches
def find_code_capabilities(
self, ruleset: RuleSet, fh: FunctionHandle
) -> Tuple[FeatureSet, MatchResults, MatchResults, MatchResults]:
f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None)
if f_node is None:
return {}, {}, {}, {}
insn_matches: MatchResults = collections.defaultdict(list)
bb_matches: MatchResults = collections.defaultdict(list)
function_features: FeatureSet = collections.defaultdict(set)
for bb in f_node.children:
features, bmatches, imatches = self._find_basic_block_capabilities(ruleset, bb)
for (feature, locs) in features.items():
function_features[feature].update(locs)
for (name, result) in bmatches.items():
bb_matches[name].extend(result)
for (name, result) in imatches.items():
insn_matches[name].extend(result)
for (feature, locs) in itertools.chain(f_node.features.items(), self.global_features.items()):
function_features[feature].update(locs)
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, f_node.address)
return function_features, function_matches, bb_matches, insn_matches
def find_file_capabilities(self, ruleset: RuleSet) -> Tuple[FeatureSet, MatchResults]:
features: FeatureSet = collections.defaultdict(set)
for func_node in self.file_node.children:
assert func_node.inner is not None
assert isinstance(func_node.inner, FunctionHandle)
func_features, _, _, _ = self.find_code_capabilities(ruleset, func_node.inner)
for (feature, locs) in func_features.items():
features[feature].update(locs)
for (feature, locs) in itertools.chain(self.file_node.features.items(), self.global_features.items()):
features[feature].update(locs)
_, matches = ruleset.match(Scope.FILE, features, NO_ADDRESS)
return features, matches
def get_all_function_features(self, fh: FunctionHandle) -> FeatureSet:
f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None)
if f_node is None:
return {}
all_function_features: FeatureSet = collections.defaultdict(set)
all_function_features.update(f_node.features)
for bb_node in f_node.children:
for i_node in bb_node.children:
for (feature, locs) in i_node.features.items():
all_function_features[feature].update(locs)
for (feature, locs) in bb_node.features.items():
all_function_features[feature].update(locs)
# include global features just once
for (feature, locs) in self.global_features.items():
all_function_features[feature].update(locs)
return all_function_features
def get_all_file_features(self):
yield from itertools.chain(self.file_node.features.items(), self.global_features.items())

13
capa/ida/plugin/error.py Normal file
View File

@@ -0,0 +1,13 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
class UserCancelledError(Exception):
"""throw exception when user cancels action"""
pass

View File

@@ -0,0 +1,44 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import ida_kernwin
from PyQt5 import QtCore
from capa.ida.plugin.error import UserCancelledError
from capa.features.extractors.ida.extractor import IdaFeatureExtractor
from capa.features.extractors.base_extractor import FunctionHandle
class CapaExplorerProgressIndicator(QtCore.QObject):
"""implement progress signal, used during feature extraction"""
progress = QtCore.pyqtSignal(str)
def update(self, text):
"""emit progress update
check if user cancelled action, raise exception for parent function to catch
"""
if ida_kernwin.user_cancelled():
raise UserCancelledError("user cancelled")
self.progress.emit("extracting features from %s" % text)
class CapaExplorerFeatureExtractor(IdaFeatureExtractor):
"""subclass the IdaFeatureExtractor
track progress during feature extraction, also allow user to cancel feature extraction
"""
def __init__(self):
super().__init__()
self.indicator = CapaExplorerProgressIndicator()
def extract_function_features(self, fh: FunctionHandle):
self.indicator.update("function at 0x%X" % fh.inner.start_ea)
return super().extract_function_features(fh)

View File

@@ -11,7 +11,7 @@ import copy
import logging
import itertools
import collections
from typing import Any, Set, Dict, List, Optional
from typing import Any, List, Optional
import idaapi
import ida_kernwin
@@ -27,8 +27,8 @@ import capa.render.json
import capa.features.common
import capa.render.result_document
import capa.features.extractors.ida.extractor
from capa.rules import Rule
from capa.engine import FeatureSet
from capa.features.common import Feature
from capa.ida.plugin.icon import QICON
from capa.ida.plugin.view import (
CapaExplorerQtreeView,
@@ -36,10 +36,12 @@ from capa.ida.plugin.view import (
CapaExplorerRulegenPreview,
CapaExplorerRulegenFeatures,
)
from capa.features.address import NO_ADDRESS, Address
from capa.ida.plugin.cache import CapaRuleGenFeatureCache, CapaExplorerRuleSetCache
from capa.ida.plugin.error import UserCancelledError
from capa.ida.plugin.hooks import CapaExplorerIdaHooks
from capa.ida.plugin.model import CapaExplorerDataModel
from capa.ida.plugin.proxy import CapaExplorerRangeProxyModel, CapaExplorerSearchProxyModel
from capa.ida.plugin.extractor import CapaExplorerFeatureExtractor
from capa.features.extractors.base_extractor import FunctionHandle
logger = logging.getLogger(__name__)
@@ -76,118 +78,11 @@ def trim_function_name(f, max_length=25):
return n
def find_func_features(fh: FunctionHandle, extractor):
""" """
func_features: Dict[Feature, Set[Address]] = collections.defaultdict(set)
bb_features: Dict[Address, Dict[Feature, Set[Address]]] = collections.defaultdict(dict)
for (feature, addr) in extractor.extract_function_features(fh):
func_features[feature].add(addr)
for bbh in extractor.get_basic_blocks(fh):
_bb_features: Dict[Feature, Set[Address]] = collections.defaultdict(set)
for (feature, addr) in extractor.extract_basic_block_features(fh, bbh):
_bb_features[feature].add(addr)
func_features[feature].add(addr)
for insn in extractor.get_instructions(fh, bbh):
for (feature, addr) in extractor.extract_insn_features(fh, bbh, insn):
_bb_features[feature].add(addr)
func_features[feature].add(addr)
bb_features[bbh.address] = _bb_features
return func_features, bb_features
def find_func_matches(f: FunctionHandle, ruleset, func_features, bb_features):
""" """
func_matches = collections.defaultdict(list)
bb_matches = collections.defaultdict(list)
# create copy of function features, to add rule matches for basic blocks
func_features = collections.defaultdict(set, copy.copy(func_features))
# find rule matches for basic blocks
for (bb, features) in bb_features.items():
_, matches = capa.engine.match(ruleset.basic_block_rules, features, bb)
for (name, res) in matches.items():
bb_matches[name].extend(res)
for (ea, _) in res:
func_features[capa.features.common.MatchedRule(name)].add(ea)
# find rule matches for function, function features include rule matches for basic blocks
_, matches = capa.engine.match(ruleset.function_rules, func_features, f.address)
for (name, res) in matches.items():
func_matches[name].extend(res)
return func_matches, bb_matches
def find_file_features(extractor):
""" """
file_features = collections.defaultdict(set) # type: FeatureSet
for (feature, addr) in extractor.extract_file_features():
if addr:
file_features[feature].add(addr)
else:
if feature not in file_features:
file_features[feature] = set()
return file_features
def find_file_matches(ruleset, file_features: FeatureSet):
""" """
_, matches = capa.engine.match(ruleset.file_rules, file_features, NO_ADDRESS)
return matches
def update_wait_box(text):
"""update the IDA wait box"""
ida_kernwin.replace_wait_box("capa explorer...%s" % text)
class UserCancelledError(Exception):
"""throw exception when user cancels action"""
pass
class CapaExplorerProgressIndicator(QtCore.QObject):
"""implement progress signal, used during feature extraction"""
progress = QtCore.pyqtSignal(str)
def __init__(self):
"""initialize signal object"""
super().__init__()
def update(self, text):
"""emit progress update
check if user cancelled action, raise exception for parent function to catch
"""
if ida_kernwin.user_cancelled():
raise UserCancelledError("user cancelled")
self.progress.emit("extracting features from %s" % text)
class CapaExplorerFeatureExtractor(capa.features.extractors.ida.extractor.IdaFeatureExtractor):
"""subclass the IdaFeatureExtractor
track progress during feature extraction, also allow user to cancel feature extraction
"""
def __init__(self):
super().__init__()
self.indicator = CapaExplorerProgressIndicator()
def extract_function_features(self, fh: FunctionHandle):
self.indicator.update("function at 0x%X" % fh.inner.start_ea)
return super().extract_function_features(fh)
class QLineEditClicked(QtWidgets.QLineEdit):
def __init__(self, content, parent=None):
""" """
@@ -226,7 +121,7 @@ class CapaSettingsInputDialog(QtWidgets.QDialog):
)
self.edit_rules_link.setOpenExternalLinks(True)
scopes = ("file", "function", "basic block")
scopes = ("file", "function", "basic block", "instruction")
self.edit_rule_scope.addItems(scopes)
self.edit_rule_scope.setCurrentIndex(scopes.index(settings.user.get(CAPA_SETTINGS_RULEGEN_SCOPE, "function")))
@@ -262,11 +157,12 @@ class CapaExplorerForm(idaapi.PluginForm):
self.parent: Any # QtWidget
self.ida_hooks: CapaExplorerIdaHooks
self.doc: Optional[capa.render.result_document.ResultDocument] = None
self.rule_paths: Optional[List[str]]
self.rules_cache: Optional[List[capa.rules.Rule]]
self.ruleset_cache: Optional[capa.rules.RuleSet]
# caches used to speed up capa explorer analysis - these must be init to None
self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None
self.ruleset_cache: Optional[CapaExplorerRuleSetCache] = None
self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None
self.rulegen_current_function: Optional[FunctionHandle] = None
# models
self.model_data: CapaExplorerDataModel
@@ -287,16 +183,13 @@ class CapaExplorerForm(idaapi.PluginForm):
self.view_settings_button: QtWidgets.QPushButton
self.view_save_button: QtWidgets.QPushButton
# UI controls for rule generator
self.view_rulegen_preview: CapaExplorerRulegenPreview
self.view_rulegen_features: CapaExplorerRulegenFeatures
self.view_rulegen_editor: CapaExplorerRulegenEditor
self.view_rulegen_header_label: QtWidgets.QLabel
self.view_rulegen_search: QtWidgets.QLineEdit
self.view_rulegen_limit_features_by_ea: QtWidgets.QCheckBox
self.rulegen_current_function: Optional[FunctionHandle]
self.rulegen_bb_features_cache: Dict[Address, Dict[Feature, Set[Address]]] = {}
self.rulegen_func_features_cache: Dict[Feature, Set[Address]] = {}
self.rulegen_file_features_cache: Dict[Feature, Set[Address]] = {}
self.view_rulegen_status_label: QtWidgets.QLabel
self.Show()
@@ -633,11 +526,7 @@ class CapaExplorerForm(idaapi.PluginForm):
self.model_data.reset()
def load_capa_rules(self):
""" """
self.rule_paths = None
self.ruleset_cache = None
self.rules_cache = None
"""load capa rules from directory specified by user, either using IDA UI or settings"""
try:
# resolve rules directory - check self and settings first, then ask user
if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")):
@@ -678,13 +567,14 @@ class CapaExplorerForm(idaapi.PluginForm):
logger.info("User cancelled analysis.")
return False
rule_path = settings.user[CAPA_SETTINGS_RULE_PATH]
rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")
try:
# TODO refactor: this first part is identical to capa.main.get_rules
# following code is derived from capa.main.get_rules, we dup it here so we can inject code that allows
# user to cancel analysis from IDA UI
if not os.path.exists(rule_path):
raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)
rule_paths = []
rule_paths: List[str] = []
if os.path.isfile(rule_path):
rule_paths.append(rule_path)
elif os.path.isdir(rule_path):
@@ -706,8 +596,8 @@ class CapaExplorerForm(idaapi.PluginForm):
rule_path = os.path.join(root, file)
rule_paths.append(rule_path)
rules = []
total_paths = len(rule_paths)
rules: List[Rule] = []
total_paths: int = len(rule_paths)
for (i, rule_path) in enumerate(rule_paths):
update_wait_box(
"loading capa rules from %s (%d of %d)"
@@ -724,8 +614,9 @@ class CapaExplorerForm(idaapi.PluginForm):
if capa.main.is_nursery_rule_path(rule_path):
rule.meta["capa/nursery"] = True
rules.append(rule)
_rules = copy.copy(rules)
ruleset = capa.rules.RuleSet(_rules)
# cache rules and rule set
self.ruleset_cache = CapaExplorerRuleSetCache(rules)
except UserCancelledError:
logger.info("User cancelled analysis.")
return False
@@ -746,10 +637,6 @@ class CapaExplorerForm(idaapi.PluginForm):
settings.user[CAPA_SETTINGS_RULE_PATH] = ""
return False
self.rule_paths = rule_paths
self.ruleset_cache = ruleset
self.rules_cache = rules
return True
def load_capa_results(self, use_cache=False):
@@ -760,7 +647,7 @@ class CapaExplorerForm(idaapi.PluginForm):
"""
if not use_cache:
# new analysis, new doc
self.doc = None
self.resdoc_cache = None
self.process_total = 0
self.process_count = 1
@@ -789,7 +676,7 @@ class CapaExplorerForm(idaapi.PluginForm):
if not self.load_capa_rules():
return False
assert self.rules_cache is not None
# just generated above
assert self.ruleset_cache is not None
if ida_kernwin.user_cancelled():
@@ -799,10 +686,14 @@ class CapaExplorerForm(idaapi.PluginForm):
update_wait_box("extracting features")
try:
meta = capa.ida.helpers.collect_metadata(self.rule_paths)
capabilities, counts = capa.main.find_capabilities(self.ruleset_cache, extractor, disable_progress=True)
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
capabilities, counts = capa.main.find_capabilities(
self.ruleset_cache.ruleset, extractor, disable_progress=True
)
meta["analysis"].update(counts)
meta["analysis"]["layout"] = capa.main.compute_layout(self.ruleset_cache, extractor, capabilities)
meta["analysis"]["layout"] = capa.main.compute_layout(
self.ruleset_cache.ruleset, extractor, capabilities
)
except UserCancelledError:
logger.info("User cancelled analysis.")
return False
@@ -833,7 +724,7 @@ class CapaExplorerForm(idaapi.PluginForm):
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
if capa.main.has_file_limitation(self.ruleset_cache, capabilities, is_standalone=False):
if capa.main.has_file_limitation(self.ruleset_cache.ruleset, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
except Exception as e:
logger.error("Failed to check for file limitations (error: %s)", e)
@@ -846,7 +737,9 @@ class CapaExplorerForm(idaapi.PluginForm):
update_wait_box("rendering results")
try:
self.doc = capa.render.result_document.ResultDocument.from_capa(meta, self.ruleset_cache, capabilities)
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(
meta, self.ruleset_cache.ruleset, capabilities
)
except Exception as e:
logger.error("Failed to collect results (error: %s)", e, exc_info=True)
return False
@@ -854,14 +747,13 @@ class CapaExplorerForm(idaapi.PluginForm):
try:
# either the results are cached and the doc already exists,
# or the doc was just created above
assert self.doc is not None
assert self.resdoc_cache is not None
# same with rules cache, either it's cached or it was just loaded
assert self.rules_cache is not None
assert self.ruleset_cache is not None
self.model_data.render_capa_doc(self.doc, self.view_show_results_by_function.isChecked())
self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked())
self.set_view_status_label(
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.rules_cache))
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules))
)
except Exception as e:
logger.error("Failed to render results (error: %s)", e, exc_info=True)
@@ -901,19 +793,28 @@ class CapaExplorerForm(idaapi.PluginForm):
def load_capa_function_results(self):
""" """
if not self.rules_cache or not self.ruleset_cache:
# only reload rules if caches are empty
if self.ruleset_cache is None:
# only reload rules if cache is empty
if not self.load_capa_rules():
return False
else:
logger.info('Using cached capa rules, click "Reset" to load rules from disk.')
assert self.rules_cache is not None
# cache is set or generated directly above
assert self.ruleset_cache is not None
# clear feature cache
if self.rulegen_feature_cache is not None:
self.rulegen_feature_cache = None
# clear cached function
if self.rulegen_current_function is not None:
self.rulegen_current_function = None
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("loading IDA extractor")
try:
@@ -926,92 +827,98 @@ class CapaExplorerForm(idaapi.PluginForm):
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("extracting function features")
update_wait_box("extracting features")
# resolve function selected in disassembly view
try:
f = idaapi.get_func(idaapi.get_screen_ea())
if f:
fh: Optional[FunctionHandle] = extractor.get_function(f.start_ea)
assert fh is not None
self.rulegen_current_function = fh
func_features, bb_features = find_func_features(fh, extractor)
self.rulegen_func_features_cache = collections.defaultdict(set, copy.copy(func_features))
self.rulegen_bb_features_cache = collections.defaultdict(dict, copy.copy(bb_features))
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("matching function/basic block rule scope")
try:
# add function and bb rule matches to function features, for display purposes
func_matches, bb_matches = find_func_matches(fh, self.ruleset_cache, func_features, bb_features)
for (name, addrs) in itertools.chain(func_matches.items(), bb_matches.items()):
rule = self.ruleset_cache[name]
if rule.is_subscope_rule():
continue
for (addr, _) in addrs:
func_features[capa.features.common.MatchedRule(name)].add(addr)
except Exception as e:
logger.error("Failed to match function/basic block rule scope (error: %s)", e)
return False
else:
fh = None
func_features = {}
except UserCancelledError:
logger.info("User cancelled analysis.")
return False
if f is not None:
self.rulegen_current_function = extractor.get_function(f.start_ea)
except Exception as e:
logger.error("Failed to extract function features (error: %s)", e)
logger.error("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e)
return False
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("extracting file features")
# extract features
try:
file_features = find_file_features(extractor)
self.rulegen_file_features_cache = copy.copy(file_features)
fh_list: List[FunctionHandle] = []
if self.rulegen_current_function is not None:
fh_list.append(self.rulegen_current_function)
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("matching file rule scope")
self.rulegen_feature_cache = CapaRuleGenFeatureCache(fh_list, extractor)
except Exception as e:
logger.error("Failed to extract features (error: %s)", e, exc_info=True)
return False
try:
# add file matches to file features, for display purposes
for (name, addrs) in find_file_matches(self.ruleset_cache, file_features).items():
rule = self.ruleset_cache[name]
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("generating function rule matches")
all_function_features: FeatureSet = collections.defaultdict(set)
try:
if self.rulegen_current_function is not None:
_, func_matches, bb_matches, insn_matches = self.rulegen_feature_cache.find_code_capabilities(
self.ruleset_cache.ruleset, self.rulegen_current_function
)
all_function_features.update(
self.rulegen_feature_cache.get_all_function_features(self.rulegen_current_function)
)
for (name, result) in itertools.chain(func_matches.items(), bb_matches.items(), insn_matches.items()):
rule = self.ruleset_cache.ruleset[name]
if rule.is_subscope_rule():
continue
for (addr, _) in addrs:
file_features[capa.features.common.MatchedRule(name)].add(addr)
except Exception as e:
logger.error("Failed to match file scope rules (error: %s)", e)
return False
for (addr, _) in result:
all_function_features[capa.features.common.MatchedRule(name)].add(addr)
except Exception as e:
logger.error("Failed to extract file features (error: %s)", e)
logger.error("Failed to generate rule matches (error: %s)", e, exc_info=True)
return False
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("generating file rule matches")
all_file_features: FeatureSet = collections.defaultdict(set)
try:
_, file_matches = self.rulegen_feature_cache.find_file_capabilities(self.ruleset_cache.ruleset)
all_file_features.update(self.rulegen_feature_cache.get_all_file_features())
for (name, result) in file_matches.items():
rule = self.ruleset_cache.ruleset[name]
if rule.is_subscope_rule():
continue
for (addr, _) in result:
all_file_features[capa.features.common.MatchedRule(name)].add(addr)
except Exception as e:
logger.error("Failed to generate file rule matches (error: %s)", e, exc_info=True)
return False
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("rendering views")
try:
# load preview and feature tree
self.view_rulegen_preview.load_preview_meta(
fh.address if fh else None,
self.rulegen_current_function.address if self.rulegen_current_function else None,
settings.user.get(CAPA_SETTINGS_RULEGEN_AUTHOR, "<insert_author>"),
settings.user.get(CAPA_SETTINGS_RULEGEN_SCOPE, "function"),
)
self.view_rulegen_features.load_features(file_features, func_features)
# self.view_rulegen_header_label.setText("Function Features (%s)" % trim_function_name(f))
self.view_rulegen_features.load_features(all_file_features, all_function_features)
self.set_view_status_label(
"capa rules directory: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.rules_cache))
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules))
)
except Exception as e:
logger.error("Failed to render views (error: %s)", e, exc_info=True)
@@ -1058,16 +965,11 @@ class CapaExplorerForm(idaapi.PluginForm):
self.view_rulegen_limit_features_by_ea.setChecked(False)
self.set_rulegen_preview_border_neutral()
self.rulegen_current_function = None
self.rulegen_func_features_cache = {}
self.rulegen_bb_features_cache = {}
self.rulegen_file_features_cache = {}
self.view_rulegen_status_label.clear()
if not is_analyze:
# clear rules and rule set cache only if user clicked "Reset"
self.rules_cache = None
# clear rules and ruleset cache only if user clicked "Reset"
self.ruleset_cache = None
self.set_view_status_label("Click Analyze to get started...")
logger.info("Reset completed.")
@@ -1092,62 +994,83 @@ class CapaExplorerForm(idaapi.PluginForm):
""" """
self.view_rulegen_preview.setStyleSheet("border: 3px solid green")
def update_rule_status(self, rule_text):
def update_rule_status(self, rule_text: str):
""" """
assert self.rules_cache is not None
rule: capa.rules.Rule
rules: List[Rule]
ruleset: capa.rules.RuleSet
if not self.view_rulegen_editor.invisibleRootItem().childCount():
if self.view_rulegen_editor.invisibleRootItem().childCount() == 0:
# assume nothing to do if no items found in editor pane
self.set_rulegen_preview_border_neutral()
self.view_rulegen_status_label.clear()
return
# we don't expect either of these caches to be None at this point
if self.rulegen_feature_cache is None:
logger.error("feature cache is None (unexpected)")
self.set_rulegen_status("error: see console output for more details")
return
if self.ruleset_cache is None:
logger.error("ruleset cache is None (unexpected)")
self.set_rulegen_status("error: see console output for more details")
return
self.set_rulegen_preview_border_error()
try:
rule = capa.rules.Rule.from_yaml(rule_text)
except Exception as e:
self.set_rulegen_status("Failed to compile rule (%s)" % e)
self.set_rulegen_status(f"Failed to compile rule ({e})")
return
# create deep copy of current rules, add our new rule
rules = copy.copy(self.rules_cache)
if self.ruleset_cache is None:
logger.error("RuleSet cache is None (unexpected)")
return
# ensure subscope rules are included
for sub in rule.extract_subscope_rules():
rules.append(sub)
# include our new rule in the list
# we must create a deep copy of rules because any rule matching operations modify the original rule
rules = copy.deepcopy(self.ruleset_cache.rules)
rules.append(rule)
try:
file_features = copy.copy(dict(self.rulegen_file_features_cache))
if self.rulegen_current_function:
func_matches, bb_matches = find_func_matches(
self.rulegen_current_function,
capa.rules.RuleSet(list(capa.rules.get_rules_and_dependencies(rules, rule.name))),
self.rulegen_func_features_cache,
self.rulegen_bb_features_cache,
)
file_features.update(copy.copy(self.rulegen_func_features_cache))
else:
func_matches = {}
bb_matches = {}
_, file_matches = capa.engine.match(
capa.rules.RuleSet(list(capa.rules.get_rules_and_dependencies(rules, rule.name))).file_rules,
file_features,
NO_ADDRESS,
)
# create a new ruleset using our rule and its dependencies
ruleset = capa.rules.RuleSet(list(capa.rules.get_rules_and_dependencies(rules, rule.name)))
except Exception as e:
self.set_rulegen_status("Failed to match rule (%s)" % e)
self.set_rulegen_status(f"Failed to create ruleset ({e})")
return
if tuple(
filter(
lambda m: m[0] == rule.name,
itertools.chain(file_matches.items(), func_matches.items(), bb_matches.items()),
)
is_match: bool = False
if self.rulegen_current_function is not None and rule.scope in (
capa.rules.Scope.FUNCTION,
capa.rules.Scope.BASIC_BLOCK,
capa.rules.Scope.INSTRUCTION,
):
try:
_, func_matches, bb_matches, insn_matches = self.rulegen_feature_cache.find_code_capabilities(
ruleset, self.rulegen_current_function
)
except Exception as e:
self.set_rulegen_status(f"Failed to create function rule matches from rule set ({e})")
return
if rule.scope == capa.rules.Scope.FUNCTION and rule.name in func_matches.keys():
is_match = True
elif rule.scope == capa.rules.Scope.BASIC_BLOCK and rule.name in bb_matches.keys():
is_match = True
elif rule.scope == capa.rules.Scope.INSTRUCTION and rule.name in insn_matches.keys():
is_match = True
elif rule.scope == capa.rules.Scope.FILE:
try:
_, file_matches = self.rulegen_feature_cache.find_file_capabilities(ruleset)
except Exception as e:
self.set_rulegen_status(f"Failed to create file rule matches from rule set ({e})")
return
if rule.name in file_matches.keys():
is_match = True
else:
is_match = False
if is_match:
# made it here, rule compiled and match was found
self.set_rulegen_preview_border_success()
self.set_rulegen_status("Rule compiled and matched")
@@ -1210,11 +1133,11 @@ class CapaExplorerForm(idaapi.PluginForm):
def save_program_analysis(self):
""" """
if not self.doc:
if not self.resdoc_cache:
idaapi.info("No program analysis to save.")
return
s = self.doc.json().encode("utf-8")
s = self.resdoc_cache.json().encode("utf-8")
path = self.ask_user_capa_json_file()
if not path:
@@ -1263,7 +1186,7 @@ class CapaExplorerForm(idaapi.PluginForm):
@param state: checked state
"""
if self.doc:
if self.resdoc_cache is not None:
self.analyze_program(use_cache=True)
def limit_results_to_function(self, f):

View File

@@ -74,7 +74,7 @@ def parse_node_for_feature(feature, description, comment, depth):
if feature.startswith("#"):
display += "%s%s\n" % (" " * depth, feature)
elif description:
if feature.startswith(("- and", "- or", "- optional", "- basic block", "- not")):
if feature.startswith(("- and", "- or", "- optional", "- basic block", "- not", "- instruction:")):
display += "%s%s" % (" " * depth, feature)
if comment:
display += " # %s" % comment
@@ -428,6 +428,10 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
# add default child expression when nesting under basic block
new_parent.setExpanded(True)
new_parent = self.new_expression_node(new_parent, ("- or:", ""))
elif "instruction" in action.data()[0]:
# add default child expression when nesting under instruction
new_parent.setExpanded(True)
new_parent = self.new_expression_node(new_parent, ("- or:", ""))
for o in self.get_features(selected=True):
# take child from its parent by index, add to new parent
@@ -448,6 +452,16 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
for child in children:
new_parent.addChild(child)
new_parent.setExpanded(True)
elif "instruction" in expression and "instruction" not in o.text(
CapaExplorerRulegenEditor.get_column_feature_index()
):
# current expression is "instruction", and not changing to "instruction" expression
children = o.takeChildren()
new_parent = self.new_expression_node(o, ("- or:", ""))
for child in children:
new_parent.addChild(child)
new_parent.setExpanded(True)
o.setText(CapaExplorerRulegenEditor.get_column_feature_index(), expression)
def slot_clear_all(self, action):
@@ -521,6 +535,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
("not", ("- not:",), self.slot_nest_features),
("optional", ("- optional:",), self.slot_nest_features),
("basic block", ("- basic block:",), self.slot_nest_features),
("instruction", ("- instruction:",), self.slot_nest_features),
)
# build submenu with modify actions
@@ -542,6 +557,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
("not", ("- not:", self.itemAt(pos)), self.slot_edit_expression),
("optional", ("- optional:", self.itemAt(pos)), self.slot_edit_expression),
("basic block", ("- basic block:", self.itemAt(pos)), self.slot_edit_expression),
("instruction", ("- instruction:", self.itemAt(pos)), self.slot_edit_expression),
)
# build submenu with modify actions
@@ -694,7 +710,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
node.setText(idx, text)
# we need to set our own type so we can control the GUI accordingly
if feature.startswith(("- and:", "- or:", "- not:", "- basic block:", "- optional:")):
if feature.startswith(("- and:", "- or:", "- not:", "- basic block:", "- instruction:", "- optional:")):
setattr(node, "capa_type", CapaExplorerRulegenEditor.get_node_type_expression())
elif feature.startswith("#"):
setattr(node, "capa_type", CapaExplorerRulegenEditor.get_node_type_comment())