feat: add handles and type annotations

This commit is contained in:
Moritz Raabe
2022-05-12 10:52:28 +02:00
parent 7b05fc4180
commit 716a73dfb4
23 changed files with 464 additions and 541 deletions

View File

@@ -8,7 +8,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List, Tuple
from typing import TYPE_CHECKING, List, Tuple, Iterator
from capa.features.address import Address, DNTokenAddress, DNTokenOffsetAddress, AbsoluteVirtualAddress
@@ -44,7 +44,7 @@ class DnfileFeatureExtractor(FeatureExtractor):
# Yield file-scope features by delegating to the dnfile file extractor, passing self.pe.
def extract_file_features(self):
yield from capa.features.extractors.dnfile.file.extract_features(self.pe)
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# Yields one FunctionHandle per (token, f) pair from get_dotnet_managed_method_bodies(self.pe):
# address is DNTokenAddress(Token(token)), inner is f, and ctx carries self.pe under the "pe" key.
def get_functions(self):
def get_functions(self) -> Iterator[FunctionHandle]:
for token, f in get_dotnet_managed_method_bodies(self.pe):
yield FunctionHandle(address=DNTokenAddress(Token(token)), inner=f, ctx={"pe": self.pe})
@@ -52,20 +52,20 @@ class DnfileFeatureExtractor(FeatureExtractor):
# TODO
yield from []
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# Yields a BBHandle that reuses the function handle's own address and inner object.
def get_basic_blocks(self, f):
def get_basic_blocks(self, f) -> Iterator[BBHandle]:
# each dotnet method is considered 1 basic block
yield BBHandle(
address=f.address,
inner=f.inner,
)
# NOTE(review): two signatures are shown (diff rendering); they differ only in parameter names (f/bb vs fh/bbh).
# Neither parameter is used; the generator yields nothing.
def extract_basic_block_features(self, f, bb):
def extract_basic_block_features(self, fh, bbh):
# we don't support basic block features
yield from []
# NOTE(review): the (f, bb) and (fh, bbh) variants are both shown (diff rendering); apart from names they are identical.
# Yields one InsnHandle per entry of the block's inner.instructions.
# Each address is DNTokenOffsetAddress(block token, insn.offset minus the function's inner.offset).
def get_instructions(self, f, bb):
for insn in bb.inner.instructions:
yield InsnHandle(address=DNTokenOffsetAddress(bb.address.token, insn.offset - f.inner.offset), inner=insn)
def get_instructions(self, fh, bbh):
for insn in bbh.inner.instructions:
yield InsnHandle(address=DNTokenOffsetAddress(bbh.address.token, insn.offset - fh.inner.offset), inner=insn)
# NOTE(review): the (f, bb, insn) and (fh, bbh, ih) variants are both shown (diff rendering).
# Pure delegation: forwards the three arguments unchanged to the dnfile insn extractor and re-yields its output.
def extract_insn_features(self, f, bb, insn):
yield from capa.features.extractors.dnfile.insn.extract_features(f, bb, insn)
def extract_insn_features(self, fh, bbh, ih) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.dnfile.insn.extract_features(fh, bbh, ih)

View File

@@ -47,7 +47,10 @@ class OS(str, Enum):
NACL = "nacl"
def detect_elf_os(f: BinaryIO) -> str:
def detect_elf_os(f) -> str:
"""
f: type Union[BinaryIO, IDAIO]
"""
f.seek(0x0)
file_header = f.read(0x40)

View File

@@ -8,22 +8,21 @@
import string
import struct
from typing import Tuple, Iterator
import idaapi
import capa.features.extractors.ida.helpers
from capa.features.common import Characteristic
from capa.features.common import Feature, Characteristic
from capa.features.address import Address
from capa.features.basicblock import BasicBlock
from capa.features.extractors.ida import helpers
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle
def get_printable_len(op):
"""Return string length if all operand bytes are ascii or utf16-le printable
args:
op (IDA op_t)
"""
def get_printable_len(op: idaapi.op_t) -> int:
"""Return string length if all operand bytes are ascii or utf16-le printable"""
op_val = capa.features.extractors.ida.helpers.mask_op_val(op)
if op.dtype == idaapi.dt_byte:
@@ -37,12 +36,12 @@ def get_printable_len(op):
else:
raise ValueError("Unhandled operand data type 0x%x." % op.dtype)
# NOTE(review): the `chars` and `chars_` variants are both shown (diff rendering); only the parameter name/annotation differs.
# True only when every element is below 127 and chr(element) is in string.printable.
# Elements are compared as integers, which is what iterating a bytes object produces.
def is_printable_ascii(chars):
return all(c < 127 and chr(c) in string.printable for c in chars)
def is_printable_ascii(chars_: bytes):
return all(c < 127 and chr(c) in string.printable for c in chars_)
# NOTE(review): the `chars` and `chars_` variants are both shown (diff rendering).
# When every element at an odd index is 0x00, the result is is_printable_ascii() over the even-indexed elements.
# NOTE(review): no return is visible for the other case - presumably it falls through to None; confirm.
def is_printable_utf16le(chars):
if all(c == 0x00 for c in chars[1::2]):
return is_printable_ascii(chars[::2])
def is_printable_utf16le(chars_: bytes):
if all(c == 0x00 for c in chars_[1::2]):
return is_printable_ascii(chars_[::2])
if is_printable_ascii(chars):
return idaapi.get_dtype_size(op.dtype)
@@ -53,12 +52,8 @@ def get_printable_len(op):
return 0
def is_mov_imm_to_stack(insn):
"""verify instruction moves immediate onto stack
args:
insn (IDA insn_t)
"""
def is_mov_imm_to_stack(insn: idaapi.insn_t) -> bool:
"""verify instruction moves immediate onto stack"""
if insn.Op2.type != idaapi.o_imm:
return False
@@ -71,14 +66,10 @@ def is_mov_imm_to_stack(insn):
return True
def bb_contains_stackstring(f, bb):
def bb_contains_stackstring(f: idaapi.func_t, bb: idaapi.BasicBlock) -> bool:
"""check basic block for stackstring indicators
true if basic block contains enough moves of constant bytes to the stack
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
count = 0
for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
@@ -89,39 +80,24 @@ def bb_contains_stackstring(f, bb):
return False
# NOTE(review): the raw-object variant (f, bb) and the handle variant (fh, bbh) are both shown (diff rendering).
# Handle variant: unwraps fh.inner / bbh.inner for bb_contains_stackstring() and, on a hit,
# yields the "stack string" characteristic at bbh.address instead of the raw bb.start_ea.
def extract_bb_stackstring(f, bb):
"""extract stackstring indicators from basic block
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
if bb_contains_stackstring(f, bb):
yield Characteristic("stack string"), bb.start_ea
def extract_bb_stackstring(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract stackstring indicators from basic block"""
if bb_contains_stackstring(fh.inner, bbh.inner):
yield Characteristic("stack string"), bbh.address
# NOTE(review): the raw-object variant (f, bb) and the handle variant (fh, bbh) are both shown (diff rendering).
# Handle variant: passes bbh.inner to is_basic_block_tight_loop() and, on a hit,
# yields the "tight loop" characteristic at bbh.address. The function argument is not used in either variant.
def extract_bb_tight_loop(f, bb):
"""extract tight loop indicators from a basic block
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
if capa.features.extractors.ida.helpers.is_basic_block_tight_loop(bb):
yield Characteristic("tight loop"), bb.start_ea
def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract tight loop indicators from a basic block"""
if capa.features.extractors.ida.helpers.is_basic_block_tight_loop(bbh.inner):
yield Characteristic("tight loop"), bbh.address
# NOTE(review): old and new signature, loop body and final yield are interleaved below (diff rendering).
# Runs every callable in BASIC_BLOCK_HANDLERS with the two arguments and re-yields each (feature, address) pair;
# a BasicBlock() feature is also yielded at the block's address (bb.start_ea before, bbh.address after).
def extract_features(f, bb):
"""extract basic block features
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract basic block features"""
for bb_handler in BASIC_BLOCK_HANDLERS:
for (feature, ea) in bb_handler(f, bb):
yield feature, ea
yield BasicBlock(), bb.start_ea
for (feature, addr) in bb_handler(fh, bbh):
yield feature, addr
yield BasicBlock(), bbh.address
BASIC_BLOCK_HANDLERS = (
@@ -132,9 +108,10 @@ BASIC_BLOCK_HANDLERS = (
def main():
features = []
for f in helpers.get_functions(skip_thunks=True, skip_libs=True):
for fhandle in helpers.get_functions(skip_thunks=True, skip_libs=True):
f: idaapi.func_t = fhandle.inner
for bb in idaapi.FlowChart(f, flags=idaapi.FC_PREDS):
features.extend(list(extract_features(f, bb)))
features.extend(list(extract_features(fhandle, bb)))
import pprint

View File

@@ -5,6 +5,8 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import List, Tuple, Iterator
import idaapi
import capa.ida.helpers
@@ -14,57 +16,20 @@ import capa.features.extractors.ida.insn
import capa.features.extractors.ida.global_
import capa.features.extractors.ida.function
import capa.features.extractors.ida.basicblock
from capa.features.extractors.base_extractor import FeatureExtractor
# NOTE(review): presumably removed by this commit - FunctionHandle is imported from base_extractor further down; confirm.
# Thin proxy: attributes not found on the wrapper are looked up on the wrapped object via __getattr__,
# so `self.start_ea` in __int__ resolves to the wrapped object's start_ea.
class FunctionHandle:
"""this acts like an idaapi.func_t but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.start_ea
def __getattr__(self, name):
return getattr(self._inner, name)
# NOTE(review): presumably removed by this commit in favour of BBHandle from base_extractor; confirm.
# Thin proxy: attributes not found on the wrapper are looked up on the wrapped object via __getattr__,
# so `self.start_ea` in __int__ resolves to the wrapped object's start_ea.
class BasicBlockHandle:
"""this acts like an idaapi.BasicBlock but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.start_ea
def __getattr__(self, name):
return getattr(self._inner, name)
# NOTE(review): presumably removed by this commit in favour of InsnHandle from base_extractor; confirm.
# Thin proxy: attributes not found on the wrapper are looked up on the wrapped object via __getattr__,
# so `self.ea` in __int__ resolves to the wrapped object's ea.
class InstructionHandle:
"""this acts like an idaapi.insn_t but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.ea
def __getattr__(self, name):
return getattr(self._inner, name)
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
class IdaFeatureExtractor(FeatureExtractor):
# NOTE(review): the untyped and the annotated `self.global_features` assignments are both shown (diff rendering).
# Collects the OS and architecture features once at construction time by draining
# global_.extract_os() and global_.extract_arch() into self.global_features.
def __init__(self):
super(IdaFeatureExtractor, self).__init__()
self.global_features = []
self.global_features: List[Tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.ida.global_.extract_os())
self.global_features.extend(capa.features.extractors.ida.global_.extract_arch())
# NOTE(review): two return statements are shown (diff rendering); the second wraps the same
# idaapi.get_imagebase() value in AbsoluteVirtualAddress and is presumably the post-commit form.
def get_base_address(self):
return idaapi.get_imagebase()
return AbsoluteVirtualAddress(idaapi.get_imagebase())
# Re-yields the (feature, address) pairs stored in self.global_features by __init__.
def extract_global_features(self):
yield from self.global_features
@@ -72,41 +37,34 @@ class IdaFeatureExtractor(FeatureExtractor):
# Pure delegation to the IDA file-feature module; takes no input besides self.
def extract_file_features(self):
yield from capa.features.extractors.ida.file.extract_features()
# NOTE(review): old and new lines are interleaved (diff rendering). The old loop attached one shared `ctx` dict
# to every function and wrapped it; the new form re-yields ida_helpers.get_functions(...) directly.
# NOTE(review): the `yield from` line does not attach the shared `ctx`, so the `ctx = {}` setup above it is
# presumably a removed line - confirm, and confirm whether per-workspace caching is still shared.
def get_functions(self):
def get_functions(self) -> Iterator[FunctionHandle]:
import capa.features.extractors.ida.helpers as ida_helpers
# data structure shared across functions yielded here.
# useful for caching analysis relevant across a single workspace.
ctx = {}
# ignore library functions and thunk functions as identified by IDA
for f in ida_helpers.get_functions(skip_thunks=True, skip_libs=True):
setattr(f, "ctx", ctx)
yield FunctionHandle(f)
yield from ida_helpers.get_functions(skip_thunks=True, skip_libs=True)
# NOTE(review): old and new signature/return are interleaved (diff rendering).
# Looks up the IDA function via idaapi.get_func(ea); the new return builds a FunctionHandle whose
# address is AbsoluteVirtualAddress(f.start_ea) and whose inner is f.
# NOTE(review): the new constructor call passes no ctx while the old code set ctx={} - whether
# FunctionHandle supplies a default ctx is not visible here; confirm.
@staticmethod
def get_function(ea):
def get_function(ea: int) -> FunctionHandle:
f = idaapi.get_func(ea)
setattr(f, "ctx", {})
return FunctionHandle(f)
return FunctionHandle(address=AbsoluteVirtualAddress(f.start_ea), inner=f)
# NOTE(review): the `f` and `fh` variants are both shown (diff rendering).
# Pure delegation: forwards the function argument to the IDA function-feature module and re-yields its output.
def extract_function_features(self, f):
yield from capa.features.extractors.ida.function.extract_features(f)
def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.ida.function.extract_features(fh)
# NOTE(review): old and new signature/loop are interleaved (diff rendering).
# New form: iterates ida_helpers.get_function_blocks(fh.inner) and wraps each block in a BBHandle
# addressed by AbsoluteVirtualAddress(bb.start_ea), with the raw block as inner.
def get_basic_blocks(self, f):
def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
import capa.features.extractors.ida.helpers as ida_helpers
for bb in ida_helpers.get_function_blocks(f):
yield BasicBlockHandle(bb)
for bb in ida_helpers.get_function_blocks(fh.inner):
yield BBHandle(address=AbsoluteVirtualAddress(bb.start_ea), inner=bb)
# NOTE(review): the (f, bb) and (fh, bbh) variants are both shown (diff rendering).
# Pure delegation: forwards both arguments to the IDA basic-block feature module and re-yields its output.
def extract_basic_block_features(self, f, bb):
yield from capa.features.extractors.ida.basicblock.extract_features(f, bb)
def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.ida.basicblock.extract_features(fh, bbh)
# NOTE(review): old and new signature/loop are interleaved (diff rendering).
# New form: walks ida_helpers.get_instructions_in_range over the wrapped block's [start_ea, end_ea) bounds
# and wraps each instruction in an InsnHandle addressed by AbsoluteVirtualAddress(insn.ea).
# The function argument is not used in either variant.
def get_instructions(self, f, bb):
def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]:
import capa.features.extractors.ida.helpers as ida_helpers
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
yield InstructionHandle(insn)
for insn in ida_helpers.get_instructions_in_range(bbh.inner.start_ea, bbh.inner.end_ea):
yield InsnHandle(address=AbsoluteVirtualAddress(insn.ea), inner=insn)
# NOTE(review): the (f, bb, insn) and (fh, bbh, ih) variants are both shown (diff rendering).
# Pure delegation: forwards the three arguments to the IDA instruction-feature module and re-yields its output.
# NOTE(review): unlike the sibling methods, the new signature carries no return annotation.
def extract_insn_features(self, f, bb, insn):
yield from capa.features.extractors.ida.insn.extract_features(f, bb, insn)
def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle):
yield from capa.features.extractors.ida.insn.extract_features(fh, bbh, ih)

