Mirror of https://github.com/mandiant/capa.git (synced 2025-12-12 23:59:48 -08:00)
Update black (#1307)
* build(deps-dev): bump black from 22.12.0 to 23.1.0

  Bumps [black](https://github.com/psf/black) from 22.12.0 to 23.1.0.
  - [Release notes](https://github.com/psf/black/releases)
  - [Changelog](https://github.com/psf/black/blob/main/CHANGES.md)
  - [Commits](https://github.com/psf/black/compare/22.12.0...23.1.0)

  ---
  updated-dependencies:
  - dependency-name: black
    dependency-type: direct:development
    update-type: version-update:semver-major
  ...

  Signed-off-by: dependabot[bot] <support@github.com>

* reformat black 23.1.0

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
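Black 23.1.0 introduces the 2023 stable style, and the hunks below are mechanical applications of it. A minimal illustrative sketch of the three patterns visible in this diff (standalone example code, not taken from capa itself):

# 1. redundant parentheses around tuple targets in for-loops are removed
#    before: for (key, value) in {"a": 1}.items():
for key, value in {"a": 1}.items():
    print(key, value)

# 2. parentheses around a single exception class are removed
#    before: except (IOError) as e:
try:
    raise IOError("example")
except IOError as e:
    print(e)

# 3. a blank line immediately after a block opener (def, class, if, ...) is removed
def parse():
    return b"MZ"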
@@ -94,7 +94,7 @@ class DnfileFeatureExtractor(FeatureExtractor):
     def get_functions(self) -> Iterator[FunctionHandle]:
         # create a method lookup table
         methods: Dict[Address, FunctionHandle] = {}
-        for (token, method) in get_dotnet_managed_method_bodies(self.pe):
+        for token, method in get_dotnet_managed_method_bodies(self.pe):
             fh: FunctionHandle = FunctionHandle(
                 address=DNTokenAddress(token),
                 inner=method,
@@ -48,7 +48,7 @@ def extract_file_class_features(pe: dnfile.dnPE) -> Iterator[Tuple[Class, Addres
 
 
 def extract_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, Address]]:
     for file_handler in FILE_HANDLERS:
-        for (feature, address) in file_handler(pe):
+        for feature, address in file_handler(pe):
             yield feature, address
@@ -43,7 +43,7 @@ def extract_function_loop(fh: FunctionHandle) -> Iterator[Tuple[Characteristic,
 
 
 def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
     for func_handler in FUNCTION_HANDLERS:
-        for (feature, addr) in func_handler(fh):
+        for feature, addr in func_handler(fh):
             yield feature, addr
@@ -108,7 +108,7 @@ def get_dotnet_managed_imports(pe: dnfile.dnPE) -> Iterator[DnType]:
         TypeName (index into String heap)
         TypeNamespace (index into String heap)
     """
-    for (rid, member_ref) in iter_dotnet_table(pe, dnfile.mdtable.MemberRef.number):
+    for rid, member_ref in iter_dotnet_table(pe, dnfile.mdtable.MemberRef.number):
         assert isinstance(member_ref, dnfile.mdtable.MemberRefRow)
 
         if not isinstance(member_ref.Class.row, dnfile.mdtable.TypeRefRow):
@@ -151,7 +151,7 @@ def get_dotnet_methoddef_property_accessors(pe: dnfile.dnPE) -> Iterator[Tuple[i
         Method (index into the MethodDef table)
         Association (index into the Event or Property table; more precisely, a HasSemantics coded index)
     """
-    for (rid, method_semantics) in iter_dotnet_table(pe, dnfile.mdtable.MethodSemantics.number):
+    for rid, method_semantics in iter_dotnet_table(pe, dnfile.mdtable.MethodSemantics.number):
         assert isinstance(method_semantics, dnfile.mdtable.MethodSemanticsRow)
 
         if method_semantics.Association.row is None:
@@ -189,13 +189,13 @@ def get_dotnet_managed_methods(pe: dnfile.dnPE) -> Iterator[DnType]:
         MethodList (index into MethodDef table; it marks the first of a contiguous run of Methods owned by this Type)
     """
     accessor_map: Dict[int, str] = {}
-    for (methoddef, methoddef_access) in get_dotnet_methoddef_property_accessors(pe):
+    for methoddef, methoddef_access in get_dotnet_methoddef_property_accessors(pe):
         accessor_map[methoddef] = methoddef_access
 
-    for (rid, typedef) in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
+    for rid, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
         assert isinstance(typedef, dnfile.mdtable.TypeDefRow)
 
-        for (idx, method) in enumerate(typedef.MethodList):
+        for idx, method in enumerate(typedef.MethodList):
             if method.table is None:
                 logger.debug("TypeDef[0x%X] MethodList[0x%X] table is None", rid, idx)
                 continue
@@ -225,10 +225,10 @@ def get_dotnet_fields(pe: dnfile.dnPE) -> Iterator[DnType]:
         TypeNamespace (index into String heap)
         FieldList (index into Field table; it marks the first of a contiguous run of Fields owned by this Type)
     """
-    for (rid, typedef) in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
+    for rid, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
         assert isinstance(typedef, dnfile.mdtable.TypeDefRow)
 
-        for (idx, field) in enumerate(typedef.FieldList):
+        for idx, field in enumerate(typedef.FieldList):
             if field.table is None:
                 logger.debug("TypeDef[0x%X] FieldList[0x%X] table is None", rid, idx)
                 continue
@@ -241,7 +241,7 @@ def get_dotnet_fields(pe: dnfile.dnPE) -> Iterator[DnType]:
 
 
 def get_dotnet_managed_method_bodies(pe: dnfile.dnPE) -> Iterator[Tuple[int, CilMethodBody]]:
     """get managed methods from MethodDef table"""
-    for (rid, method_def) in iter_dotnet_table(pe, dnfile.mdtable.MethodDef.number):
+    for rid, method_def in iter_dotnet_table(pe, dnfile.mdtable.MethodDef.number):
         assert isinstance(method_def, dnfile.mdtable.MethodDefRow)
 
         if not method_def.ImplFlags.miIL or any((method_def.Flags.mdAbstract, method_def.Flags.mdPinvokeImpl)):
@@ -268,7 +268,7 @@ def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[DnUnmanagedMethod]
         ImportName (index into the String heap)
         ImportScope (index into the ModuleRef table)
     """
-    for (rid, impl_map) in iter_dotnet_table(pe, dnfile.mdtable.ImplMap.number):
+    for rid, impl_map in iter_dotnet_table(pe, dnfile.mdtable.ImplMap.number):
         assert isinstance(impl_map, dnfile.mdtable.ImplMapRow)
 
         module: str
@@ -302,13 +302,13 @@ def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[DnUnmanagedMethod]
 
 
 def get_dotnet_types(pe: dnfile.dnPE) -> Iterator[DnType]:
     """get .NET types from TypeDef and TypeRef tables"""
-    for (rid, typedef) in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
+    for rid, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
         assert isinstance(typedef, dnfile.mdtable.TypeDefRow)
 
         typedef_token: int = calculate_dotnet_token_value(dnfile.mdtable.TypeDef.number, rid)
         yield DnType(typedef_token, typedef.TypeName, namespace=typedef.TypeNamespace)
 
-    for (rid, typeref) in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
+    for rid, typeref in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
         assert isinstance(typeref, dnfile.mdtable.TypeRefRow)
 
         typeref_token: int = calculate_dotnet_token_value(dnfile.mdtable.TypeRef.number, rid)
@@ -330,6 +330,6 @@ def iter_dotnet_table(pe: dnfile.dnPE, table_index: int) -> Iterator[Tuple[int,
     assert pe.net is not None
     assert pe.net.mdtables is not None
 
-    for (rid, row) in enumerate(pe.net.mdtables.tables.get(table_index, [])):
+    for rid, row in enumerate(pe.net.mdtables.tables.get(table_index, [])):
         # .NET tables are 1-indexed
         yield rid + 1, row
@@ -212,7 +212,7 @@ def extract_unmanaged_call_characteristic_features(
 def extract_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
     """extract instruction features"""
     for inst_handler in INSTRUCTION_HANDLERS:
-        for (feature, addr) in inst_handler(fh, bbh, ih):
+        for feature, addr in inst_handler(fh, bbh, ih):
             assert isinstance(addr, Address)
             yield feature, addr
@@ -64,12 +64,12 @@ def extract_file_namespace_features(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple
     # namespaces may be referenced multiple times, so we need to filter
     namespaces = set()
 
-    for (_, typedef) in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
+    for _, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
         # emit internal .NET namespaces
         assert isinstance(typedef, dnfile.mdtable.TypeDefRow)
         namespaces.add(typedef.TypeNamespace)
 
-    for (_, typeref) in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
+    for _, typeref in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
         # emit external .NET namespaces
         assert isinstance(typeref, dnfile.mdtable.TypeRefRow)
         namespaces.add(typeref.TypeNamespace)
@@ -84,14 +84,14 @@ def extract_file_namespace_features(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple
 
 
 def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Class, Address]]:
     """emit class features from TypeRef and TypeDef tables"""
-    for (rid, typedef) in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
+    for rid, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
         # emit internal .NET classes
         assert isinstance(typedef, dnfile.mdtable.TypeDefRow)
 
         token = calculate_dotnet_token_value(dnfile.mdtable.TypeDef.number, rid)
         yield Class(DnType.format_name(typedef.TypeName, namespace=typedef.TypeNamespace)), DNTokenAddress(token)
 
-    for (rid, typeref) in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
+    for rid, typeref in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
         # emit external .NET classes
         assert isinstance(typeref, dnfile.mdtable.TypeRefRow)
@@ -108,7 +108,6 @@ class ELF:
         self._parse()
 
     def _parse(self):
-
         self.f.seek(0x0)
         self.file_header = self.f.read(0x40)
 
@@ -112,7 +112,6 @@ def carve_pe(pbytes: bytes, offset: int = 0) -> Iterator[Tuple[int, int]]:
     todo = [(off, mzx, pex, key) for (off, mzx, pex, key) in todo if off != -1]
 
     while len(todo):
-
         off, mzx, pex, key = todo.pop()
 
         # The MZ header has one field we will check
@@ -95,7 +95,7 @@ def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[F
 def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
     """extract basic block features"""
     for bb_handler in BASIC_BLOCK_HANDLERS:
-        for (feature, addr) in bb_handler(fh, bbh):
+        for feature, addr in bb_handler(fh, bbh):
             yield feature, addr
     yield BasicBlock(), bbh.address
@@ -39,7 +39,7 @@ def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[Tuple[int, int]]:
     ]
 
     todo = []
-    for (mzx, pex, i) in mz_xor:
+    for mzx, pex, i in mz_xor:
         for off in capa.features.extractors.ida.helpers.find_byte_sequence(seg.start_ea, seg.end_ea, mzx):
             todo.append((off, mzx, pex, i))
@@ -73,13 +73,13 @@ def extract_file_embedded_pe() -> Iterator[Tuple[Feature, Address]]:
     - Check 'Load resource sections' when opening binary in IDA manually
     """
     for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
-        for (ea, _) in check_segment_for_pe(seg):
+        for ea, _ in check_segment_for_pe(seg):
             yield Characteristic("embedded pe"), FileOffsetAddress(ea)
 
 
 def extract_file_export_names() -> Iterator[Tuple[Feature, Address]]:
     """extract function exports"""
-    for (_, _, ea, name) in idautils.Entries():
+    for _, _, ea, name in idautils.Entries():
         yield Export(name), AbsoluteVirtualAddress(ea)
@@ -94,7 +94,7 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]:
     - modulename.importname
     - importname
     """
-    for (ea, info) in capa.features.extractors.ida.helpers.get_file_imports().items():
+    for ea, info in capa.features.extractors.ida.helpers.get_file_imports().items():
         addr = AbsoluteVirtualAddress(ea)
         if info[1] and info[2]:
             # e.g. in mimikatz: ('cabinet', 'FCIAddFile', 11L)
@@ -115,7 +115,7 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]:
         for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
             yield Import(name), addr
 
-    for (ea, info) in capa.features.extractors.ida.helpers.get_file_externs().items():
+    for ea, info in capa.features.extractors.ida.helpers.get_file_externs().items():
         yield Import(info[1]), AbsoluteVirtualAddress(ea)
@@ -45,7 +45,7 @@ def extract_recursive_call(fh: FunctionHandle):
 
 
 def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
     for func_handler in FUNCTION_HANDLERS:
-        for (feature, addr) in func_handler(fh):
+        for feature, addr in func_handler(fh):
             yield feature, addr
@@ -482,7 +482,7 @@ def extract_function_indirect_call_characteristic_features(
 def extract_features(f: FunctionHandle, bbh: BBHandle, insn: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
     """extract instruction features"""
     for inst_handler in INSTRUCTION_HANDLERS:
-        for (feature, ea) in inst_handler(f, bbh, insn):
+        for feature, ea in inst_handler(f, bbh, insn):
             yield feature, ea
@@ -59,7 +59,7 @@ def get_previous_instructions(vw: VivWorkspace, va: int) -> List[int]:
     #
     # from vivisect.const:
     # xref: (XR_FROM, XR_TO, XR_RTYPE, XR_RFLAG)
-    for (xfrom, _, _, xflag) in vw.getXrefsTo(va, REF_CODE):
+    for xfrom, _, _, xflag in vw.getXrefsTo(va, REF_CODE):
         if (xflag & FAR_BRANCH_MASK) != 0:
             continue
         ret.append(xfrom)
@@ -17,7 +17,6 @@ logger = logging.getLogger(__name__)
 
 
 class CapaExplorerPlugin(idaapi.plugin_t):
-
     # Mandatory definitions
     PLUGIN_NAME = "FLARE capa explorer"
     PLUGIN_VERSION = "1.0.0"
@@ -61,7 +61,7 @@ class CapaRuleGenFeatureCache:
         self._find_function_and_below_features(fh_list, extractor)
 
     def _find_global_features(self, extractor: CapaExplorerFeatureExtractor):
-        for (feature, addr) in extractor.extract_global_features():
+        for feature, addr in extractor.extract_global_features():
             # not all global features may have virtual addresses.
             # if not, then at least ensure the feature shows up in the index.
             # the set of addresses will still be empty.
@@ -75,7 +75,7 @@ class CapaRuleGenFeatureCache:
         # not all file features may have virtual addresses.
         # if not, then at least ensure the feature shows up in the index.
         # the set of addresses will still be empty.
-        for (feature, addr) in extractor.extract_file_features():
+        for feature, addr in extractor.extract_file_features():
             if addr is not None:
                 self.file_node.features[feature].add(addr)
             else:
@@ -94,20 +94,20 @@ class CapaRuleGenFeatureCache:
             for ih in extractor.get_instructions(fh, bbh):
                 inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node)
 
-                for (feature, addr) in extractor.extract_insn_features(fh, bbh, ih):
+                for feature, addr in extractor.extract_insn_features(fh, bbh, ih):
                     inode.features[feature].add(addr)
 
                 self.insn_nodes[inode.address] = inode
 
             # extract basic block features
-            for (feature, addr) in extractor.extract_basic_block_features(fh, bbh):
+            for feature, addr in extractor.extract_basic_block_features(fh, bbh):
                 bb_node.features[feature].add(addr)
 
             # store basic block features in cache and function parent
             self.bb_nodes[bb_node.address] = bb_node
 
         # extract function features
-        for (feature, addr) in extractor.extract_function_features(fh):
+        for feature, addr in extractor.extract_function_features(fh):
            f_node.features[feature].add(addr)
 
         self.func_nodes[f_node.address] = f_node
@@ -117,13 +117,13 @@ class CapaRuleGenFeatureCache:
     ) -> Tuple[FeatureSet, MatchResults]:
         features: FeatureSet = collections.defaultdict(set)
 
-        for (feature, locs) in itertools.chain(insn.features.items(), self.global_features.items()):
+        for feature, locs in itertools.chain(insn.features.items(), self.global_features.items()):
             features[feature].update(locs)
 
         _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address)
-        for (name, result) in matches.items():
+        for name, result in matches.items():
             rule = ruleset[name]
-            for (addr, _) in result:
+            for addr, _ in result:
                 capa.engine.index_rule_matches(features, rule, [addr])
 
         return features, matches
@@ -136,18 +136,18 @@ class CapaRuleGenFeatureCache:
 
         for insn in bb.children:
             ifeatures, imatches = self._find_instruction_capabilities(ruleset, insn)
-            for (feature, locs) in ifeatures.items():
+            for feature, locs in ifeatures.items():
                 features[feature].update(locs)
-            for (name, result) in imatches.items():
+            for name, result in imatches.items():
                 insn_matches[name].extend(result)
 
-        for (feature, locs) in itertools.chain(bb.features.items(), self.global_features.items()):
+        for feature, locs in itertools.chain(bb.features.items(), self.global_features.items()):
             features[feature].update(locs)
 
         _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address)
-        for (name, result) in matches.items():
+        for name, result in matches.items():
             rule = ruleset[name]
-            for (loc, _) in result:
+            for loc, _ in result:
                 capa.engine.index_rule_matches(features, rule, [loc])
 
         return features, matches, insn_matches
@@ -165,14 +165,14 @@ class CapaRuleGenFeatureCache:
 
         for bb in f_node.children:
            features, bmatches, imatches = self._find_basic_block_capabilities(ruleset, bb)
-            for (feature, locs) in features.items():
+            for feature, locs in features.items():
                 function_features[feature].update(locs)
-            for (name, result) in bmatches.items():
+            for name, result in bmatches.items():
                 bb_matches[name].extend(result)
-            for (name, result) in imatches.items():
+            for name, result in imatches.items():
                 insn_matches[name].extend(result)
 
-        for (feature, locs) in itertools.chain(f_node.features.items(), self.global_features.items()):
+        for feature, locs in itertools.chain(f_node.features.items(), self.global_features.items()):
             function_features[feature].update(locs)
 
         _, function_matches = ruleset.match(Scope.FUNCTION, function_features, f_node.address)
@@ -186,10 +186,10 @@ class CapaRuleGenFeatureCache:
         assert isinstance(func_node.inner, FunctionHandle)
 
         func_features, _, _, _ = self.find_code_capabilities(ruleset, func_node.inner)
-        for (feature, locs) in func_features.items():
+        for feature, locs in func_features.items():
             features[feature].update(locs)
 
-        for (feature, locs) in itertools.chain(self.file_node.features.items(), self.global_features.items()):
+        for feature, locs in itertools.chain(self.file_node.features.items(), self.global_features.items()):
             features[feature].update(locs)
 
         _, matches = ruleset.match(Scope.FILE, features, NO_ADDRESS)
@@ -205,13 +205,13 @@ class CapaRuleGenFeatureCache:
 
         for bb_node in f_node.children:
             for i_node in bb_node.children:
-                for (feature, locs) in i_node.features.items():
+                for feature, locs in i_node.features.items():
                     all_function_features[feature].update(locs)
-            for (feature, locs) in bb_node.features.items():
+            for feature, locs in bb_node.features.items():
                 all_function_features[feature].update(locs)
 
         # include global features just once
-        for (feature, locs) in self.global_features.items():
+        for feature, locs in self.global_features.items():
             all_function_features[feature].update(locs)
 
         return all_function_features
@@ -907,7 +907,6 @@ class CapaExplorerForm(idaapi.PluginForm):
 
     def get_ask_use_persistent_cache(self, analyze):
         if analyze and analyze != Options.NO_ANALYSIS:
-
             update_wait_box("checking for cached results")
 
             try:
@@ -922,12 +921,10 @@ class CapaExplorerForm(idaapi.PluginForm):
                 raise UserCancelledError
 
             if has_cache:
-
                 if analyze == Options.ANALYZE_AUTO:
                     return True
 
                 elif analyze == Options.ANALYZE_ASK:
-
                     update_wait_box("verifying cached results")
 
                     try:
@@ -1045,11 +1042,11 @@ class CapaExplorerForm(idaapi.PluginForm):
                 self.rulegen_feature_cache.get_all_function_features(self.rulegen_current_function)
             )
 
-            for (name, result) in itertools.chain(func_matches.items(), bb_matches.items(), insn_matches.items()):
+            for name, result in itertools.chain(func_matches.items(), bb_matches.items(), insn_matches.items()):
                 rule = ruleset[name]
                 if rule.is_subscope_rule():
                     continue
-                for (addr, _) in result:
+                for addr, _ in result:
                     all_function_features[capa.features.common.MatchedRule(name)].add(addr)
         except Exception as e:
             logger.error("Failed to generate rule matches (error: %s)", e, exc_info=True)
@@ -1066,11 +1063,11 @@ class CapaExplorerForm(idaapi.PluginForm):
             _, file_matches = self.rulegen_feature_cache.find_file_capabilities(ruleset)
             all_file_features.update(self.rulegen_feature_cache.get_all_file_features())
 
-            for (name, result) in file_matches.items():
+            for name, result in file_matches.items():
                 rule = ruleset[name]
                 if rule.is_subscope_rule():
                     continue
-                for (addr, _) in result:
+                for addr, _ in result:
                     all_file_features[capa.features.common.MatchedRule(name)].add(addr)
         except Exception as e:
             logger.error("Failed to generate file rule matches (error: %s)", e, exc_info=True)
@@ -450,7 +450,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
         match_eas: List[int] = []
 
         # initial pass of rule matches
-        for (addr_, _) in rule.matches:
+        for addr_, _ in rule.matches:
             addr: Address = addr_.to_capa()
             if isinstance(addr, AbsoluteVirtualAddress):
                 match_eas.append(int(addr))
@@ -494,7 +494,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
         rule_namespace = rule.meta.namespace or ""
         parent = CapaExplorerRuleItem(self.root_node, rule_name, rule_namespace, len(rule.matches), rule.source)
 
-        for (location_, match) in rule.matches:
+        for location_, match in rule.matches:
             location = location_.to_capa()
 
             parent2: CapaExplorerDataItem
@@ -174,7 +174,6 @@ def resize_columns_to_content(header):
 
 
 class CapaExplorerRulegenPreview(QtWidgets.QTextEdit):
-
     INDENT = " " * 2
 
     def __init__(self, parent=None):
@@ -255,7 +254,7 @@ class CapaExplorerRulegenPreview(QtWidgets.QTextEdit):
         lines_modified = 0
         first_modified = False
         change = []
-        for (lineno, line) in enumerate(plain[start_lineno : end_lineno + 1]):
+        for lineno, line in enumerate(plain[start_lineno : end_lineno + 1]):
             if line.startswith(self.INDENT):
                 if lineno == 0:
                     # keep track if first line is modified, so we can properly display
@@ -307,7 +306,6 @@ class CapaExplorerRulegenPreview(QtWidgets.QTextEdit):
 
 
 class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
-
     updated = QtCore.pyqtSignal()
 
     def __init__(self, preview, parent=None):
@@ -619,7 +617,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
         """ """
         o = QtWidgets.QTreeWidgetItem(parent)
         self.set_expression_node(o)
-        for (i, v) in enumerate(values):
+        for i, v in enumerate(values):
             o.setText(i, v)
         return o
@@ -627,7 +625,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
         """ """
         o = QtWidgets.QTreeWidgetItem(parent)
         self.set_feature_node(o)
-        for (i, v) in enumerate(values):
+        for i, v in enumerate(values):
             o.setText(i, v)
         return o
@@ -635,7 +633,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
         """ """
         o = QtWidgets.QTreeWidgetItem(parent)
         self.set_comment_node(o)
-        for (i, v) in enumerate(values):
+        for i, v in enumerate(values):
             o.setText(i, v)
         return o
@@ -654,7 +652,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
         counted = list(zip(Counter(features).keys(), Counter(features).values()))
 
         # single features
-        for (k, v) in filter(lambda t: t[1] == 1, counted):
+        for k, v in filter(lambda t: t[1] == 1, counted):
             if isinstance(k, (capa.features.common.String,)):
                 value = '"%s"' % capa.features.common.escape_string(k.get_value_str())
             else:
@@ -662,7 +660,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
             self.new_feature_node(top_node, ("- %s: %s" % (k.name.lower(), value), ""))
 
         # n > 1 features
-        for (k, v) in filter(lambda t: t[1] > 1, counted):
+        for k, v in filter(lambda t: t[1] > 1, counted):
             if k.value:
                 if isinstance(k, (capa.features.common.String,)):
                     value = '"%s"' % capa.features.common.escape_string(k.get_value_str())
@@ -707,7 +705,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
         node = QtWidgets.QTreeWidgetItem(parent)
 
         # set node text to data parsed from feature
-        for (idx, text) in enumerate((feature, comment, description)):
+        for idx, text in enumerate((feature, comment, description)):
             node.setText(idx, text)
 
         # we need to set our own type so we can control the GUI accordingly
@@ -999,7 +997,7 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget):
         o = QtWidgets.QTreeWidgetItem(parent)
 
         self.set_parent_node(o)
-        for (i, v) in enumerate(data):
+        for i, v in enumerate(data):
             o.setText(i, v)
         if feature:
             o.setData(0, 0x100, feature)
@@ -1011,7 +1009,7 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget):
         o = QtWidgets.QTreeWidgetItem(parent)
 
         self.set_leaf_node(o)
-        for (i, v) in enumerate(data):
+        for i, v in enumerate(data):
             o.setText(i, v)
         if feature:
             o.setData(0, 0x100, feature)
@@ -1043,7 +1041,7 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget):
                 value = '"%s"' % capa.features.common.escape_string(value)
             return "%s(%s)" % (name, value)
 
-        for (feature, addrs) in sorted(features.items(), key=lambda k: sorted(k[1])):
+        for feature, addrs in sorted(features.items(), key=lambda k: sorted(k[1])):
             if isinstance(feature, capa.features.basicblock.BasicBlock):
                 # filter basic blocks for now, we may want to add these back in some time
                 # in the future
@@ -1076,7 +1074,7 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget):
             else:
                 # some features may not have an address e.g. "format"
                 addr = _NoAddress()
-            for (i, v) in enumerate((format_feature(feature), format_address(addr))):
+            for i, v in enumerate((format_feature(feature), format_address(addr))):
                 self.parent_items[feature].setText(i, v)
             self.parent_items[feature].setData(0, 0x100, feature)
@@ -758,7 +758,7 @@ def compute_layout(rules, extractor, capabilities):
     for rule_name, matches in capabilities.items():
         rule = rules[rule_name]
         if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
-            for (addr, _) in matches:
+            for addr, _ in matches:
                 assert addr in functions_by_bb
                 matched_bbs.add(addr)
@@ -133,7 +133,7 @@ def render_attack(doc: rd.ResultDocument, ostream: StringIO):
     rows = []
     for tactic, techniques in sorted(tactics.items()):
         inner_rows = []
-        for (technique, subtechnique, id) in sorted(techniques):
+        for technique, subtechnique, id in sorted(techniques):
             if not subtechnique:
                 inner_rows.append("%s %s" % (rutils.bold(technique), id))
             else:
@@ -176,7 +176,7 @@ def render_mbc(doc: rd.ResultDocument, ostream: StringIO):
     rows = []
     for objective, behaviors in sorted(objectives.items()):
         inner_rows = []
-        for (behavior, method, id) in sorted(behaviors):
+        for behavior, method, id in sorted(behaviors):
             if not method:
                 inner_rows.append("%s [%s]" % (rutils.bold(behavior), id))
             else:
@@ -37,7 +37,7 @@ def format_parts_id(data: Union[rd.AttackSpec, rd.MBCSpec]):
 
 def capability_rules(doc: rd.ResultDocument) -> Iterator[rd.RuleMatches]:
     """enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc)."""
-    for (_, _, rule) in sorted(map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())):
+    for _, _, rule in sorted(map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())):
         if rule.meta.lib:
             continue
         if rule.meta.is_subscope_rule:
@@ -277,7 +277,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
 
     had_match = False
 
-    for (_, _, rule) in sorted(map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())):
+    for _, _, rule in sorted(map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())):
         # default scope hides things like lib rules, malware-category rules, etc.
         # but in vverbose mode, we really want to show everything.
         #
@@ -160,12 +160,12 @@ def main(argv=None):
 
     try:
         sig_paths = capa.main.get_signatures(args.signatures)
-    except (IOError) as e:
+    except IOError as e:
         logger.error("%s", str(e))
         return -1
 
     samples = []
-    for (base, directories, files) in os.walk(args.input):
+    for base, directories, files in os.walk(args.input):
         for file in files:
             samples.append(os.path.join(base, file))
@@ -128,7 +128,6 @@ def convert_capa_number_to_yara_bytes(number):
 
 
 def convert_rule_name(rule_name):
-
     # yara rule names: "Identifiers must follow the same lexical conventions of the C programming language, they can contain any alphanumeric character and the underscore character, but the first character cannot be a digit. Rule identifiers are case sensitive and cannot exceed 128 characters." so we replace any non-alphanum with _
     rule_name = re.sub(r"\W", "_", rule_name)
     rule_name = "capa_" + rule_name
@@ -151,7 +150,6 @@ def convert_description(statement):
 
 
 def convert_rule(rule, rulename, cround, depth):
-
     depth += 1
     logger.info("recursion depth: " + str(depth))
 
@@ -515,7 +513,6 @@ def output_yar(yara):
 
 
 def output_unsupported_capa_rules(yaml, capa_rulename, url, reason):
-
     if reason != "NOLOG":
         if capa_rulename not in unsupported_capa_rules_list:
             logger.info("unsupported: " + capa_rulename + " - reason: " + reason + " - url: " + url)
@@ -539,7 +536,6 @@ def output_unsupported_capa_rules(yaml, capa_rulename, url, reason):
 def convert_rules(rules, namespaces, cround, make_priv):
     count_incomplete = 0
     for rule in rules.rules.values():
-
         rule_name = convert_rule_name(rule.name)
 
         if rule.is_subscope_rule():
@@ -579,7 +575,6 @@ def convert_rules(rules, namespaces, cround, make_priv):
             output_unsupported_capa_rules(rule.to_yaml(), rule.name, url, yara_condition)
             logger.info("Unknown feature at5: " + rule.name)
         else:
-
             yara_meta = ""
             metas = rule.meta
             rule_tags = ""
@@ -661,7 +656,6 @@ def convert_rules(rules, namespaces, cround, make_priv):
         # check if there's some beef in condition:
         tmp_yc = re.sub(r"(and|or|not)", "", yara_condition)
         if re.search(r"\w", tmp_yc):
-
             yara = ""
             if make_priv:
                 yara = "private "
@@ -106,7 +106,7 @@ def render_attack(doc, result):
 
     for tactic, techniques in sorted(tactics.items()):
         inner_rows = []
-        for (technique, subtechnique, id) in sorted(techniques):
+        for technique, subtechnique, id in sorted(techniques):
             if subtechnique is None:
                 inner_rows.append("%s %s" % (technique, id))
             else:
@@ -140,7 +140,7 @@ def render_mbc(doc, result):
 
     for objective, behaviors in sorted(objectives.items()):
         inner_rows = []
-        for (behavior, method, id) in sorted(behaviors):
+        for behavior, method, id in sorted(behaviors):
             if method is None:
                 inner_rows.append("%s [%s]" % (behavior, id))
             else:
@@ -890,7 +890,6 @@ def redirecting_print_to_tqdm():
     old_print = print
 
     def new_print(*args, **kwargs):
-
         # If tqdm.tqdm.write raises error, use builtin print
         try:
             tqdm.tqdm.write(*args, **kwargs)
@@ -89,13 +89,13 @@ def main(argv=None):
     try:
         with capa.main.timing("load rules"):
             rules = capa.main.get_rules(args.rules)
-    except (IOError) as e:
+    except IOError as e:
         logger.error("%s", str(e))
         return -1
 
     try:
         sig_paths = capa.main.get_signatures(args.signatures)
-    except (IOError) as e:
+    except IOError as e:
         logger.error("%s", str(e))
         return -1
@@ -120,7 +120,7 @@ def main(argv=None):
     logger.debug("perf: find capabilities: avg: %0.2fs" % (sum(samples) / float(args.repeat) / float(args.number)))
     logger.debug("perf: find capabilities: max: %0.2fs" % (max(samples) / float(args.number)))
 
-    for (counter, count) in capa.perf.counters.most_common():
+    for counter, count in capa.perf.counters.most_common():
         logger.debug("perf: counter: {:}: {:,}".format(counter, count))
 
     print(
@@ -152,7 +152,7 @@ def main(argv=None):
 
     try:
         sig_paths = capa.main.get_signatures(args.signatures)
-    except (IOError) as e:
+    except IOError as e:
         logger.error("%s", str(e))
         return -1
setup.py
@@ -73,7 +73,7 @@ setuptools.setup(
         "pytest-instafail==0.4.2",
         "pytest-cov==4.0.0",
         "pycodestyle==2.10.0",
-        "black==22.12.0",
+        "black==23.1.0",
         "isort==5.11.4",
         "mypy==0.991",
         "psutil==5.9.2",
@@ -45,7 +45,7 @@ def get_ida_extractor(_path):
 
 @pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
 def test_ida_features():
-    for (sample, scope, feature, expected) in fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_PRESENCE_TESTS_IDA:
+    for sample, scope, feature, expected in fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_PRESENCE_TESTS_IDA:
         id = fixtures.make_test_id((sample, scope, feature, expected))
 
         try:
@@ -68,7 +68,7 @@ def test_ida_features():
 
 @pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
 def test_ida_feature_counts():
-    for (sample, scope, feature, expected) in fixtures.FEATURE_COUNT_TESTS:
+    for sample, scope, feature, expected in fixtures.FEATURE_COUNT_TESTS:
         id = fixtures.make_test_id((sample, scope, feature, expected))
 
         try: