mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
Merge branch 'master' of https://github.com/mandiant/capa
This commit is contained in:
@@ -25,10 +25,11 @@
|
||||
### Bug Fixes
|
||||
- extractor: fix vivisect loop detection corner case #1310 @mr-tz
|
||||
- match: extend OS characteristic to match OS_ANY to all supported OSes #1324 @mike-hunhoff
|
||||
- explorer: fix exception when plugin loaded in IDA hosted under idat #1341 @mike-hunhoff
|
||||
- extractor: fix IDA and vivisect string and bytes features overlap and tests #1327 #1336 @xusheng6
|
||||
|
||||
### capa explorer IDA Pro plugin
|
||||
- fix exception when plugin loaded in IDA hosted under idat #1341 @mike-hunhoff
|
||||
- improve embedded PE detection performance and reduce FP potential #1344 @mike-hunhoff
|
||||
|
||||
### Development
|
||||
|
||||
|
||||
@@ -21,12 +21,14 @@ from capa.features.file import Export, Import, Section, FunctionName
|
||||
from capa.features.common import FORMAT_PE, FORMAT_ELF, Format, String, Feature, Characteristic
|
||||
from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, AbsoluteVirtualAddress
|
||||
|
||||
MAX_OFFSET_PE_AFTER_MZ = 0x200
|
||||
|
||||
|
||||
def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[Tuple[int, int]]:
|
||||
"""check segment for embedded PE
|
||||
|
||||
adapted for IDA from:
|
||||
https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
|
||||
https://github.com/vivisect/vivisect/blob/91e8419a861f49779f18316f155311967e696836/PE/carve.py#L25
|
||||
"""
|
||||
seg_max = seg.end_ea
|
||||
mz_xor = [
|
||||
@@ -40,13 +42,14 @@ def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[Tuple[int, int]]:
|
||||
|
||||
todo = []
|
||||
for mzx, pex, i in mz_xor:
|
||||
# find all segment offsets containing XOR'd "MZ" bytes
|
||||
for off in capa.features.extractors.ida.helpers.find_byte_sequence(seg.start_ea, seg.end_ea, mzx):
|
||||
todo.append((off, mzx, pex, i))
|
||||
|
||||
while len(todo):
|
||||
off, mzx, pex, i = todo.pop()
|
||||
|
||||
# The MZ header has one field we will check e_lfanew is at 0x3c
|
||||
# MZ header has one field we will check: e_lfanew, located at offset 0x3c
|
||||
e_lfanew = off + 0x3C
|
||||
|
||||
if seg_max < (e_lfanew + 4):
|
||||
@@ -54,6 +57,10 @@ def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[Tuple[int, int]]:
|
||||
|
||||
newoff = struct.unpack("<I", capa.features.extractors.helpers.xor_static(idc.get_bytes(e_lfanew, 4), i))[0]
|
||||
|
||||
# assume XOR'd "PE" bytes exist within threshold
|
||||
if newoff > MAX_OFFSET_PE_AFTER_MZ:
|
||||
continue
|
||||
|
||||
peoff = off + newoff
|
||||
if seg_max < (peoff + 2):
|
||||
continue
|
||||
@@ -61,9 +68,6 @@ def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[Tuple[int, int]]:
|
||||
if idc.get_bytes(peoff, 2) == pex:
|
||||
yield off, i
|
||||
|
||||
for nextres in capa.features.extractors.ida.helpers.find_byte_sequence(off + 1, seg.end_ea, mzx):
|
||||
todo.append((nextres, mzx, pex, i))
|
||||
|
||||
|
||||
def extract_file_embedded_pe() -> Iterator[Tuple[Feature, Address]]:
|
||||
"""extract embedded PE features
|
||||
@@ -102,13 +106,13 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]:
|
||||
for name in capa.features.extractors.helpers.generate_symbols(info[0], info[1]):
|
||||
yield Import(name), addr
|
||||
dll = info[0]
|
||||
symbol = "#%d" % (info[2])
|
||||
symbol = f"#{info[2]}"
|
||||
elif info[1]:
|
||||
dll = info[0]
|
||||
symbol = info[1]
|
||||
elif info[2]:
|
||||
dll = info[0]
|
||||
symbol = "#%d" % (info[2])
|
||||
symbol = f"#{info[2]}"
|
||||
else:
|
||||
continue
|
||||
|
||||
@@ -176,7 +180,7 @@ def extract_file_format() -> Iterator[Tuple[Feature, Address]]:
|
||||
# no file type to return when processing a binary file, but we want to continue processing
|
||||
return
|
||||
else:
|
||||
raise NotImplementedError("unexpected file format: %d" % file_info.filetype)
|
||||
raise NotImplementedError(f"unexpected file format: {file_info.filetype}")
|
||||
|
||||
|
||||
def extract_features() -> Iterator[Tuple[Feature, Address]]:
|
||||
|
||||
@@ -31,7 +31,7 @@ class _AccessFeature(Feature, abc.ABC):
|
||||
super().__init__(value, description=description)
|
||||
if access is not None:
|
||||
if access not in VALID_FEATURE_ACCESS:
|
||||
raise ValueError("%s access type %s not valid" % (self.name, access))
|
||||
raise ValueError(f"{self.name} access type {access} not valid")
|
||||
self.access = access
|
||||
|
||||
def __hash__(self):
|
||||
|
||||
18
capa/main.py
18
capa/main.py
@@ -853,15 +853,15 @@ def install_common_args(parser, wanted=None):
|
||||
help="select sample format, %s" % format_help,
|
||||
)
|
||||
|
||||
if "backend" in wanted:
|
||||
parser.add_argument(
|
||||
"-b",
|
||||
"--backend",
|
||||
type=str,
|
||||
help="select the backend to use",
|
||||
choices=(BACKEND_VIV,),
|
||||
default=BACKEND_VIV,
|
||||
)
|
||||
if "backend" in wanted:
|
||||
parser.add_argument(
|
||||
"-b",
|
||||
"--backend",
|
||||
type=str,
|
||||
help="select the backend to use",
|
||||
choices=(BACKEND_VIV,),
|
||||
default=BACKEND_VIV,
|
||||
)
|
||||
|
||||
if "rules" in wanted:
|
||||
parser.add_argument(
|
||||
|
||||
@@ -93,9 +93,9 @@ def load_analysis(bv):
|
||||
rows = sorted(rows)
|
||||
for ns, name, va in rows:
|
||||
if ns:
|
||||
cmt = "%s (%s)" % (name, ns)
|
||||
cmt = f"{name} ({ns})"
|
||||
else:
|
||||
cmt = "%s" % (name,)
|
||||
cmt = f"{name}"
|
||||
|
||||
binaryninja.log_info("0x%x: %s" % (va, cmt))
|
||||
try:
|
||||
|
||||
@@ -101,9 +101,9 @@ def main():
|
||||
rows = sorted(rows)
|
||||
for ns, name, va in rows:
|
||||
if ns:
|
||||
cmt = "%s (%s)" % (name, ns)
|
||||
cmt = f"{name} ({ns})"
|
||||
else:
|
||||
cmt = "%s" % (name,)
|
||||
cmt = f"{name}"
|
||||
|
||||
logger.info("0x%x: %s", va, cmt)
|
||||
try:
|
||||
|
||||
@@ -125,7 +125,7 @@ def main(argv=None):
|
||||
for analyzer in analyzers:
|
||||
name = viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function)
|
||||
if name:
|
||||
print("0x%04x: %s" % (function, name))
|
||||
print(f"0x{function:04x}: {name}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@@ -16,10 +16,10 @@ def display_top(snapshot, key_type="lineno", limit=10):
|
||||
)
|
||||
top_stats = snapshot.statistics(key_type)
|
||||
|
||||
print("Top %s lines" % limit)
|
||||
print(f"Top {limit} lines")
|
||||
for index, stat in enumerate(top_stats[:limit], 1):
|
||||
frame = stat.traceback[0]
|
||||
print("#%s: %s:%s: %.1f KiB" % (index, frame.filename, frame.lineno, stat.size / 1024))
|
||||
print(f"#{index}: {frame.filename}:{frame.lineno}: {stat.size / 1024:.1f} KiB")
|
||||
line = linecache.getline(frame.filename, frame.lineno).strip()
|
||||
if line:
|
||||
print(" %s" % line)
|
||||
@@ -27,9 +27,9 @@ def display_top(snapshot, key_type="lineno", limit=10):
|
||||
other = top_stats[limit:]
|
||||
if other:
|
||||
size = sum(stat.size for stat in other)
|
||||
print("%s other: %.1f KiB" % (len(other), size / 1024))
|
||||
print(f"{len(other)} other: {size / 1024:.1f} KiB")
|
||||
total = sum(stat.size for stat in top_stats)
|
||||
print("Total allocated size: %.1f KiB" % (total / 1024))
|
||||
print(f"Total allocated size: {total / 1024:.1f} KiB")
|
||||
|
||||
|
||||
def main():
|
||||
@@ -45,11 +45,11 @@ def main():
|
||||
import capa.main
|
||||
|
||||
count = int(os.environ.get("CAPA_PROFILE_COUNT", 1))
|
||||
print("total iterations planned: %d (set via env var CAPA_PROFILE_COUNT)." % (count))
|
||||
print(f"total iterations planned: {count} (set via env var CAPA_PROFILE_COUNT).")
|
||||
print()
|
||||
|
||||
for i in range(count):
|
||||
print("iteration %d/%d..." % (i + 1, count))
|
||||
print(f"iteration {i + 1}/{count}...")
|
||||
with contextlib.redirect_stdout(io.StringIO()):
|
||||
with contextlib.redirect_stderr(io.StringIO()):
|
||||
t0 = time.time()
|
||||
@@ -59,9 +59,9 @@ def main():
|
||||
gc.collect()
|
||||
|
||||
process = psutil.Process(os.getpid())
|
||||
print(" duration: %0.02fs" % (t1 - t0))
|
||||
print(" rss: %.1f MiB" % (process.memory_info().rss / 1024 / 1024))
|
||||
print(" vms: %.1f MiB" % (process.memory_info().vms / 1024 / 1024))
|
||||
print(f" duration: {t1 - t0:.02f}s")
|
||||
print(f" rss: {process.memory_info().rss / 1024 / 1024:.1f} MiB")
|
||||
print(f" vms: {process.memory_info().vms / 1024 / 1024:.1f} MiB")
|
||||
|
||||
print("done.")
|
||||
gc.collect()
|
||||
|
||||
Reference in New Issue
Block a user