View File

@@ -7,27 +7,26 @@
# See the License for the specific language governing permissions and limitations under the License.
import struct
from typing import Tuple, Iterator
import idc
import idaapi
import idautils
import ida_loader
import capa.features.extractors.common
import capa.features.extractors.helpers
import capa.features.extractors.strings
import capa.features.extractors.ida.helpers
from capa.features.file import Export, Import, Section, FunctionName
from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, Format, String, Characteristic
from capa.features.common import FORMAT_PE, FORMAT_ELF, Format, Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, AbsoluteVirtualAddress
def check_segment_for_pe(seg):
def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[Tuple[int, int]]:
"""check segment for embedded PE
adapted for IDA from:
https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
args:
seg (IDA segment_t)
"""
seg_max = seg.end_ea
mz_xor = [
@@ -60,13 +59,13 @@ def check_segment_for_pe(seg):
continue
if idc.get_bytes(peoff, 2) == pex:
yield (off, i)
yield off, i
for nextres in capa.features.extractors.ida.helpers.find_byte_sequence(off + 1, seg.end_ea, mzx):
todo.append((nextres, mzx, pex, i))
def extract_file_embedded_pe():
def extract_file_embedded_pe() -> Iterator[Tuple[Feature, Address]]:
"""extract embedded PE features
IDA must load resource sections for this to be complete
@@ -75,16 +74,16 @@ def extract_file_embedded_pe():
"""
for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
for (ea, _) in check_segment_for_pe(seg):
yield Characteristic("embedded pe"), ea
yield Characteristic("embedded pe"), FileOffsetAddress(ea)
# NOTE(review): old and new signature/yield are interleaved (diff rendering).
# For each 4-tuple from idautils.Entries() only the third (ea) and fourth (name) items are used:
# an Export(name) feature is yielded at ea, wrapped in AbsoluteVirtualAddress in the new form.
def extract_file_export_names():
def extract_file_export_names() -> Iterator[Tuple[Feature, Address]]:
"""extract function exports"""
for (_, _, ea, name) in idautils.Entries():
yield Export(name), ea
yield Export(name), AbsoluteVirtualAddress(ea)
def extract_file_import_names():
def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]:
"""extract function imports
1. imports by ordinal:
@@ -96,11 +95,12 @@ def extract_file_import_names():
- importname
"""
for (ea, info) in capa.features.extractors.ida.helpers.get_file_imports().items():
addr = AbsoluteVirtualAddress(ea)
if info[1] and info[2]:
# e.g. in mimikatz: ('cabinet', 'FCIAddFile', 11L)
# extract by name here and by ordinal below
for name in capa.features.extractors.helpers.generate_symbols(info[0], info[1]):
yield Import(name), ea
yield Import(name), addr
dll = info[0]
symbol = "#%d" % (info[2])
elif info[1]:
@@ -113,10 +113,10 @@ def extract_file_import_names():
continue
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield Import(name), ea
yield Import(name), addr
def extract_file_section_names():
def extract_file_section_names() -> Iterator[Tuple[Feature, Address]]:
"""extract section names
IDA must load resource sections for this to be complete
@@ -124,10 +124,10 @@ def extract_file_section_names():
- Check 'Load resource sections' when opening binary in IDA manually
"""
for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
yield Section(idaapi.get_segm_name(seg)), seg.start_ea
yield Section(idaapi.get_segm_name(seg)), AbsoluteVirtualAddress(seg.start_ea)
def extract_file_strings():
def extract_file_strings() -> Iterator[Tuple[Feature, Address]]:
"""extract ASCII and UTF-16 LE strings
IDA must load resource sections for this to be complete
@@ -136,37 +136,33 @@ def extract_file_strings():
"""
for seg in capa.features.extractors.ida.helpers.get_segments():
seg_buff = capa.features.extractors.ida.helpers.get_segment_buffer(seg)
for s in capa.features.extractors.strings.extract_ascii_strings(seg_buff):
yield String(s.s), (seg.start_ea + s.offset)
for s in capa.features.extractors.strings.extract_unicode_strings(seg_buff):
yield String(s.s), (seg.start_ea + s.offset)
yield from capa.features.extractors.common.extract_file_strings(seg_buff)
# NOTE(review): old (`ea`) and new (`addr`) yields are interleaved (diff rendering).
# Walks idautils.Functions() and, for functions whose flags include idaapi.FUNC_LIB, yields a
# FunctionName for idaapi.get_name(ea); names starting with "_" additionally yield the name without it.
# In the new form `addr` (AbsoluteVirtualAddress(ea)) is built for every function, including non-library ones.
def extract_file_function_names():
def extract_file_function_names() -> Iterator[Tuple[Feature, Address]]:
"""
extract the names of statically-linked library functions.
"""
for ea in idautils.Functions():
addr = AbsoluteVirtualAddress(ea)
if idaapi.get_func(ea).flags & idaapi.FUNC_LIB:
name = idaapi.get_name(ea)
yield FunctionName(name), ea
yield FunctionName(name), addr
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
# extract features for both the mangled and un-mangled representations.
# e.g. `_fwrite` -> `fwrite`
# see: https://stackoverflow.com/a/2628384/87207
yield FunctionName(name[1:]), ea
yield FunctionName(name[1:]), addr
def extract_file_format():
def extract_file_format() -> Iterator[Tuple[Feature, Address]]:
file_info = idaapi.get_inf_structure()
if file_info.filetype == idaapi.f_PE:
yield Format(FORMAT_PE), 0x0
yield Format(FORMAT_PE), NO_ADDRESS
elif file_info.filetype == idaapi.f_ELF:
yield Format(FORMAT_ELF), 0x0
yield Format(FORMAT_ELF), NO_ADDRESS
elif file_info.filetype == idaapi.f_BIN:
# no file type to return when processing a binary file, but we want to continue processing
return
@@ -174,11 +170,11 @@ def extract_file_format():
raise NotImplementedError("file format: %d" % file_info.filetype)
# NOTE(review): old (`va`) and new (`addr`) loop bodies are interleaved (diff rendering); only the name differs.
# Calls every zero-argument callable in FILE_HANDLERS and re-yields each (feature, address) pair it produces.
def extract_features():
def extract_features() -> Iterator[Tuple[Feature, Address]]:
"""extract file features"""
for file_handler in FILE_HANDLERS:
for feature, va in file_handler():
yield feature, va
for feature, addr in file_handler():
yield feature, addr
FILE_HANDLERS = (

View File

@@ -5,31 +5,27 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Tuple, Iterator
import idaapi
import idautils
import capa.features.extractors.ida.helpers
from capa.features.common import Characteristic
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors import loops
from capa.features.extractors.base_extractor import FunctionHandle
# NOTE(review): the raw func_t variant and the FunctionHandle variant are both shown (diff rendering).
# Yields one "calls to" characteristic per address returned by idautils.CodeRefsTo(<start_ea>, True);
# the handle variant reads start_ea from fh.inner and wraps each address in AbsoluteVirtualAddress.
# NOTE(review): the new signature has no return annotation, unlike extract_features in the same file.
def extract_function_calls_to(f):
"""extract callers to a function
args:
f (IDA func_t)
"""
for ea in idautils.CodeRefsTo(f.start_ea, True):
yield Characteristic("calls to"), ea
def extract_function_calls_to(fh: FunctionHandle):
"""extract callers to a function"""
for ea in idautils.CodeRefsTo(fh.inner.start_ea, True):
yield Characteristic("calls to"), AbsoluteVirtualAddress(ea)
def extract_function_loop(f):
"""extract loop indicators from a function
args:
f (IDA func_t)
"""
def extract_function_loop(fh: FunctionHandle):
"""extract loop indicators from a function"""
f: idaapi.func_t = fh.inner
edges = []
# construct control flow graph
@@ -41,25 +37,16 @@ def extract_function_loop(f):
yield Characteristic("loop"), f.start_ea
# NOTE(review): the raw func_t variant and the FunctionHandle variant are both shown (diff rendering).
# Yields the "recursive call" characteristic when helpers.is_function_recursive() is true for the function;
# the handle variant passes fh.inner to the helper and reports the feature at fh.address.
def extract_recursive_call(f):
"""extract recursive function call
args:
f (IDA func_t)
"""
if capa.features.extractors.ida.helpers.is_function_recursive(f):
yield Characteristic("recursive call"), f.start_ea
def extract_recursive_call(fh: FunctionHandle):
"""extract recursive function call"""
if capa.features.extractors.ida.helpers.is_function_recursive(fh.inner):
yield Characteristic("recursive call"), fh.address
# NOTE(review): old and new signature/loop body are interleaved (diff rendering); the new signature shows no docstring.
# Calls every callable in FUNCTION_HANDLERS with the function argument and re-yields each (feature, address) pair.
def extract_features(f):
"""extract function features
arg:
f (IDA func_t)
"""
def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
for func_handler in FUNCTION_HANDLERS:
for (feature, ea) in func_handler(f):
yield feature, ea
for (feature, addr) in func_handler(fh):
yield feature, addr
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call)
@@ -68,8 +55,8 @@ FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_r
def main():
""" """
features = []
for f in capa.features.extractors.ida.get_functions(skip_thunks=True, skip_libs=True):
features.extend(list(extract_features(f)))
for fhandle in capa.features.extractors.ida.helpers.get_functions(skip_thunks=True, skip_libs=True):
features.extend(list(extract_features(fhandle)))
import pprint

