mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
Merge branch 'dynamic-feature-extraction' into dynamic-extractor
This commit is contained in:
@@ -9,7 +9,7 @@
|
||||
### Breaking Changes
|
||||
- Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat
|
||||
|
||||
### New Rules (8)
|
||||
### New Rules (9)
|
||||
|
||||
- load-code/shellcode/execute-shellcode-via-windows-callback-function ervin.ocampo@mandiant.com jakub.jozwiak@mandiant.com
|
||||
- nursery/execute-shellcode-via-indirect-call ronnie.salomonsen@mandiant.com
|
||||
@@ -19,9 +19,12 @@
|
||||
- nursery/hash-data-using-sha512managed-in-dotnet jonathanlepore@google.com
|
||||
- nursery/compiled-with-exescript jonathanlepore@google.com
|
||||
- nursery/check-for-sandbox-via-mac-address-ouis-in-dotnet jonathanlepore@google.com
|
||||
- host-interaction/hardware/enumerate-devices-by-category @mr-tz
|
||||
-
|
||||
|
||||
### Bug Fixes
|
||||
- extractor: add a Binary Ninja test that asserts its version #1487 @xusheng6
|
||||
- extractor: update Binary Ninja stack string detection after the new constant outlining feature #1473 @xusheng6
|
||||
- extractor: update vivisect Arch extraction #1334 @mr-tz
|
||||
- extractor: avoid Binary Ninja exception when analyzing certain files #1441 @xusheng6
|
||||
- symtab: fix struct.unpack() format for 64-bit ELF files @yelhamer
|
||||
@@ -85,12 +88,14 @@ Thanks for all the support, especially to @xusheng6, @captainGeech42, @ggold7046
|
||||
- nursery/contain-a-thread-local-storage-tls-section-in-dotnet michael.hunhoff@mandiant.com
|
||||
|
||||
### Bug Fixes
|
||||
- extractor: interface of cache modified to prevent extracting file and global features multiple times @stevemk14ebr
|
||||
- extractor: removed '.dynsym' as the library name for ELF imports #1318 @stevemk14ebr
|
||||
- extractor: fix vivisect loop detection corner case #1310 @mr-tz
|
||||
- match: extend OS characteristic to match OS_ANY to all supported OSes #1324 @mike-hunhoff
|
||||
- extractor: fix IDA and vivisect string and bytes features overlap and tests #1327 #1336 @xusheng6
|
||||
|
||||
### capa explorer IDA Pro plugin
|
||||
- rule generator plugin now loads faster when jumping between functions @stevemk14ebr
|
||||
- fix exception when plugin loaded in IDA hosted under idat #1341 @mike-hunhoff
|
||||
- improve embedded PE detection performance and reduce FP potential #1344 @mike-hunhoff
|
||||
|
||||
|
||||
@@ -11,10 +11,13 @@ import string
|
||||
import struct
|
||||
from typing import Tuple, Iterator
|
||||
|
||||
from binaryninja import Function
|
||||
from binaryninja import Function, Settings
|
||||
from binaryninja import BasicBlock as BinjaBasicBlock
|
||||
from binaryninja import (
|
||||
BinaryView,
|
||||
DataBuffer,
|
||||
SymbolType,
|
||||
RegisterValueType,
|
||||
VariableSourceType,
|
||||
MediumLevelILSetVar,
|
||||
MediumLevelILOperation,
|
||||
@@ -28,6 +31,66 @@ from capa.features.basicblock import BasicBlock
|
||||
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
|
||||
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle
|
||||
|
||||
use_const_outline: bool = False
|
||||
settings: Settings = Settings()
|
||||
if settings.contains("analysis.outlining.builtins") and settings.get_bool("analysis.outlining.builtins"):
|
||||
use_const_outline = True
|
||||
|
||||
|
||||
def get_printable_len_ascii(s: bytes) -> int:
|
||||
"""Return string length if all operand bytes are ascii or utf16-le printable"""
|
||||
count = 0
|
||||
for c in s:
|
||||
if c == 0:
|
||||
return count
|
||||
if c < 127 and chr(c) in string.printable:
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
def get_printable_len_wide(s: bytes) -> int:
|
||||
"""Return string length if all operand bytes are ascii or utf16-le printable"""
|
||||
if all(c == 0x00 for c in s[1::2]):
|
||||
return get_printable_len_ascii(s[::2])
|
||||
return 0
|
||||
|
||||
|
||||
def get_stack_string_len(f: Function, il: MediumLevelILInstruction) -> int:
|
||||
bv: BinaryView = f.view
|
||||
|
||||
if il.operation != MediumLevelILOperation.MLIL_CALL:
|
||||
return 0
|
||||
|
||||
target = il.dest
|
||||
if target.operation not in [MediumLevelILOperation.MLIL_CONST, MediumLevelILOperation.MLIL_CONST_PTR]:
|
||||
return 0
|
||||
|
||||
addr = target.value.value
|
||||
sym = bv.get_symbol_at(addr)
|
||||
if not sym or sym.type != SymbolType.LibraryFunctionSymbol:
|
||||
return 0
|
||||
|
||||
if sym.name not in ["__builtin_strncpy", "__builtin_strcpy", "__builtin_wcscpy"]:
|
||||
return 0
|
||||
|
||||
if len(il.params) < 2:
|
||||
return 0
|
||||
|
||||
dest = il.params[0]
|
||||
if dest.operation != MediumLevelILOperation.MLIL_ADDRESS_OF:
|
||||
return 0
|
||||
|
||||
var = dest.src
|
||||
if var.source_type != VariableSourceType.StackVariableSourceType:
|
||||
return 0
|
||||
|
||||
src = il.params[1]
|
||||
if src.value.type != RegisterValueType.ConstantDataAggregateValue:
|
||||
return 0
|
||||
|
||||
s = f.get_constant_data(RegisterValueType.ConstantDataAggregateValue, src.value.value)
|
||||
return max(get_printable_len_ascii(bytes(s)), get_printable_len_wide(bytes(s)))
|
||||
|
||||
|
||||
def get_printable_len(il: MediumLevelILSetVar) -> int:
|
||||
"""Return string length if all operand bytes are ascii or utf16-le printable"""
|
||||
@@ -82,8 +145,11 @@ def bb_contains_stackstring(f: Function, bb: MediumLevelILBasicBlock) -> bool:
|
||||
"""
|
||||
count = 0
|
||||
for il in bb:
|
||||
if is_mov_imm_to_stack(il):
|
||||
count += get_printable_len(il)
|
||||
if use_const_outline:
|
||||
count += get_stack_string_len(f, il)
|
||||
else:
|
||||
if is_mov_imm_to_stack(il):
|
||||
count += get_printable_len(il)
|
||||
|
||||
if count > MIN_STACKSTRING_LEN:
|
||||
return True
|
||||
|
||||
@@ -48,7 +48,8 @@ class CapaRuleGenFeatureCacheNode:
|
||||
|
||||
|
||||
class CapaRuleGenFeatureCache:
|
||||
def __init__(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor):
|
||||
def __init__(self, extractor: CapaExplorerFeatureExtractor):
|
||||
self.extractor = extractor
|
||||
self.global_features: FeatureSet = collections.defaultdict(set)
|
||||
|
||||
self.file_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(None, None)
|
||||
@@ -56,12 +57,11 @@ class CapaRuleGenFeatureCache:
|
||||
self.bb_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
|
||||
self.insn_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
|
||||
|
||||
self._find_global_features(extractor)
|
||||
self._find_file_features(extractor)
|
||||
self._find_function_and_below_features(fh_list, extractor)
|
||||
self._find_global_features()
|
||||
self._find_file_features()
|
||||
|
||||
def _find_global_features(self, extractor: CapaExplorerFeatureExtractor):
|
||||
for feature, addr in extractor.extract_global_features():
|
||||
def _find_global_features(self):
|
||||
for feature, addr in self.extractor.extract_global_features():
|
||||
# not all global features may have virtual addresses.
|
||||
# if not, then at least ensure the feature shows up in the index.
|
||||
# the set of addresses will still be empty.
|
||||
@@ -71,46 +71,45 @@ class CapaRuleGenFeatureCache:
|
||||
if feature not in self.global_features:
|
||||
self.global_features[feature] = set()
|
||||
|
||||
def _find_file_features(self, extractor: CapaExplorerFeatureExtractor):
|
||||
def _find_file_features(self):
|
||||
# not all file features may have virtual addresses.
|
||||
# if not, then at least ensure the feature shows up in the index.
|
||||
# the set of addresses will still be empty.
|
||||
for feature, addr in extractor.extract_file_features():
|
||||
for feature, addr in self.extractor.extract_file_features():
|
||||
if addr is not None:
|
||||
self.file_node.features[feature].add(addr)
|
||||
else:
|
||||
if feature not in self.file_node.features:
|
||||
self.file_node.features[feature] = set()
|
||||
|
||||
def _find_function_and_below_features(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor):
|
||||
for fh in fh_list:
|
||||
f_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(fh, self.file_node)
|
||||
def _find_function_and_below_features(self, fh: FunctionHandle):
|
||||
f_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(fh, self.file_node)
|
||||
|
||||
# extract basic block and below features
|
||||
for bbh in extractor.get_basic_blocks(fh):
|
||||
bb_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(bbh, f_node)
|
||||
# extract basic block and below features
|
||||
for bbh in self.extractor.get_basic_blocks(fh):
|
||||
bb_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(bbh, f_node)
|
||||
|
||||
# extract instruction features
|
||||
for ih in extractor.get_instructions(fh, bbh):
|
||||
inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node)
|
||||
# extract instruction features
|
||||
for ih in self.extractor.get_instructions(fh, bbh):
|
||||
inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node)
|
||||
|
||||
for feature, addr in extractor.extract_insn_features(fh, bbh, ih):
|
||||
inode.features[feature].add(addr)
|
||||
for feature, addr in self.extractor.extract_insn_features(fh, bbh, ih):
|
||||
inode.features[feature].add(addr)
|
||||
|
||||
self.insn_nodes[inode.address] = inode
|
||||
self.insn_nodes[inode.address] = inode
|
||||
|
||||
# extract basic block features
|
||||
for feature, addr in extractor.extract_basic_block_features(fh, bbh):
|
||||
bb_node.features[feature].add(addr)
|
||||
# extract basic block features
|
||||
for feature, addr in self.extractor.extract_basic_block_features(fh, bbh):
|
||||
bb_node.features[feature].add(addr)
|
||||
|
||||
# store basic block features in cache and function parent
|
||||
self.bb_nodes[bb_node.address] = bb_node
|
||||
# store basic block features in cache and function parent
|
||||
self.bb_nodes[bb_node.address] = bb_node
|
||||
|
||||
# extract function features
|
||||
for feature, addr in extractor.extract_function_features(fh):
|
||||
f_node.features[feature].add(addr)
|
||||
# extract function features
|
||||
for feature, addr in self.extractor.extract_function_features(fh):
|
||||
f_node.features[feature].add(addr)
|
||||
|
||||
self.func_nodes[f_node.address] = f_node
|
||||
self.func_nodes[f_node.address] = f_node
|
||||
|
||||
def _find_instruction_capabilities(
|
||||
self, ruleset: RuleSet, insn: CapaRuleGenFeatureCacheNode
|
||||
@@ -155,7 +154,7 @@ class CapaRuleGenFeatureCache:
|
||||
def find_code_capabilities(
|
||||
self, ruleset: RuleSet, fh: FunctionHandle
|
||||
) -> Tuple[FeatureSet, MatchResults, MatchResults, MatchResults]:
|
||||
f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None)
|
||||
f_node: Optional[CapaRuleGenFeatureCacheNode] = self._get_cached_func_node(fh)
|
||||
if f_node is None:
|
||||
return {}, {}, {}, {}
|
||||
|
||||
@@ -195,8 +194,16 @@ class CapaRuleGenFeatureCache:
|
||||
_, matches = ruleset.match(Scope.FILE, features, NO_ADDRESS)
|
||||
return features, matches
|
||||
|
||||
def get_all_function_features(self, fh: FunctionHandle) -> FeatureSet:
|
||||
def _get_cached_func_node(self, fh: FunctionHandle) -> Optional[CapaRuleGenFeatureCacheNode]:
|
||||
f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None)
|
||||
if f_node is None:
|
||||
# function is not in our cache, do extraction now
|
||||
self._find_function_and_below_features(fh)
|
||||
f_node = self.func_nodes.get(fh.address, None)
|
||||
return f_node
|
||||
|
||||
def get_all_function_features(self, fh: FunctionHandle) -> FeatureSet:
|
||||
f_node: Optional[CapaRuleGenFeatureCacheNode] = self._get_cached_func_node(fh)
|
||||
if f_node is None:
|
||||
return {}
|
||||
|
||||
|
||||
@@ -192,8 +192,10 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
# caches used to speed up capa explorer analysis - these must be init to None
|
||||
self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None
|
||||
self.program_analysis_ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.feature_extractor: Optional[CapaExplorerFeatureExtractor] = None
|
||||
self.rulegen_feature_extractor: Optional[CapaExplorerFeatureExtractor] = None
|
||||
self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None
|
||||
self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.rulegen_current_function: Optional[FunctionHandle] = None
|
||||
|
||||
# models
|
||||
@@ -727,13 +729,11 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
update_wait_box(f"{text} ({self.process_count} of {self.process_total})")
|
||||
self.process_count += 1
|
||||
|
||||
update_wait_box("initializing feature extractor")
|
||||
|
||||
try:
|
||||
extractor = CapaExplorerFeatureExtractor()
|
||||
extractor.indicator.progress.connect(slot_progress_feature_extraction)
|
||||
self.feature_extractor = CapaExplorerFeatureExtractor()
|
||||
self.feature_extractor.indicator.progress.connect(slot_progress_feature_extraction)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s).", e, exc_info=True)
|
||||
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -743,7 +743,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
update_wait_box("calculating analysis")
|
||||
|
||||
try:
|
||||
self.process_total += len(tuple(extractor.get_functions()))
|
||||
self.process_total += len(tuple(self.feature_extractor.get_functions()))
|
||||
except Exception as e:
|
||||
logger.error("Failed to calculate analysis (error: %s).", e, exc_info=True)
|
||||
return False
|
||||
@@ -770,12 +770,13 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
try:
|
||||
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
|
||||
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
|
||||
capabilities, counts = capa.main.find_capabilities(
|
||||
ruleset, self.feature_extractor, disable_progress=True
|
||||
)
|
||||
|
||||
meta.analysis.feature_counts = counts["feature_counts"]
|
||||
meta.analysis.library_functions = counts["library_functions"]
|
||||
meta.analysis.layout = capa.main.compute_layout(ruleset, extractor, capabilities)
|
||||
|
||||
meta.analysis.layout = capa.main.compute_layout(ruleset, self.feature_extractor, capabilities)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
@@ -978,26 +979,21 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
# so we'll work with a local copy of the ruleset.
|
||||
ruleset = copy.deepcopy(self.rulegen_ruleset_cache)
|
||||
|
||||
# clear feature cache
|
||||
if self.rulegen_feature_cache is not None:
|
||||
self.rulegen_feature_cache = None
|
||||
|
||||
# clear cached function
|
||||
if self.rulegen_current_function is not None:
|
||||
self.rulegen_current_function = None
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("Initializing feature extractor")
|
||||
|
||||
try:
|
||||
# must use extractor to get function, as capa analysis requires casted object
|
||||
extractor = CapaExplorerFeatureExtractor()
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
# these are init once objects, create on tab change
|
||||
if self.rulegen_feature_cache is None or self.rulegen_feature_extractor is None:
|
||||
try:
|
||||
update_wait_box("performing one-time file analysis")
|
||||
self.rulegen_feature_extractor = CapaExplorerFeatureExtractor()
|
||||
self.rulegen_feature_cache = CapaRuleGenFeatureCache(self.rulegen_feature_extractor)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
else:
|
||||
logger.info("Reusing prior rulegen cache")
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
@@ -1009,7 +1005,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
try:
|
||||
f = idaapi.get_func(idaapi.get_screen_ea())
|
||||
if f is not None:
|
||||
self.rulegen_current_function = extractor.get_function(f.start_ea)
|
||||
self.rulegen_current_function = self.rulegen_feature_extractor.get_function(f.start_ea)
|
||||
except Exception as e:
|
||||
logger.error("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e, exc_info=True)
|
||||
return False
|
||||
@@ -1018,21 +1014,6 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
# extract features
|
||||
try:
|
||||
fh_list: List[FunctionHandle] = []
|
||||
if self.rulegen_current_function is not None:
|
||||
fh_list.append(self.rulegen_current_function)
|
||||
|
||||
self.rulegen_feature_cache = CapaRuleGenFeatureCache(fh_list, extractor)
|
||||
except Exception as e:
|
||||
logger.error("Failed to extract features (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("generating function rule matches")
|
||||
|
||||
all_function_features: FeatureSet = collections.defaultdict(set)
|
||||
@@ -1264,7 +1245,6 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
elif index == 1:
|
||||
self.set_view_status_label(self.view_status_label_rulegen_cache)
|
||||
self.view_status_label_analysis_cache = status_prev
|
||||
|
||||
self.view_reset_button.setText("Clear")
|
||||
|
||||
def slot_rulegen_editor_update(self):
|
||||
|
||||
2
rules
2
rules
Submodule rules updated: 5f433fdf8e...368a27e739
@@ -55,3 +55,9 @@ def test_standalone_binja_backend():
|
||||
CD = os.path.dirname(__file__)
|
||||
test_path = os.path.join(CD, "..", "tests", "data", "Practical Malware Analysis Lab 01-01.exe_")
|
||||
assert capa.main.main([test_path, "-b", capa.main.BACKEND_BINJA]) == 0
|
||||
|
||||
|
||||
@pytest.mark.skipif(binja_present is False, reason="Skip binja tests if the binaryninja Python API is not installed")
|
||||
def test_binja_version():
|
||||
version = binaryninja.core_version_info()
|
||||
assert version.major == 3 and version.minor == 4
|
||||
|
||||
Reference in New Issue
Block a user