explorer: optimize cache and extractor interface (#1470)

* Optimize cache and extractor interface

* Update changelog

* Run linter formatters

* Implement review feedback

* Move rulegen extractor construction to tab change

* Change rulegen cache construction behavior

* Adjust return values for CR, format

* Fix mypy errors

* Format

* Fix merge

---------

Co-authored-by: Stephen Eckels <stephen.eckels@mandiant.com>
This commit is contained in:
Stephen Eckels
2023-06-13 14:00:06 -04:00
committed by Yacine Elhamer
parent 51faaae1d0
commit 6e3b1bc240
3 changed files with 63 additions and 74 deletions

View File

@@ -88,12 +88,14 @@ Thanks for all the support, especially to @xusheng6, @captainGeech42, @ggold7046
- nursery/contain-a-thread-local-storage-tls-section-in-dotnet michael.hunhoff@mandiant.com
### Bug Fixes
- extractor: interface of cache modified to prevent extracting file and global features multiple times @stevemk14ebr
- extractor: removed '.dynsym' as the library name for ELF imports #1318 @stevemk14ebr
- extractor: fix vivisect loop detection corner case #1310 @mr-tz
- match: extend OS characteristic to match OS_ANY to all supported OSes #1324 @mike-hunhoff
- extractor: fix IDA and vivisect string and bytes features overlap and tests #1327 #1336 @xusheng6
### capa explorer IDA Pro plugin
- rule generator plugin now loads faster when jumping between functions @stevemk14ebr
- fix exception when plugin loaded in IDA hosted under idat #1341 @mike-hunhoff
- improve embedded PE detection performance and reduce FP potential #1344 @mike-hunhoff

View File

@@ -48,7 +48,8 @@ class CapaRuleGenFeatureCacheNode:
class CapaRuleGenFeatureCache:
def __init__(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor):
def __init__(self, extractor: CapaExplorerFeatureExtractor):
self.extractor = extractor
self.global_features: FeatureSet = collections.defaultdict(set)
self.file_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(None, None)
@@ -56,12 +57,11 @@ class CapaRuleGenFeatureCache:
self.bb_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
self.insn_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
self._find_global_features(extractor)
self._find_file_features(extractor)
self._find_function_and_below_features(fh_list, extractor)
self._find_global_features()
self._find_file_features()
def _find_global_features(self, extractor: CapaExplorerFeatureExtractor):
for feature, addr in extractor.extract_global_features():
def _find_global_features(self):
for feature, addr in self.extractor.extract_global_features():
# not all global features may have virtual addresses.
# if not, then at least ensure the feature shows up in the index.
# the set of addresses will still be empty.
@@ -71,46 +71,45 @@ class CapaRuleGenFeatureCache:
if feature not in self.global_features:
self.global_features[feature] = set()
def _find_file_features(self, extractor: CapaExplorerFeatureExtractor):
def _find_file_features(self):
# not all file features may have virtual addresses.
# if not, then at least ensure the feature shows up in the index.
# the set of addresses will still be empty.
for feature, addr in extractor.extract_file_features():
for feature, addr in self.extractor.extract_file_features():
if addr is not None:
self.file_node.features[feature].add(addr)
else:
if feature not in self.file_node.features:
self.file_node.features[feature] = set()
def _find_function_and_below_features(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor):
for fh in fh_list:
f_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(fh, self.file_node)
def _find_function_and_below_features(self, fh: FunctionHandle):
f_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(fh, self.file_node)
# extract basic block and below features
for bbh in extractor.get_basic_blocks(fh):
bb_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(bbh, f_node)
# extract basic block and below features
for bbh in self.extractor.get_basic_blocks(fh):
bb_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(bbh, f_node)
# extract instruction features
for ih in extractor.get_instructions(fh, bbh):
inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node)
# extract instruction features
for ih in self.extractor.get_instructions(fh, bbh):
inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node)
for feature, addr in extractor.extract_insn_features(fh, bbh, ih):
inode.features[feature].add(addr)
for feature, addr in self.extractor.extract_insn_features(fh, bbh, ih):
inode.features[feature].add(addr)
self.insn_nodes[inode.address] = inode
self.insn_nodes[inode.address] = inode
# extract basic block features
for feature, addr in extractor.extract_basic_block_features(fh, bbh):
bb_node.features[feature].add(addr)
# extract basic block features
for feature, addr in self.extractor.extract_basic_block_features(fh, bbh):
bb_node.features[feature].add(addr)
# store basic block features in cache and function parent
self.bb_nodes[bb_node.address] = bb_node
# store basic block features in cache and function parent
self.bb_nodes[bb_node.address] = bb_node
# extract function features
for feature, addr in extractor.extract_function_features(fh):
f_node.features[feature].add(addr)
# extract function features
for feature, addr in self.extractor.extract_function_features(fh):
f_node.features[feature].add(addr)
self.func_nodes[f_node.address] = f_node
self.func_nodes[f_node.address] = f_node
def _find_instruction_capabilities(
self, ruleset: RuleSet, insn: CapaRuleGenFeatureCacheNode
@@ -155,7 +154,7 @@ class CapaRuleGenFeatureCache:
def find_code_capabilities(
self, ruleset: RuleSet, fh: FunctionHandle
) -> Tuple[FeatureSet, MatchResults, MatchResults, MatchResults]:
f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None)
f_node: Optional[CapaRuleGenFeatureCacheNode] = self._get_cached_func_node(fh)
if f_node is None:
return {}, {}, {}, {}
@@ -195,8 +194,16 @@ class CapaRuleGenFeatureCache:
_, matches = ruleset.match(Scope.FILE, features, NO_ADDRESS)
return features, matches
def get_all_function_features(self, fh: FunctionHandle) -> FeatureSet:
def _get_cached_func_node(self, fh: FunctionHandle) -> Optional[CapaRuleGenFeatureCacheNode]:
f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None)
if f_node is None:
# function is not in our cache, do extraction now
self._find_function_and_below_features(fh)
f_node = self.func_nodes.get(fh.address, None)
return f_node
def get_all_function_features(self, fh: FunctionHandle) -> FeatureSet:
f_node: Optional[CapaRuleGenFeatureCacheNode] = self._get_cached_func_node(fh)
if f_node is None:
return {}