View File

@@ -1,27 +1,29 @@
import logging
import contextlib
from typing import Tuple, Iterator
import idaapi
import ida_loader
import capa.ida.helpers
import capa.features.extractors.elf
from capa.features.common import OS, ARCH_I386, ARCH_AMD64, OS_WINDOWS, Arch
from capa.features.common import OS, ARCH_I386, ARCH_AMD64, OS_WINDOWS, Arch, Feature
from capa.features.address import NO_ADDRESS, Address
logger = logging.getLogger(__name__)
def extract_os():
format_name = ida_loader.get_file_type_name()
def extract_os() -> Iterator[Tuple[Feature, Address]]:
format_name: str = ida_loader.get_file_type_name()
if "PE" in format_name:
yield OS(OS_WINDOWS), 0x0
yield OS(OS_WINDOWS), NO_ADDRESS
elif "ELF" in format_name:
with contextlib.closing(capa.ida.helpers.IDAIO()) as f:
os = capa.features.extractors.elf.detect_elf_os(f)
yield OS(os), 0x0
yield OS(os), NO_ADDRESS
else:
# we likely end up here:
@@ -38,12 +40,12 @@ def extract_os():
return
def extract_arch():
info = idaapi.get_inf_structure()
def extract_arch() -> Iterator[Tuple[Feature, Address]]:
info: idaapi.idainfo = idaapi.get_inf_structure()
if info.procname == "metapc" and info.is_64bit():
yield Arch(ARCH_AMD64), 0x0
yield Arch(ARCH_AMD64), NO_ADDRESS
elif info.procname == "metapc" and info.is_32bit():
yield Arch(ARCH_I386), 0x0
yield Arch(ARCH_I386), NO_ADDRESS
elif info.procname == "metapc":
logger.debug("unsupported architecture: non-32-bit nor non-64-bit intel")
return

View File

@@ -5,14 +5,18 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Any, Dict, Tuple, Iterator
import idc
import idaapi
import idautils
import ida_bytes
from capa.features.address import AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import FunctionHandle
def find_byte_sequence(start, end, seq):
def find_byte_sequence(start: int, end: int, seq: bytes) -> Iterator[int]:
"""yield all ea of a given byte sequence
args:
@@ -20,32 +24,32 @@ def find_byte_sequence(start, end, seq):
end: max virtual address
seq: bytes to search e.g. b"\x01\x03"
"""
seq = " ".join(["%02x" % b for b in seq])
seqstr = " ".join(["%02x" % b for b in seq])
while True:
ea = idaapi.find_binary(start, end, seq, 0, idaapi.SEARCH_DOWN)
# TODO find_binary: Deprecated. Please use ida_bytes.bin_search() instead.
ea = idaapi.find_binary(start, end, seqstr, 0, idaapi.SEARCH_DOWN)
if ea == idaapi.BADADDR:
break
start = ea + 1
yield ea
# NOTE(review): old and new signature/yield are interleaved (diff rendering).
# Iterates idautils.Functions(start=start, end=end), resolves each ea with idaapi.get_func and skips a function
# when (skip_thunks and it has FUNC_THUNK) or (skip_libs and it has FUNC_LIB); `and` binds tighter than `or`.
# The new yield wraps the func_t in a FunctionHandle addressed by AbsoluteVirtualAddress(ea).
# NOTE(review): `start`/`end` are annotated `int` but default to None.
# NOTE(review): the docstring's `ret: yield func_t*` no longer matches the annotated return type;
# it may be a removed line in this rendering - confirm.
def get_functions(start=None, end=None, skip_thunks=False, skip_libs=False):
def get_functions(
start: int = None, end: int = None, skip_thunks: bool = False, skip_libs: bool = False
) -> Iterator[FunctionHandle]:
"""get functions, range optional
args:
start: min virtual address
end: max virtual address
ret:
yield func_t*
"""
for ea in idautils.Functions(start=start, end=end):
f = idaapi.get_func(ea)
if not (skip_thunks and (f.flags & idaapi.FUNC_THUNK) or skip_libs and (f.flags & idaapi.FUNC_LIB)):
yield f
yield FunctionHandle(address=AbsoluteVirtualAddress(ea), inner=f)
def get_segments(skip_header_segments=False):
def get_segments(skip_header_segments=False) -> Iterator[idaapi.segment_t]:
"""get list of segments (sections) in the binary image
args:
@@ -57,7 +61,7 @@ def get_segments(skip_header_segments=False):
yield seg
def get_segment_buffer(seg):
def get_segment_buffer(seg: idaapi.segment_t) -> bytes:
"""return bytes stored in a given segment
decrease buffer size until IDA is able to read bytes from the segment
@@ -75,7 +79,7 @@ def get_segment_buffer(seg):
return buff if buff else b""
def get_file_imports():
def get_file_imports() -> Dict[int, Tuple[str, str, int]]:
"""get file imports"""
imports = {}
@@ -105,14 +109,12 @@ def get_file_imports():
return imports
def get_instructions_in_range(start, end):
def get_instructions_in_range(start: int, end: int) -> Iterator[idaapi.insn_t]:
"""yield instructions in range
args:
start: virtual address (inclusive)
end: virtual address (exclusive)
yield:
(insn_t*)
"""
for head in idautils.Heads(start, end):
insn = idautils.DecodeInstruction(head)
@@ -120,7 +122,7 @@ def get_instructions_in_range(start, end):
yield insn
def is_operand_equal(op1, op2):
def is_operand_equal(op1: idaapi.op_t, op2: idaapi.op_t) -> bool:
"""compare two IDA op_t"""
if op1.flags != op2.flags:
return False
@@ -146,7 +148,7 @@ def is_operand_equal(op1, op2):
return True
def is_basic_block_equal(bb1, bb2):
def is_basic_block_equal(bb1: idaapi.BasicBlock, bb2: idaapi.BasicBlock) -> bool:
"""compare two IDA BasicBlock"""
if bb1.start_ea != bb2.start_ea:
return False
@@ -160,12 +162,12 @@ def is_basic_block_equal(bb1, bb2):
return True
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# Size is the plain difference bb.end_ea - bb.start_ea.
def basic_block_size(bb):
def basic_block_size(bb: idaapi.BasicBlock) -> int:
"""calculate size of basic block"""
return bb.end_ea - bb.start_ea
def read_bytes_at(ea, count):
def read_bytes_at(ea: int, count: int) -> bytes:
""" """
# check if byte has a value, see get_wide_byte doc
if not idc.is_loaded(ea):
@@ -178,10 +180,10 @@ def read_bytes_at(ea, count):
return idc.get_bytes(ea, count)
def find_string_at(ea, min=4):
def find_string_at(ea: int, min_: int = 4) -> str:
"""check if ASCII string exists at a given virtual address"""
found = idaapi.get_strlit_contents(ea, -1, idaapi.STRTYPE_C)
if found and len(found) > min:
if found and len(found) > min_:
try:
found = found.decode("ascii")
# hacky check for IDA bug; get_strlit_contents also reads Unicode as
@@ -195,7 +197,7 @@ def find_string_at(ea, min=4):
return ""
def get_op_phrase_info(op):
def get_op_phrase_info(op: idaapi.op_t) -> Dict:
"""parse phrase features from operand
Pretty much dup of sark's implementation:
@@ -232,23 +234,23 @@ def get_op_phrase_info(op):
return {"base": base, "index": index, "scale": scale, "offset": offset}
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# Answers via idaapi.has_cf_chg on the instruction's canonical feature bits, for operand index op.n.
def is_op_write(insn, op):
def is_op_write(insn: idaapi.insn_t, op: idaapi.op_t) -> bool:
"""Check if an operand is written to (destination operand)"""
return idaapi.has_cf_chg(insn.get_canon_feature(), op.n)
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# Answers via idaapi.has_cf_use on the instruction's canonical feature bits, for operand index op.n.
def is_op_read(insn, op):
def is_op_read(insn: idaapi.insn_t, op: idaapi.op_t) -> bool:
"""Check if an operand is read from (source operand)"""
return idaapi.has_cf_use(insn.get_canon_feature(), op.n)
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# Reads the flags at insn.ea with idaapi.get_flags and asks ida_bytes.is_off about operand index op.n.
def is_op_offset(insn, op):
def is_op_offset(insn: idaapi.insn_t, op: idaapi.op_t) -> bool:
"""Check is an operand has been marked as an offset (by auto-analysis or manually)"""
flags = idaapi.get_flags(insn.ea)
return ida_bytes.is_off(flags, op.n)
def is_sp_modified(insn):
def is_sp_modified(insn: idaapi.insn_t) -> bool:
"""determine if instruction modifies SP, ESP, RSP"""
for op in get_insn_ops(insn, target_ops=(idaapi.o_reg,)):
if op.reg == idautils.procregs.sp.reg and is_op_write(insn, op):
@@ -257,7 +259,7 @@ def is_sp_modified(insn):
return False
def is_bp_modified(insn):
def is_bp_modified(insn: idaapi.insn_t) -> bool:
"""check if instruction modifies BP, EBP, RBP"""
for op in get_insn_ops(insn, target_ops=(idaapi.o_reg,)):
if op.reg == idautils.procregs.bp.reg and is_op_write(insn, op):
@@ -266,12 +268,12 @@ def is_bp_modified(insn):
return False
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# `reg` is compared against the register numbers idautils.procregs reports for sp and bp.
def is_frame_register(reg):
def is_frame_register(reg: int) -> bool:
"""check if register is sp or bp"""
return reg in (idautils.procregs.sp.reg, idautils.procregs.bp.reg)
def get_insn_ops(insn, target_ops=()):
def get_insn_ops(insn: idaapi.insn_t, target_ops: Tuple[Any] = None) -> idaapi.op_t:
"""yield op_t for instruction, filter on type if specified"""
for op in insn.ops:
if op.type == idaapi.o_void:
@@ -282,12 +284,12 @@ def get_insn_ops(insn, target_ops=()):
yield op
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# Reads the flags at `ea` with idaapi.get_flags and asks idaapi.is_stkvar about operand `index`.
def is_op_stack_var(ea, index):
def is_op_stack_var(ea: int, index: int) -> bool:
"""check if operand is a stack variable"""
return idaapi.is_stkvar(idaapi.get_flags(ea), index)
def mask_op_val(op):
def mask_op_val(op: idaapi.op_t) -> int:
"""mask value by data type
necessary due to a bug in AMD64
@@ -307,26 +309,18 @@ def mask_op_val(op):
return masks.get(op.dtype, op.value) & op.value
# NOTE(review): old and new signature/docstring are interleaved (diff rendering).
# Scans idautils.CodeRefsTo(f.start_ea, True) and returns True at the first reference
# for which f.contains(ref) holds; returns False when none does.
def is_function_recursive(f):
"""check if function is recursive
args:
f (IDA func_t)
"""
def is_function_recursive(f: idaapi.func_t) -> bool:
"""check if function is recursive"""
for ref in idautils.CodeRefsTo(f.start_ea, True):
if f.contains(ref):
return True
return False
def is_basic_block_tight_loop(bb):
def is_basic_block_tight_loop(bb: idaapi.BasicBlock) -> bool:
"""check basic block loops to self
true if last instruction in basic block branches to basic block start
args:
f (IDA func_t)
bb (IDA BasicBlock)
"""
bb_end = idc.prev_head(bb.end_ea)
if bb.start_ea < bb_end:
@@ -336,7 +330,7 @@ def is_basic_block_tight_loop(bb):
return False
def find_data_reference_from_insn(insn, max_depth=10):
def find_data_reference_from_insn(insn: idaapi.insn_t, max_depth: int = 10) -> int:
"""search for data reference from instruction, return address of instruction if no reference exists"""
depth = 0
ea = insn.ea
@@ -366,24 +360,18 @@ def find_data_reference_from_insn(insn, max_depth=10):
return ea
# NOTE(review): old and new signature/docstring are interleaved (diff rendering).
# Builds an idaapi.FlowChart for `f` with FC_PREDS | FC_NOEXT and yields its blocks one by one.
def get_function_blocks(f):
"""yield basic blocks contained in specified function
args:
f (IDA func_t)
yield:
block (IDA BasicBlock)
"""
def get_function_blocks(f: idaapi.func_t) -> Iterator[idaapi.BasicBlock]:
"""yield basic blocks contained in specified function"""
# leverage idaapi.FC_NOEXT flag to ignore useless external blocks referenced by the function
for block in idaapi.FlowChart(f, flags=(idaapi.FC_PREDS | idaapi.FC_NOEXT)):
yield block
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# True when the block's `type` attribute equals idaapi.fcb_ret.
def is_basic_block_return(bb):
def is_basic_block_return(bb: idaapi.BasicBlock) -> bool:
"""check if basic block is return block"""
return bb.type == idaapi.fcb_ret
# NOTE(review): two signatures are shown (diff rendering); the second adds the idaapi.op_t parameter annotation.
# True when the operand's specflag1 equals 1 (see the linked answer for the rationale).
def has_sib(oper) -> bool:
def has_sib(oper: idaapi.op_t) -> bool:
# via: https://reverseengineering.stackexchange.com/a/14300
return oper.specflag1 == 1

View File