View File

@@ -192,8 +192,10 @@ class CapaExplorerForm(idaapi.PluginForm):
# caches used to speed up capa explorer analysis - these must be init to None
self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None
self.program_analysis_ruleset_cache: Optional[capa.rules.RuleSet] = None
self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None
self.feature_extractor: Optional[CapaExplorerFeatureExtractor] = None
self.rulegen_feature_extractor: Optional[CapaExplorerFeatureExtractor] = None
self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None
self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None
self.rulegen_current_function: Optional[FunctionHandle] = None
# models
@@ -727,13 +729,11 @@ class CapaExplorerForm(idaapi.PluginForm):
update_wait_box(f"{text} ({self.process_count} of {self.process_total})")
self.process_count += 1
update_wait_box("initializing feature extractor")
try:
extractor = CapaExplorerFeatureExtractor()
extractor.indicator.progress.connect(slot_progress_feature_extraction)
self.feature_extractor = CapaExplorerFeatureExtractor()
self.feature_extractor.indicator.progress.connect(slot_progress_feature_extraction)
except Exception as e:
logger.error("Failed to initialize feature extractor (error: %s).", e, exc_info=True)
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
return False
if ida_kernwin.user_cancelled():
@@ -743,7 +743,7 @@ class CapaExplorerForm(idaapi.PluginForm):
update_wait_box("calculating analysis")
try:
self.process_total += len(tuple(extractor.get_functions()))
self.process_total += len(tuple(self.feature_extractor.get_functions()))
except Exception as e:
logger.error("Failed to calculate analysis (error: %s).", e, exc_info=True)
return False
@@ -770,12 +770,13 @@ class CapaExplorerForm(idaapi.PluginForm):
try:
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
capabilities, counts = capa.main.find_capabilities(
ruleset, self.feature_extractor, disable_progress=True
)
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(ruleset, extractor, capabilities)
meta.analysis.layout = capa.main.compute_layout(ruleset, self.feature_extractor, capabilities)
except UserCancelledError:
logger.info("User cancelled analysis.")
return False
@@ -978,26 +979,21 @@ class CapaExplorerForm(idaapi.PluginForm):
# so we'll work with a local copy of the ruleset.
ruleset = copy.deepcopy(self.rulegen_ruleset_cache)
# clear feature cache
if self.rulegen_feature_cache is not None:
self.rulegen_feature_cache = None
# clear cached function
if self.rulegen_current_function is not None:
self.rulegen_current_function = None
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("Initializing feature extractor")
try:
# must use extractor to get function, as capa analysis requires casted object
extractor = CapaExplorerFeatureExtractor()
except Exception as e:
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
return False
# these are init once objects, create on tab change
if self.rulegen_feature_cache is None or self.rulegen_feature_extractor is None:
try:
update_wait_box("performing one-time file analysis")
self.rulegen_feature_extractor = CapaExplorerFeatureExtractor()
self.rulegen_feature_cache = CapaRuleGenFeatureCache(self.rulegen_feature_extractor)
except Exception as e:
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
return False
else:
logger.info("Reusing prior rulegen cache")
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
@@ -1009,7 +1005,7 @@ class CapaExplorerForm(idaapi.PluginForm):
try:
f = idaapi.get_func(idaapi.get_screen_ea())
if f is not None:
self.rulegen_current_function = extractor.get_function(f.start_ea)
self.rulegen_current_function = self.rulegen_feature_extractor.get_function(f.start_ea)
except Exception as e:
logger.error("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e, exc_info=True)
return False
@@ -1018,21 +1014,6 @@ class CapaExplorerForm(idaapi.PluginForm):
logger.info("User cancelled analysis.")
return False
# extract features
try:
fh_list: List[FunctionHandle] = []
if self.rulegen_current_function is not None:
fh_list.append(self.rulegen_current_function)
self.rulegen_feature_cache = CapaRuleGenFeatureCache(fh_list, extractor)
except Exception as e:
logger.error("Failed to extract features (error: %s)", e, exc_info=True)
return False
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
return False
update_wait_box("generating function rule matches")
all_function_features: FeatureSet = collections.defaultdict(set)
@@ -1264,7 +1245,6 @@ class CapaExplorerForm(idaapi.PluginForm):
elif index == 1:
self.set_view_status_label(self.view_status_label_rulegen_cache)
self.view_status_label_analysis_cache = status_prev
self.view_reset_button.setText("Clear")
def slot_rulegen_editor_update(self):