@@ -5,6 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Any, Dict, Tuple, Iterator
import idc
import idaapi
@@ -13,20 +14,22 @@ import idautils
import capa.features.extractors.helpers
import capa.features.extractors.ida.helpers
from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Characteristic
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40
# NOTE(review): two signatures are shown (diff rendering); the annotated one is presumably the post-commit form.
# Memoizes helpers.get_file_imports() in the caller-supplied dict under the "imports_cache" key:
# computed on the first call for a given ctx, returned from the dict afterwards.
# NOTE(review): the return is annotated Dict[str, Any], while helpers.get_file_imports in this same commit
# is annotated Dict[int, Tuple[str, str, int]] - the key type looks inconsistent; confirm.
def get_imports(ctx):
def get_imports(ctx: Dict[str, Any]) -> Dict[str, Any]:
if "imports_cache" not in ctx:
ctx["imports_cache"] = capa.features.extractors.ida.helpers.get_file_imports()
return ctx["imports_cache"]
def check_for_api_call(ctx, insn):
def check_for_api_call(ctx: Dict[str, Any], insn: idaapi.insn_t) -> Iterator[str]:
"""check instruction for API call"""
info = ()
ref = insn.ea
@@ -55,24 +58,22 @@ def check_for_api_call(ctx, insn):
yield "%s.%s" % (info[0], info[1])
def extract_insn_api_features(f, bb, insn):
"""parse instruction API features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
def extract_insn_api_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse instruction API features
example:
call dword [0x00473038]
call dword [0x00473038]
"""
insn: idaapi.insn_t = ih.inner
if not insn.get_canon_mnem() in ("call", "jmp"):
return
for api in check_for_api_call(f.ctx, insn):
for api in check_for_api_call(fh.ctx, insn):
dll, _, symbol = api.rpartition(".")
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), insn.ea
yield API(name), ih.address
# extract IDA/FLIRT recognized API functions
targets = tuple(idautils.CodeRefsFrom(insn.ea, False))
@@ -87,26 +88,25 @@ def extract_insn_api_features(f, bb, insn):
if target_func.flags & idaapi.FUNC_LIB:
name = idaapi.get_name(target_func.start_ea)
yield API(name), insn.ea
yield API(name), ih.address
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
# extract features for both the mangled and un-mangled representations.
# e.g. `_fwrite` -> `fwrite`
# see: https://stackoverflow.com/a/2628384/87207
yield API(name[1:]), insn.ea
yield API(name[1:]), ih.address
def extract_insn_number_features(f, bb, insn):
"""parse instruction number features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
def extract_insn_number_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse instruction number features
example:
push 3136B0h ; dwControlCode
"""
insn: idaapi.insn_t = ih.inner
if idaapi.is_ret_insn(insn):
# skip things like:
# .text:0042250E retn 8
@@ -132,8 +132,8 @@ def extract_insn_number_features(f, bb, insn):
else:
const = op.addr
yield Number(const), insn.ea
yield OperandNumber(i, const), insn.ea
yield Number(const), ih.address
yield OperandNumber(i, const), ih.address
if insn.itype == idaapi.NN_add and 0 < const < MAX_STRUCTURE_SIZE and op.type == idaapi.o_imm:
# for pattern like:
@@ -141,21 +141,18 @@ def extract_insn_number_features(f, bb, insn):
# add eax, 0x10
#
# assume 0x10 is also an offset (imagine eax is a pointer).
yield Offset(const), insn.ea
yield OperandOffset(i, const), insn.ea
yield Offset(const), ih.address
yield OperandOffset(i, const), ih.address
def extract_insn_bytes_features(f, bb, insn):
"""parse referenced byte sequences
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
def extract_insn_bytes_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse referenced byte sequences
example:
push offset iid_004118d4_IShellLinkA ; riid
"""
insn: idaapi.insn_t = ih.inner
if idaapi.is_call_insn(insn):
return
@@ -163,38 +160,38 @@ def extract_insn_bytes_features(f, bb, insn):
if ref != insn.ea:
extracted_bytes = capa.features.extractors.ida.helpers.read_bytes_at(ref, MAX_BYTES_FEATURE_SIZE)
if extracted_bytes and not capa.features.extractors.helpers.all_zeros(extracted_bytes):
yield Bytes(extracted_bytes), insn.ea
yield Bytes(extracted_bytes), ih.address
def extract_insn_string_features(f, bb, insn):
"""parse instruction string features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
def extract_insn_string_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse instruction string features
example:
push offset aAcr ; "ACR > "
"""
insn: idaapi.insn_t = ih.inner
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
if ref != insn.ea:
found = capa.features.extractors.ida.helpers.find_string_at(ref)
if found:
yield String(found), insn.ea
yield String(found), ih.address
def extract_insn_offset_features(f, bb, insn):
"""parse instruction structure offset features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
def extract_insn_offset_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse instruction structure offset features
example:
.text:0040112F cmp [esi+4], ebx
"""
insn: idaapi.insn_t = ih.inner
for i, op in enumerate(insn.ops):
if op.type == idaapi.o_void:
break
@@ -215,8 +212,8 @@ def extract_insn_offset_features(f, bb, insn):
# https://stackoverflow.com/questions/31853189/x86-64-assembly-why-displacement-not-64-bits
op_off = capa.features.extractors.helpers.twos_complement(op_off, 32)
yield Offset(op_off), insn.ea
yield OperandOffset(i, op_off), insn.ea
yield Offset(op_off), ih.address
yield OperandOffset(i, op_off), ih.address
if (
insn.itype == idaapi.NN_lea
@@ -234,12 +231,13 @@ def extract_insn_offset_features(f, bb, insn):
# lea eax, [ebx + 1]
#
# assume 1 is also an offset (imagine ebx is a zero register).
yield Number(op_off), insn.ea
yield OperandNumber(i, op_off), insn.ea
yield Number(op_off), ih.address
yield OperandNumber(i, op_off), ih.address
def contains_stack_cookie_keywords(s):
"""check if string contains stack cookie keywords
def contains_stack_cookie_keywords(s: str) -> bool:
"""
check if string contains stack cookie keywords
Examples:
xor ecx, ebp ; StackCookie
@@ -253,7 +251,7 @@ def contains_stack_cookie_keywords(s):
return any(keyword in s for keyword in ("stack", "security"))
def bb_stack_cookie_registers(bb):
def bb_stack_cookie_registers(bb: idaapi.BasicBlock) -> Iterator[int]:
"""scan basic block for stack cookie operations
yield registers ids that may have been used for stack cookie operations
@@ -287,7 +285,7 @@ def bb_stack_cookie_registers(bb):
yield op.reg
def is_nzxor_stack_cookie_delta(f, bb, insn):
def is_nzxor_stack_cookie_delta(f: idaapi.func_t, bb: idaapi.BasicBlock, insn: idaapi.insn_t) -> bool:
"""check if nzxor exists within stack cookie delta"""
# security cookie check should use SP or BP
if not capa.features.extractors.ida.helpers.is_frame_register(insn.Op2.reg):
@@ -310,7 +308,7 @@ def is_nzxor_stack_cookie_delta(f, bb, insn):
return False
def is_nzxor_stack_cookie(f, bb, insn):
def is_nzxor_stack_cookie(f: idaapi.func_t, bb: idaapi.BasicBlock, insn: idaapi.insn_t) -> bool:
"""check if nzxor is related to stack cookie"""
if contains_stack_cookie_keywords(idaapi.get_cmt(insn.ea, False)):
# Example:
@@ -327,48 +325,49 @@ def is_nzxor_stack_cookie(f, bb, insn):
return False
def extract_insn_nzxor_characteristic_features(f, bb, insn):
"""parse instruction non-zeroing XOR instruction
ignore expected non-zeroing XORs, e.g. security cookies
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
def extract_insn_nzxor_characteristic_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse instruction non-zeroing XOR instruction
ignore expected non-zeroing XORs, e.g. security cookies
"""
insn: idaapi.insn_t = ih.inner
if insn.itype not in (idaapi.NN_xor, idaapi.NN_xorpd, idaapi.NN_xorps, idaapi.NN_pxor):
return
if capa.features.extractors.ida.helpers.is_operand_equal(insn.Op1, insn.Op2):
return
if is_nzxor_stack_cookie(f, bb, insn):
if is_nzxor_stack_cookie(fh.inner, bbh.inner, insn):
return
yield Characteristic("nzxor"), insn.ea
yield Characteristic("nzxor"), ih.address
def extract_insn_mnemonic_features(f, bb, insn):
"""parse instruction mnemonic features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
yield Mnemonic(idc.print_insn_mnem(insn.ea)), insn.ea
def extract_insn_mnemonic_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""parse instruction mnemonic features"""
yield Mnemonic(idc.print_insn_mnem(ih.inner.ea)), ih.address
def extract_insn_obfs_call_plus_5_characteristic_features(f, bb, insn):
def extract_insn_obfs_call_plus_5_characteristic_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse call $+5 instruction from the given instruction.
"""
insn: idaapi.insn_t = ih.inner
if not idaapi.is_call_insn(insn):
return
if insn.ea + 5 == idc.get_operand_value(insn.ea, 0):
yield Characteristic("call $+5"), insn.ea
yield Characteristic("call $+5"), ih.address
def extract_insn_peb_access_characteristic_features(f, bb, insn):
def extract_insn_peb_access_characteristic_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""parse instruction peb access
fs:[0x30] on x86, gs:[0x60] on x64
@@ -376,6 +375,8 @@ def extract_insn_peb_access_characteristic_features(f, bb, insn):
TODO:
IDA should be able to do this..
"""
insn: idaapi.insn_t = ih.inner
if insn.itype not in (idaapi.NN_push, idaapi.NN_mov):
return
@@ -387,15 +388,19 @@ def extract_insn_peb_access_characteristic_features(f, bb, insn):
if " fs:30h" in disasm or " gs:60h" in disasm:
# TODO: replace above with proper IDA
yield Characteristic("peb access"), insn.ea
yield Characteristic("peb access"), ih.address
def extract_insn_segment_access_features(f, bb, insn):
def extract_insn_segment_access_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""parse instruction fs or gs access
TODO:
IDA should be able to do this...
"""
insn: idaapi.insn_t = ih.inner
if all(map(lambda op: op.type != idaapi.o_mem, insn.ops)):
# try to optimize for only memory references
return
@@ -404,23 +409,21 @@ def extract_insn_segment_access_features(f, bb, insn):
if " fs:" in disasm:
# TODO: replace above with proper IDA
yield Characteristic("fs access"), insn.ea
yield Characteristic("fs access"), ih.address
if " gs:" in disasm:
# TODO: replace above with proper IDA
yield Characteristic("gs access"), insn.ea
yield Characteristic("gs access"), ih.address
def extract_insn_cross_section_cflow(f, bb, insn):
"""inspect the instruction for a CALL or JMP that crosses section boundaries
def extract_insn_cross_section_cflow(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""inspect the instruction for a CALL or JMP that crosses section boundaries"""
insn: idaapi.insn_t = ih.inner
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
for ref in idautils.CodeRefsFrom(insn.ea, False):
if ref in get_imports(f.ctx).keys():
if ref in get_imports(fh.ctx).keys():
# ignore API calls
continue
if not idaapi.getseg(ref):
@@ -428,50 +431,40 @@ def extract_insn_cross_section_cflow(f, bb, insn):
continue
if idaapi.getseg(ref) == idaapi.getseg(insn.ea):
continue
yield Characteristic("cross section flow"), insn.ea
yield Characteristic("cross section flow"), ih.address
def extract_function_calls_from(f, bb, insn):
def extract_function_calls_from(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract functions calls from features
most relevant at the function scope; however, it's most efficient to extract at the instruction scope
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
insn: idaapi.insn_t = ih.inner
if idaapi.is_call_insn(insn):
for ref in idautils.CodeRefsFrom(insn.ea, False):
yield Characteristic("calls from"), ref
yield Characteristic("calls from"), AbsoluteVirtualAddress(ref)
def extract_function_indirect_call_characteristic_features(f, bb, insn):
def extract_function_indirect_call_characteristic_features(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""extract indirect function calls (e.g., call eax or call dword ptr [edx+4])
does not include calls like => call ds:dword_ABD4974
most relevant at the function or basic block scope;
however, it's most efficient to extract at the instruction scope
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
insn: idaapi.insn_t = ih.inner
if idaapi.is_call_insn(insn) and idc.get_operand_type(insn.ea, 0) in (idc.o_reg, idc.o_phrase, idc.o_displ):
yield Characteristic("indirect call"), insn.ea
yield Characteristic("indirect call"), ih.address
def extract_features(f, bb, insn):
"""extract instruction features
args:
f (IDA func_t)
bb (IDA BasicBlock)
insn (IDA insn_t)
"""
def extract_features(f: FunctionHandle, bbh: BBHandle, insn: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract instruction features"""
for inst_handler in INSTRUCTION_HANDLERS:
for (feature, ea) in inst_handler(f, bb, insn):
for (feature, ea) in inst_handler(f, bbh, insn):
yield feature, ea

View File

@@ -39,19 +39,19 @@ class SmdaFeatureExtractor(FeatureExtractor):
for function in self.smda_report.getFunctions():
yield FunctionHandle(address=AbsoluteVirtualAddress(function.offset), inner=function)
def extract_function_features(self, f):
yield from capa.features.extractors.smda.function.extract_features(f)
def extract_function_features(self, fh):
yield from capa.features.extractors.smda.function.extract_features(fh)
def get_basic_blocks(self, f):
for bb in f.getBlocks():
def get_basic_blocks(self, fh):
for bb in fh.inner.getBlocks():
yield BBHandle(address=AbsoluteVirtualAddress(bb.offset), inner=bb)
def extract_basic_block_features(self, f, bb):
yield from capa.features.extractors.smda.basicblock.extract_features(f, bb)
def extract_basic_block_features(self, fh, bbh):
yield from capa.features.extractors.smda.basicblock.extract_features(fh, bbh)
def get_instructions(self, f, bb):
for smda_ins in bb.getInstructions():
def get_instructions(self, fh, bbh):
for smda_ins in bbh.inner.getInstructions():
yield InsnHandle(address=AbsoluteVirtualAddress(smda_ins.offset), inner=smda_ins)
def extract_insn_features(self, f, bb, insn):
yield from capa.features.extractors.smda.insn.extract_features(f, bb, insn)
def extract_insn_features(self, fh, bbh, ih):
yield from capa.features.extractors.smda.insn.extract_features(fh, bbh, ih)

View File

@@ -271,10 +271,10 @@ def is_security_cookie(f, bb, insn):
for index, block in enumerate(f.getBlocks()):
# expect security cookie init in first basic block within first bytes (instructions)
block_instructions = [i for i in block.getInstructions()]
if index == 0 and insn.address < (block_instructions[0].offset + SECURITY_COOKIE_BYTES_DELTA):
if index == 0 and insn.offset < (block_instructions[0].offset + SECURITY_COOKIE_BYTES_DELTA):
return True
# ... or within last bytes (instructions) before a return
if block_instructions[-1].mnemonic.startswith("ret") and insn.address > (
if block_instructions[-1].mnemonic.startswith("ret") and insn.offset > (
block_instructions[-1].offset - SECURITY_COOKIE_BYTES_DELTA
):
return True

View File

@@ -6,7 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import List, Tuple
from typing import List, Tuple, Iterator
import viv_utils
import viv_utils.flirt
@@ -24,7 +24,7 @@ from capa.features.extractors.base_extractor import BBHandle, InsnHandle, Functi
logger = logging.getLogger(__name__)
class InstructionHandle:
class VivInstructionHandle:
"""this acts like a vivisect.Opcode but with an __int__() method"""
def __init__(self, inner):
@@ -60,28 +60,30 @@ class VivisectFeatureExtractor(FeatureExtractor):
def extract_file_features(self):
yield from capa.features.extractors.viv.file.extract_features(self.vw, self.buf)
def get_functions(self):
def get_functions(self) -> Iterator[FunctionHandle]:
for va in sorted(self.vw.getFunctions()):
yield FunctionHandle(address=AbsoluteVirtualAddress(va), inner=viv_utils.Function(self.vw, va))
def extract_function_features(self, f):
yield from capa.features.extractors.viv.function.extract_features(f)
def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.viv.function.extract_features(fh)
def get_basic_blocks(self, fh: FunctionHandle):
def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
f: viv_utils.Function = fh.inner
for bb in f.basic_blocks:
yield BBHandle(address=AbsoluteVirtualAddress(bb.va), inner=bb)
def extract_basic_block_features(self, f, bb):
yield from capa.features.extractors.viv.basicblock.extract_features(f, bb)
def extract_basic_block_features(self, fh: FunctionHandle, bbh) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.viv.basicblock.extract_features(fh, bbh)
def get_instructions(self, f, bbh: BBHandle):
def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]:
bb: viv_utils.BasicBlock = bbh.inner
for insn in bb.instructions:
yield InsnHandle(address=AbsoluteVirtualAddress(insn.va), inner=insn)
def extract_insn_features(self, f, bb, insn):
yield from capa.features.extractors.viv.insn.extract_features(f, bb, insn)
def extract_insn_features(
self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.viv.insn.extract_features(fh, bbh, ih)
def is_library_function(self, addr):
return viv_utils.flirt.is_library_function(self.vw, addr)

View File

@@ -17,7 +17,7 @@ from capa.features.extractors import loops
from capa.features.extractors.base_extractor import FunctionHandle
def interface_extract_function_XXX(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
def interface_extract_function_XXX(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse features from the given function.
@@ -33,7 +33,7 @@ def interface_extract_function_XXX(f: FunctionHandle) -> Iterator[Tuple[Feature,
def extract_function_calls_to(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
f: viv_utils.Function = fhandle.inner
for src, _, _, _ in f.vw.getXrefsTo(f.va, rtype=vivisect.const.REF_CODE):
yield Characteristic("calls to"), fhandle.address
yield Characteristic("calls to"), AbsoluteVirtualAddress(src)
def extract_function_loop(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
@@ -60,18 +60,18 @@ def extract_function_loop(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Ad
yield Characteristic("loop"), fhandle.address
def extract_features(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
"""
extract features from the given function.
args:
f (viv_utils.Function): the function from which to extract features
fh: the function handle from which to extract features
yields:
Tuple[Feature, Address]: the features and their location found in this function.
"""
for func_handler in FUNCTION_HANDLERS:
for feature, addr in func_handler(f):
for feature, addr in func_handler(fh):
yield feature, addr

View File

@@ -16,7 +16,7 @@ import envi.archs.amd64.disasm
from vivisect import VivWorkspace
if TYPE_CHECKING:
from capa.features.extractors.viv.extractor import InstructionHandle
from capa.features.extractors.viv.extractor import VivInstructionHandle
# pull out consts for lookup performance
i386RegOper = envi.archs.i386.disasm.i386RegOper
@@ -135,7 +135,7 @@ def find_definition(vw: VivWorkspace, va: int, reg: int) -> Tuple[int, Union[int
raise NotFoundError()
def is_indirect_call(vw: VivWorkspace, va: int, insn: Optional["InstructionHandle"] = None) -> bool:
def is_indirect_call(vw: VivWorkspace, va: int, insn: Optional["VivInstructionHandle"] = None) -> bool:
if insn is None:
insn = vw.parseOpcode(va)
@@ -143,7 +143,7 @@ def is_indirect_call(vw: VivWorkspace, va: int, insn: Optional["InstructionHandl
def resolve_indirect_call(
vw: VivWorkspace, va: int, insn: Optional["InstructionHandle"] = None
vw: VivWorkspace, va: int, insn: Optional["VivInstructionHandle"] = None
) -> Tuple[int, Optional[int]]:
"""
inspect the given indirect call instruction and attempt to resolve the target address.

View File

@@ -30,14 +30,16 @@ from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_i
SECURITY_COOKIE_BYTES_DELTA = 0x40
def interface_extract_instruction_XXX(f, bb, insn):
def interface_extract_instruction_XXX(
fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse features from the given instruction.
args:
f: the function to process.
bb: the basic block to process.
insn: the instruction to process.
fh: the function handle to process.
bbh: the basic block handle to process.
ih: the instruction handle to process.
yields:
(Feature, Address): the feature and the address at which it's found.
@@ -62,14 +64,15 @@ def get_imports(vw):
return imports
def extract_insn_api_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""parse API features from the given instruction."""
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
def extract_insn_api_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse API features from the given instruction.
# example:
#
# call dword [0x00473038]
example:
call dword [0x00473038]
"""
insn: envi.Opcode = ih.inner
f: viv_utils.Function = fh.inner
if insn.mnem not in ("call", "jmp"):
return
@@ -86,7 +89,7 @@ def extract_insn_api_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle)
if target in imports:
dll, symbol = imports[target]
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), ihandle.address
yield API(name), ih.address
# call via thunk on x86,
# see 9324d1a8ae37a36ae560c37448c9705a at 0x407985
@@ -108,20 +111,20 @@ def extract_insn_api_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle)
if viv_utils.flirt.is_library_function(f.vw, target):
name = viv_utils.get_function_name(f.vw, target)
yield API(name), ihandle.address
yield API(name), ih.address
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
# extract features for both the mangled and un-mangled representations.
# e.g. `_fwrite` -> `fwrite`
# see: https://stackoverflow.com/a/2628384/87207
yield API(name[1:]), ihandle.address
yield API(name[1:]), ih.address
return
for _ in range(THUNK_CHAIN_DEPTH_DELTA):
if target in imports:
dll, symbol = imports[target]
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), ihandle.address
yield API(name), ih.address
# if jump leads to an ENDBRANCH instruction, skip it
if f.vw.getByteDef(target)[1].startswith(b"\xf3\x0f\x1e"):
@@ -141,7 +144,7 @@ def extract_insn_api_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle)
if target in imports:
dll, symbol = imports[target]
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), ihandle.address
yield API(name), ih.address
elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegOper):
try:
@@ -158,7 +161,7 @@ def extract_insn_api_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle)
if target in imports:
dll, symbol = imports[target]
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), ihandle.address
yield API(name), ih.address
def derefs(vw, p):
@@ -231,14 +234,14 @@ def read_bytes(vw, va: int) -> bytes:
raise
def extract_insn_bytes_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_bytes_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse byte sequence features from the given instruction.
example:
# push offset iid_004118d4_IShellLinkA ; riid
"""
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
insn: envi.Opcode = ih.inner
f: viv_utils.Function = fh.inner
if insn.mnem == "call":
return
@@ -268,7 +271,7 @@ def extract_insn_bytes_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle
if capa.features.extractors.helpers.all_zeros(buf):
continue
yield Bytes(buf), ihandle.address
yield Bytes(buf), ih.address
def read_string(vw, offset: int) -> str:
@@ -331,15 +334,15 @@ def is_security_cookie(f, bb, insn) -> bool:
def extract_insn_nzxor_characteristic_features(
fhandle: FunctionHandle, bbhandle: BBHandle, ihandle: InsnHandle
fh: FunctionHandle, bbhandle: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse non-zeroing XOR instruction from the given instruction.
ignore expected non-zeroing XORs, e.g. security cookies.
"""
insn: envi.Opcode = ihandle.inner
insn: envi.Opcode = ih.inner
bb: viv_utils.BasicBlock = bbhandle.inner
f: viv_utils.Function = fhandle.inner
f: viv_utils.Function = fh.inner
if insn.mnem not in ("xor", "xorpd", "xorps", "pxor"):
return
@@ -350,42 +353,40 @@ def extract_insn_nzxor_characteristic_features(
if is_security_cookie(f, bb, insn):
return
yield Characteristic("nzxor"), ihandle.address
yield Characteristic("nzxor"), ih.address
def extract_insn_mnemonic_features(f, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_mnemonic_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""parse mnemonic features from the given instruction."""
yield Mnemonic(ihandle.inner.mnem), ihandle.address
yield Mnemonic(ih.inner.mnem), ih.address
def extract_insn_obfs_call_plus_5_characteristic_features(
f, bb, ihandle: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_obfs_call_plus_5_characteristic_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse call $+5 instruction from the given instruction.
"""
insn: envi.Opcode = ihandle.inner
insn: envi.Opcode = ih.inner
if insn.mnem != "call":
return
if isinstance(insn.opers[0], envi.archs.i386.disasm.i386PcRelOper):
if insn.va + 5 == insn.opers[0].getOperValue(insn):
yield Characteristic("call $+5"), ihandle.address
yield Characteristic("call $+5"), ih.address
if isinstance(insn.opers[0], envi.archs.i386.disasm.i386ImmMemOper) or isinstance(
insn.opers[0], envi.archs.amd64.disasm.Amd64RipRelOper
):
if insn.va + 5 == insn.opers[0].getOperAddr(insn):
yield Characteristic("call $+5"), ihandle.address
yield Characteristic("call $+5"), ih.address
def extract_insn_peb_access_characteristic_features(f, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_peb_access_characteristic_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64
"""
# TODO handle where fs/gs are loaded into a register or onto the stack and used later
insn: envi.Opcode = ihandle.inner
insn: envi.Opcode = ih.inner
if insn.mnem not in ["push", "mov"]:
return
@@ -404,7 +405,7 @@ def extract_insn_peb_access_characteristic_features(f, bb, ihandle: InsnHandle)
if (isinstance(oper, envi.archs.i386.disasm.i386RegMemOper) and oper.disp == 0x30) or (
isinstance(oper, envi.archs.i386.disasm.i386ImmMemOper) and oper.imm == 0x30
):
yield Characteristic("peb access"), ihandle.address
yield Characteristic("peb access"), ih.address
elif "gs" in prefix:
for oper in insn.opers:
if (
@@ -412,22 +413,22 @@ def extract_insn_peb_access_characteristic_features(f, bb, ihandle: InsnHandle)
or (isinstance(oper, envi.archs.amd64.disasm.i386SibOper) and oper.imm == 0x60)
or (isinstance(oper, envi.archs.amd64.disasm.i386ImmMemOper) and oper.imm == 0x60)
):
yield Characteristic("peb access"), ihandle.address
yield Characteristic("peb access"), ih.address
else:
pass
def extract_insn_segment_access_features(f, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_segment_access_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""parse the instruction for access to fs or gs"""
insn: envi.Opcode = ihandle.inner
insn: envi.Opcode = ih.inner
prefix = insn.getPrefixName()
if prefix == "fs":
yield Characteristic("fs access"), ihandle.address
yield Characteristic("fs access"), ih.address
if prefix == "gs":
yield Characteristic("gs access"), ihandle.address
yield Characteristic("gs access"), ih.address
def get_section(vw, va: int):
@@ -438,14 +439,12 @@ def get_section(vw, va: int):
raise KeyError(va)
def extract_insn_cross_section_cflow(
fhandle: FunctionHandle, bb, ihandle: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_cross_section_cflow(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
inspect the instruction for a CALL or JMP that crosses section boundaries.
"""
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
insn: envi.Opcode = ih.inner
f: viv_utils.Function = fh.inner
for va, flags in insn.getBranches():
if va is None:
@@ -473,7 +472,7 @@ def extract_insn_cross_section_cflow(
continue
if get_section(f.vw, insn.va) != get_section(f.vw, va):
yield Characteristic("cross section flow"), ihandle.address
yield Characteristic("cross section flow"), ih.address
except KeyError:
continue
@@ -481,9 +480,9 @@ def extract_insn_cross_section_cflow(
# this is a feature that's most relevant at the function scope;
# however, it's most efficient to extract at the instruction scope.
def extract_function_calls_from(fhandle: FunctionHandle, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
def extract_function_calls_from(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
insn: envi.Opcode = ih.inner
f: viv_utils.Function = fh.inner
if insn.mnem != "call":
return
@@ -519,14 +518,12 @@ def extract_function_calls_from(fhandle: FunctionHandle, bb, ihandle: InsnHandle
# this is a feature that's most relevant at the function or basic block scope;
# however, it's most efficient to extract at the instruction scope.
def extract_function_indirect_call_characteristic_features(
f, bb, ihandle: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
def extract_function_indirect_call_characteristic_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4])
does not include calls like => call ds:dword_ABD4974
"""
insn: envi.Opcode = ihandle.inner
insn: envi.Opcode = ih.inner
if insn.mnem != "call":
return
@@ -534,24 +531,25 @@ def extract_function_indirect_call_characteristic_features(
# Checks below work for x86 and x64
if isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegOper):
# call edx
yield Characteristic("indirect call"), ihandle.address
yield Characteristic("indirect call"), ih.address
elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegMemOper):
# call dword ptr [eax+50h]
yield Characteristic("indirect call"), ihandle.address
yield Characteristic("indirect call"), ih.address
elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386SibOper):
# call qword ptr [rsp+78h]
yield Characteristic("indirect call"), ihandle.address
yield Characteristic("indirect call"), ih.address
def extract_op_number_features(
fhandle: FunctionHandle, bb, ihandle: InsnHandle, i, oper: envi.Operand
fh: FunctionHandle, bb, ih: InsnHandle, i, oper: envi.Operand
) -> Iterator[Tuple[Feature, Address]]:
"""parse number features from the given operand."""
# example:
#
# push 3136B0h ; dwControlCode
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
"""parse number features from the given operand.
example:
push 3136B0h ; dwControlCode
"""
insn: envi.Opcode = ih.inner
f: viv_utils.Function = fh.inner
# this is for both x32 and x64
if not isinstance(oper, (envi.archs.i386.disasm.i386ImmOper, envi.archs.i386.disasm.i386ImmMemOper)):
@@ -574,8 +572,8 @@ def extract_op_number_features(
# .text:00401145 add esp, 0Ch
return
yield Number(v), ihandle.address
yield OperandNumber(i, v), ihandle.address
yield Number(v), ih.address
yield OperandNumber(i, v), ih.address
if insn.mnem == "add" and 0 < v < MAX_STRUCTURE_SIZE and isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
# for pattern like:
@@ -583,19 +581,19 @@ def extract_op_number_features(
# add eax, 0x10
#
# assume 0x10 is also an offset (imagine eax is a pointer).
yield Offset(v), ihandle.address
yield OperandOffset(i, v), ihandle.address
yield Offset(v), ih.address
yield OperandOffset(i, v), ih.address
def extract_op_offset_features(
fhandle: FunctionHandle, bb, ihandle: InsnHandle, i, oper: envi.Operand
fh: FunctionHandle, bb, ih: InsnHandle, i, oper: envi.Operand
) -> Iterator[Tuple[Feature, Address]]:
"""parse structure offset features from the given operand."""
# example:
#
# .text:0040112F cmp [esi+4], ebx
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
insn: envi.Opcode = ih.inner
f: viv_utils.Function = fh.inner
# this is for both x32 and x64
# like [esi + 4]
@@ -615,8 +613,8 @@ def extract_op_offset_features(
# viv already decodes offsets as signed
v = oper.disp
yield Offset(v), ihandle.address
yield OperandOffset(i, v), ihandle.address
yield Offset(v), ih.address
yield OperandOffset(i, v), ih.address
if insn.mnem == "lea" and i == 1 and not f.vw.probeMemory(v, 1, envi.memory.MM_READ):
# for pattern like:
@@ -624,8 +622,8 @@ def extract_op_offset_features(
# lea eax, [ebx + 1]
#
# assume 1 is also an offset (imagine ebx is a zero register).
yield Number(v), ihandle.address
yield OperandNumber(i, v), ihandle.address
yield Number(v), ih.address
yield OperandNumber(i, v), ih.address
# like: [esi + ecx + 16384]
# reg ^ ^
@@ -635,19 +633,19 @@ def extract_op_offset_features(
# viv already decodes offsets as signed
v = oper.disp
yield Offset(v), ihandle.address
yield OperandOffset(i, v), ihandle.address
yield Offset(v), ih.address
yield OperandOffset(i, v), ih.address
def extract_op_string_features(
fhandle: FunctionHandle, bb, ihandle: InsnHandle, i, oper: envi.Operand
fh: FunctionHandle, bb, ih: InsnHandle, i, oper: envi.Operand
) -> Iterator[Tuple[Feature, Address]]:
"""parse string features from the given operand."""
# example:
#
# push offset aAcr ; "ACR > "
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
insn: envi.Opcode = ih.inner
f: viv_utils.Function = fh.inner
if isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
v = oper.getOperValue(oper)
@@ -668,7 +666,7 @@ def extract_op_string_features(
except ValueError:
continue
else:
yield String(s.rstrip("\x00")), ihandle.address
yield String(s.rstrip("\x00")), ih.address
def extract_operand_features(f: FunctionHandle, bb, insn: InsnHandle) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -5,7 +5,6 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import idaapi

View File

@@ -12,6 +12,7 @@ import json
import logging
import itertools
import collections
from typing import Set, Dict
import idaapi
import ida_kernwin
@@ -26,6 +27,7 @@ import capa.render.json
import capa.features.common
import capa.render.result_document
import capa.features.extractors.ida.extractor
from capa.features.common import Feature
from capa.ida.plugin.icon import QICON
from capa.ida.plugin.view import (
CapaExplorerQtreeView,
@@ -33,9 +35,11 @@ from capa.ida.plugin.view import (
CapaExplorerRulgenPreview,
CapaExplorerRulegenFeatures,
)
from capa.features.address import Address
from capa.ida.plugin.hooks import CapaExplorerIdaHooks
from capa.ida.plugin.model import CapaExplorerDataModel
from capa.ida.plugin.proxy import CapaExplorerRangeProxyModel, CapaExplorerSearchProxyModel
from capa.features.extractors.base_extractor import FunctionHandle
logger = logging.getLogger(__name__)
settings = ida_settings.IDASettings("capa")
@@ -66,27 +70,27 @@ def trim_function_name(f, max_length=25):
return n
def find_func_features(f, extractor):
def find_func_features(fh: FunctionHandle, extractor):
""" """
func_features = collections.defaultdict(set)
bb_features = collections.defaultdict(dict)
func_features: Dict[Feature, Set] = collections.defaultdict(set)
bb_features: Dict[Address, Dict] = collections.defaultdict(dict)
for (feature, ea) in extractor.extract_function_features(f):
func_features[feature].add(ea)
for (feature, addr) in extractor.extract_function_features(fh):
func_features[feature].add(addr)
for bb in extractor.get_basic_blocks(f):
for bbh in extractor.get_basic_blocks(fh):
_bb_features = collections.defaultdict(set)
for (feature, ea) in extractor.extract_basic_block_features(f, bb):
_bb_features[feature].add(ea)
func_features[feature].add(ea)
for (feature, addr) in extractor.extract_basic_block_features(fh, bbh):
_bb_features[feature].add(addr)
func_features[feature].add(addr)
for insn in extractor.get_instructions(f, bb):
for (feature, ea) in extractor.extract_insn_features(f, bb, insn):
_bb_features[feature].add(ea)
func_features[feature].add(ea)
for insn in extractor.get_instructions(fh, bbh):
for (feature, addr) in extractor.extract_insn_features(fh, bbh, insn):
_bb_features[feature].add(addr)
func_features[feature].add(addr)
bb_features[int(bb)] = _bb_features
bb_features[bbh.address] = _bb_features
return func_features, bb_features
@@ -173,9 +177,9 @@ class CapaExplorerFeatureExtractor(capa.features.extractors.ida.extractor.IdaFea
super(CapaExplorerFeatureExtractor, self).__init__()
self.indicator = CapaExplorerProgressIndicator()
def extract_function_features(self, f):
self.indicator.update("function at 0x%X" % f.start_ea)
return super(CapaExplorerFeatureExtractor, self).extract_function_features(f)
def extract_function_features(self, fh: FunctionHandle):
self.indicator.update("function at 0x%X" % fh.inner.start_ea)
return super(CapaExplorerFeatureExtractor, self).extract_function_features(fh)
class QLineEditClicked(QtWidgets.QLineEdit):
@@ -861,7 +865,7 @@ class CapaExplorerForm(idaapi.PluginForm):
# must use extractor to get function, as capa analysis requires casted object
extractor = CapaExplorerFeatureExtractor()
except Exception as e:
logger.error("Failed to load IDA feature extractor (error: %s)" % e)
logger.error("Failed to load IDA feature extractor (error: %s)", e)
return False
if ida_kernwin.user_cancelled():
@@ -894,7 +898,7 @@ class CapaExplorerForm(idaapi.PluginForm):
for (ea, _) in res:
func_features[capa.features.common.MatchedRule(name)].add(ea)
except Exception as e:
logger.error("Failed to match function/basic block rule scope (error: %s)" % e)
logger.error("Failed to match function/basic block rule scope (error: %s)", e)
return False
else:
func_features = {}
@@ -902,7 +906,7 @@ class CapaExplorerForm(idaapi.PluginForm):
logger.info("User cancelled analysis.")
return False
except Exception as e:
logger.error("Failed to extract function features (error: %s)" % e)
logger.error("Failed to extract function features (error: %s)", e)
return False
if ida_kernwin.user_cancelled():
@@ -928,10 +932,10 @@ class CapaExplorerForm(idaapi.PluginForm):
for (ea, _) in res:
file_features[capa.features.common.MatchedRule(name)].add(ea)
except Exception as e:
logger.error("Failed to match file scope rules (error: %s)" % e)
logger.error("Failed to match file scope rules (error: %s)", e)
return False
except Exception as e:
logger.error("Failed to extract file features (error: %s)" % e)
logger.error("Failed to extract file features (error: %s)", e)
return False
if ida_kernwin.user_cancelled():
@@ -953,7 +957,7 @@ class CapaExplorerForm(idaapi.PluginForm):
"capa rules directory: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.rules_cache))
)
except Exception as e:
logger.error("Failed to render views (error: %s)" % e)
logger.error("Failed to render views (error: %s)", e)
return False
return True

View File

@@ -216,6 +216,8 @@ class CapaExplorerFunctionItem(CapaExplorerDataItem):
@param parent: parent node
@param location: virtual address of function as seen by IDA
"""
# location can be an Address now, so need to get the VA
location = int(location)
super(CapaExplorerFunctionItem, self).__init__(
parent, [self.fmt % idaapi.get_name(location), location_to_hex(location), ""], can_check
)

View File

@@ -6,7 +6,8 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from collections import deque, defaultdict
from typing import List
from collections import deque
import idc
import idaapi
@@ -545,6 +546,14 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
@param location: address of feature
@param display: text to display in plugin UI
"""
# location may be an Address (from locations: List[Address]); convert it to an integer offset
try:
location = int(location)
except TypeError:
# e.g. capa.features.address._NoAddress, global features
return
# special handling for characteristic pending type
if feature["type"] == "characteristic":
if feature[feature["type"]] in ("embedded pe",):

View File

@@ -172,7 +172,7 @@ def find_basic_block_capabilities(
def find_code_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle
ruleset: RuleSet, extractor: FeatureExtractor, fh: FunctionHandle
) -> Tuple[MatchResults, MatchResults, MatchResults, int]:
"""
find matches for the given rules within the given function.
@@ -191,8 +191,8 @@ def find_code_capabilities(
# might be found at different instructions, that's ok.
insn_matches = collections.defaultdict(list) # type: MatchResults
for bb in extractor.get_basic_blocks(f):
features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, f, bb)
for bb in extractor.get_basic_blocks(fh):
features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb)
for feature, vas in features.items():
function_features[feature].update(vas)
@@ -202,10 +202,10 @@ def find_code_capabilities(
for rule_name, res in imatches.items():
insn_matches[rule_name].extend(res)
for feature, va in itertools.chain(extractor.extract_function_features(f), extractor.extract_global_features()):
for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()):
function_features[feature].add(va)
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, f.address)
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address)
return function_matches, bb_matches, insn_matches, len(function_features)

2
rules

Submodule rules updated: 52ff654ca0...d7b5c33414

View File

@@ -130,23 +130,24 @@ def main(argv=None):
for feature, addr in extractor.extract_file_features():
print("file: %s: %s" % (capa.render.verbose.format_address(addr), feature))
functions = extractor.get_functions()
function_handles = extractor.get_functions()
if args.function:
if args.format == "freeze":
functions = tuple(filter(lambda f: f == args.function, functions))
# TODO fix
function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles))
else:
functions = tuple(filter(lambda f: str(f) == args.function, functions))
function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles))
if args.function not in [str(f) for f in functions]:
if args.function not in [str(f) for f in function_handles]:
print("%s not a function" % args.function)
return -1
if len(functions) == 0:
if len(function_handles) == 0:
print("%s not a function", args.function)
return -1
print_features(functions, extractor)
print_features(function_handles, extractor)
return 0

View File

@@ -13,6 +13,7 @@ import binascii
import itertools
import contextlib
import collections
from typing import Set, Dict
from functools import lru_cache
import pytest
@@ -34,7 +35,10 @@ from capa.features.common import (
FORMAT_DOTNET,
Arch,
Format,
Feature,
)
from capa.features.address import Address
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
CD = os.path.dirname(__file__)
DOTNET_DIR = os.path.join(CD, "data", "dotnet")
@@ -190,10 +194,10 @@ def extract_basic_block_features(extractor, f, bb):
# f may not be hashable (e.g. ida func_t) so cannot @lru_cache this
def extract_instruction_features(extractor, f, bb, insn):
def extract_instruction_features(extractor, fh, bbh, ih) -> Dict[Feature, Set[Address]]:
features = collections.defaultdict(set)
for feature, va in extractor.extract_insn_features(f, bb, insn):
features[feature].add(va)
for feature, addr in extractor.extract_insn_features(fh, bbh, ih):
features[feature].add(addr)
return features
@@ -323,24 +327,24 @@ def sample(request):
return resolve_sample(request.param)
def get_function(extractor, fva):
for f in extractor.get_functions():
if str(f) == fva:
return f
def get_function(extractor, fva: int) -> FunctionHandle:
for fh in extractor.get_functions():
if fh.address == fva:
return fh
raise ValueError("function not found")
def get_basic_block(extractor, f, va):
for bb in extractor.get_basic_blocks(f):
if str(bb) == va:
return bb
def get_basic_block(extractor, fh: FunctionHandle, va: int) -> BBHandle:
for bbh in extractor.get_basic_blocks(fh):
if bbh.address == va:
return bbh
raise ValueError("basic block not found")
def get_instruction(extractor, f, bb, va):
for insn in extractor.get_instructions(f, bb):
if str(insn) == va:
return insn
def get_instruction(extractor, fh: FunctionHandle, bbh: BBHandle, va: int) -> InsnHandle:
for ih in extractor.get_instructions(fh, bbh):
if ih.address == va:
return ih
raise ValueError("instruction not found")
@@ -367,10 +371,10 @@ def resolve_scope(scope):
iva = int(ispec.partition("=")[2], 0x10)
def inner_insn(extractor):
f = get_function(extractor, fva)
bb = get_basic_block(extractor, f, bbva)
insn = get_instruction(extractor, f, bb, iva)
features = extract_instruction_features(extractor, f, bb, insn)
fh = get_function(extractor, fva)
bbh = get_basic_block(extractor, fh, bbva)
ih = get_instruction(extractor, fh, bbh, iva)
features = extract_instruction_features(extractor, fh, bbh, ih)
for k, vs in extract_global_features(extractor).items():
features[k].update(vs)
return features