mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
Ghidra insn features (#1670)
* Implement Ghidra Instruction Feature Extraction
This commit is contained in:
@@ -5,9 +5,14 @@
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
from typing import Iterator
|
||||
from typing import Dict, List, Iterator
|
||||
|
||||
import ghidra
|
||||
from ghidra.program.model.lang import OperandType
|
||||
from ghidra.program.model.symbol import SourceType, SymbolType
|
||||
from ghidra.program.model.address import AddressSpace
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
|
||||
|
||||
def fix_byte(b: int) -> bytes:
|
||||
@@ -70,3 +75,185 @@ def get_function_symbols() -> Iterator[ghidra.program.database.function.Function
|
||||
"""yield all non-external function symbols"""
|
||||
|
||||
yield from currentProgram.getFunctionManager().getFunctionsNoStubs(True) # type: ignore [name-defined] # noqa: F821
|
||||
|
||||
|
||||
def get_file_imports() -> Dict[int, List[str]]:
    """Collect import names, keyed by the offset of the address that references them.

    For every external function of the current program, each *data* reference
    to the function's symbol contributes one key: the offset of the referencing
    address (the pointer to the fake external address). The values are the
    symbol-name variants produced by `generate_symbols`.

    External functions without any data reference are skipped, so a missing
    reference can neither raise on an unbound address nor silently reuse the
    address of the previously processed function.

    Returns:
        mapping of referencing offset -> list of generated import names.
    """
    import_dict: Dict[int, List[str]] = {}

    for f in currentProgram.getFunctionManager().getExternalFunctions():  # type: ignore [name-defined] # noqa: F821
        # pointers to the fake external addr
        offsets = [
            r.getFromAddress().getOffset() for r in f.getSymbol().getReferences() if r.getReferenceType().isData()
        ]
        if not offsets:
            continue

        fstr = f.toString().split("::")  # format: MODULE.dll::import / MODULE::Ordinal_*
        if "Ordinal_" in fstr[1]:
            # ordinals are passed on as `#<number>`
            fstr[1] = f"#{fstr[1].split('_')[1]}"

        # the last four characters of the module part are dropped
        # NOTE(review): presumably the ".dll" extension -- confirm for modules shown without one
        names = list(capa.features.extractors.helpers.generate_symbols(fstr[0][:-4], fstr[1]))

        for addr in offsets:
            import_dict.setdefault(addr, []).extend(names)

    return import_dict
|
||||
|
||||
|
||||
def get_file_externs() -> Dict[int, List[str]]:
    """
    Map addresses of statically-linked library functions to their names.

    Ghidra's external namespace is mostly reserved for dynamically-linked
    imports, while statically-linked functions live in the global namespace.
    Selecting symbols by type, source, and namespace therefore finds more
    statically-linked library functions than `.isExternal()` does.

    Example: (PMA Lab 16-01.exe_) 7faafc7e4a5c736ebfee6abbbc812d80:0x407490
        - __aulldiv
        - Note: See Symbol Table labels

    Returns:
        mapping of symbol offset -> list of names recorded for that offset.
    """
    conflict_prefix = "FID_conflict:"  # format: FID_conflict:<function-name>
    extern_dict: Dict[int, List[str]] = {}

    for sym in currentProgram.getSymbolTable().getAllSymbols(True):  # type: ignore [name-defined] # noqa: F821
        # .isExternal() misses more than this config for the function symbols
        if sym.getSymbolType() != SymbolType.FUNCTION:
            continue
        if sym.getSource() != SourceType.ANALYSIS:
            continue
        if not sym.isGlobal():
            continue

        name = sym.getName()  # starts to resolve names based on Ghidra's FidDB
        if name.startswith(conflict_prefix):
            name = name[len(conflict_prefix) :]
        extern_dict.setdefault(sym.getAddress().getOffset(), []).append(name)

        if name.startswith("_"):
            # some linkers may prefix linked routines with a `_` to avoid name collisions.
            # extract features for both the mangled and un-mangled representations.
            # e.g. `_fwrite` -> `fwrite`
            # see: https://stackoverflow.com/a/2628384/87207
            extern_dict.setdefault(sym.getAddress().getOffset(), []).append(name[1:])

    return extern_dict
|
||||
|
||||
|
||||
def map_fake_import_addrs() -> Dict[int, List[int]]:
    """
    Build a lookup from Ghidra's fake import entry points to the real
    addresses that reference them.

    Useful because many Ghidra Scripting API calls return these
    external (fake) addresses.

    Undocumented but intended Ghidra behavior:
        - Import entryPoint fields are stored in the 'EXTERNAL:' AddressSpace.
          'getEntryPoint()' returns the entryPoint field, which is an offset
          from the beginning of the assigned AddressSpace. In the case of externals,
          they start from 1 and increment.
    https://github.com/NationalSecurityAgency/ghidra/blob/26d4bd9104809747c21f2528cab8aba9aef9acd5/Ghidra/Features/Base/src/test.slow/java/ghidra/program/database/function/ExternalFunctionDBTest.java#L90

    Example: (mimikatz.exe_) 5f66b82558ca92e54e77f216ef4c066c:0x473090
        - 0x473090 -> PTR_CreateServiceW_00473090
        - 'EXTERNAL:00000025' -> External Address (ghidra.program.model.address.SpecialAddress)

    Returns:
        mapping of entry-point offset -> offsets of the data references to that import.
    """
    fake_dict: Dict[int, List[int]] = {}

    external_functions = currentProgram.getFunctionManager().getExternalFunctions()  # type: ignore [name-defined] # noqa: F821
    for func in external_functions:
        for xref in func.getSymbol().getReferences():
            if not xref.getReferenceType().isData():
                continue
            real_offsets = fake_dict.setdefault(func.getEntryPoint().getOffset(), [])
            real_offsets.append(xref.getFromAddress().getOffset())

    return fake_dict
|
||||
|
||||
|
||||
def get_external_locs() -> List[int]:
    """
    Gather the external locations of all external functions; this helps to
    tell external offsets apart from regular bytes when extracting data.

    Ghidra behavior:
        - Offsets that point to specific sections of external programs
          i.e. library code.
        - Stored in data, and pointed to by an absolute address
    https://github.com/NationalSecurityAgency/ghidra/blob/26d4bd9104809747c21f2528cab8aba9aef9acd5/Ghidra/Framework/SoftwareModeling/src/main/java/ghidra/program/model/symbol/ExternalLocation.java#L25-30

    Example: (mimikatz.exe_) 5f66b82558ca92e54e77f216ef4c066c:0x473090
        - 0x473090 -> PTR_CreateServiceW_00473090
        - 0x000b34EC -> External Location

    Returns:
        the truthy results of `getExternalLocation().getAddress()`, one per external function.
    """
    # NOTE(review): the collected values are whatever `getAddress()` returns,
    # presumably Address objects rather than ints -- confirm against the annotation
    found = []
    external_functions = currentProgram.getFunctionManager().getExternalFunctions()  # type: ignore [name-defined] # noqa: F821
    for func in external_functions:
        location = func.getExternalLocation().getAddress()
        if not location:
            continue
        found.append(location)
    return found
|
||||
|
||||
|
||||
def check_addr_for_api(
    addr: ghidra.program.model.address.Address,
    fakes: Dict[int, List[int]],
    imports: Dict[int, List[str]],
    externs: Dict[int, List[str]],
    ex_locs: List[int],
) -> bool:
    """Tell whether `addr` is known to any of the cached API tables.

    Args:
        addr: address to look up; its offset is the key for the three dicts.
        fakes: fake import entry points, as built by `map_fake_import_addrs`.
        imports: import names by offset, as built by `get_file_imports`.
        externs: extern names by offset, as built by `get_file_externs`.
        ex_locs: external locations; `addr` itself (not its offset) is tested for membership.

    Returns:
        True if the offset maps to a non-empty entry in one of the dicts,
        or if `addr` is contained in `ex_locs`; False otherwise.
    """
    offset = addr.getOffset()

    # lookup order: fakes, then imports, then externs
    if any(table.get(offset) for table in (fakes, imports, externs)):
        return True

    return addr in ex_locs
|
||||
|
||||
|
||||
def is_call_or_jmp(insn: ghidra.program.database.code.InstructionDB) -> bool:
    """Return True when the instruction mnemonic starts with `CALL` or `J`.

    A prefix test is used instead of a substring test so that mnemonics which
    merely contain one of the markers (e.g. `SYSCALL` contains `CALL`) are not
    treated as control-flow transfers with a target operand.
    """
    # "J" covers JMP, JNE, JNZ, etc
    return insn.getMnemonicString().startswith(("CALL", "J"))
|
||||
|
||||
|
||||
def is_sp_modified(insn: ghidra.program.database.code.InstructionDB) -> bool:
    """Return True if any register operand names a stack pointer and is written.

    Every operand of type REGISTER is inspected: an operand counts when its
    register name contains "SP" and its operand reference type reports a write.
    Scanning continues past register operands that do not match, so a stack
    pointer in a later operand position is still detected.
    """
    for i in range(insn.getNumOperands()):
        if insn.getOperandType(i) != OperandType.REGISTER:
            continue
        if "SP" in insn.getRegister(i).getName() and insn.getOperandRefType(i).isWrite():
            return True
    return False
|
||||
|
||||
|
||||
def is_stack_referenced(insn: ghidra.program.database.code.InstructionDB) -> bool:
    """Generic catch-all: True if any reference leaving the instruction is a stack reference."""
    for xref in insn.getReferencesFrom():
        if xref.isStackReference():
            return True
    return False
|
||||
|
||||
|
||||
def is_zxor(insn: ghidra.program.database.code.InstructionDB) -> bool:
    """Return True when every operand object of the instruction equals the first one.

    The caller is assumed to pass an XOR instruction; an XOR against the same
    operand zeroes it out. An instruction without operand objects yields True.
    """
    # each operand hands back its own sequence of objects (a 2D layout), so flatten it
    flattened = [obj for idx in range(insn.getNumOperands()) for obj in insn.getOpObjects(idx)]

    if not flattened:
        return True

    reference = flattened[0]
    return all(obj == reference for obj in flattened)
|
||||
|
||||
|
||||
def dereference_ptr(insn: ghidra.program.database.code.InstructionDB):
    """Follow the pointer named by the instruction's first operand, when possible.

    Returns the value of the data item containing the operand address if that
    item is a defined pointer whose value is not in a RAM address space.
    In every other case the operand address itself is returned unchanged.
    """
    to_deref = insn.getAddress(0)

    dat = getDataContaining(to_deref)  # type: ignore [name-defined] # noqa: F821
    if not dat:
        return to_deref

    if not (dat.isDefined() and dat.isPointer()):
        return to_deref

    pointee = dat.getValue()

    # now we need to check the addr space to see if it is truly resolvable
    # ghidra sometimes likes to hand us direct RAM addrs, which typically point
    # to api calls that we can't actually resolve as such
    is_ram = pointee.getAddressSpace().getType() == AddressSpace.TYPE_RAM
    return to_deref if is_ram else pointee
|
||||
|
||||
467
capa/features/extractors/ghidra/insn.py
Normal file
467
capa/features/extractors/ghidra/insn.py
Normal file
@@ -0,0 +1,467 @@
|
||||
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
from typing import Any, Dict, Tuple, Iterator
|
||||
|
||||
import ghidra
|
||||
from ghidra.program.model.lang import OperandType
|
||||
from ghidra.program.model.block import BasicBlockModel, SimpleBlockModel, SimpleBlockIterator
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
import capa.features.extractors.ghidra.helpers
|
||||
from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset
|
||||
from capa.features.common import MAX_BYTES_FEATURE_SIZE, Bytes, String, Feature, Characteristic
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress
|
||||
|
||||
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40

# significantly cut down on runtime by caching api info
# NOTE: these lookups execute when the module is imported and are shared by all handlers below
imports = capa.features.extractors.ghidra.helpers.get_file_imports()
externs = capa.features.extractors.ghidra.helpers.get_file_externs()
mapped_fake_addrs = capa.features.extractors.ghidra.helpers.map_fake_import_addrs()
external_locs = capa.features.extractors.ghidra.helpers.get_external_locs()
|
||||
|
||||
|
||||
def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
    """Yield the `funcs` entries of the API(s) targeted by a CALL/JMP instruction.

    Only the first operand is inspected. Depending on its operand type the
    target is resolved through the fake-address map, by dereferencing a
    pointer, or taken directly; dynamic and indirect operands are not
    resolved. Every resolved offset with a truthy entry in `funcs` is yielded.

    Args:
        insn: the instruction; callers are expected to pass only CALLs or JMPs.
        funcs: lookup table keyed by offset (e.g. the cached imports or externs).
    """
    ghidra_helpers = capa.features.extractors.ghidra.helpers

    # assume only CALLs or JMPs are passed
    op_type = insn.getOperandType(0)

    if OperandType.isRegister(op_type):
        if not OperandType.isAddress(op_type):
            return
        # If it's an address in a register, check the mapped fake addrs
        # since they're dereferenced to their fake addrs
        targets = mapped_fake_addrs.get(insn.getAddress(0).getOffset())  # obtain the real addr(s)
        if not targets:
            return
    elif op_type == (OperandType.ADDRESS | OperandType.DATA):
        # needs dereferencing: check if the addr is a pointer to an api function
        pointee = ghidra_helpers.dereference_ptr(insn)
        if pointee == insn.getAddress(0):
            # could not dereference
            return
        if not ghidra_helpers.check_addr_for_api(pointee, mapped_fake_addrs, imports, externs, external_locs):
            return
        targets = [pointee.getOffset()]
    elif op_type == (OperandType.DYNAMIC | OperandType.ADDRESS) or op_type == OperandType.DYNAMIC:
        return  # cannot resolve dynamics statically
    elif OperandType.isIndirect(op_type):
        return  # cannot resolve the indirection statically
    else:
        # pure address does not need to get dereferenced/ handled
        direct = insn.getAddress(0)
        if not ghidra_helpers.check_addr_for_api(direct, mapped_fake_addrs, imports, externs, external_locs):
            return
        targets = [direct.getOffset()]

    # the register case may resolve to several offsets, the others to exactly one
    for target in targets:
        entry = funcs.get(target)
        if entry:
            yield entry
|
||||
|
||||
|
||||
def extract_insn_api_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Yield an API feature for every import/extern name a CALL or JMP resolves to.

    Imported functions are reported first, then extern functions; every
    feature is located at the address of the instruction.
    """
    if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
        return

    # check calls to imported functions, then calls to extern functions
    for table in (imports, externs):
        for names in check_for_api_call(insn, table):
            for name in names:
                yield API(name), AbsoluteVirtualAddress(insn.getAddress().getOffset())
|
||||
|
||||
|
||||
def extract_insn_number_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """
    Yield Number/OperandNumber features for each scalar operand of the instruction.

    example:
        push    3136B0h         ; dwControlCode

    For ADD instructions, a scalar strictly between 0 and MAX_STRUCTURE_SIZE is
    additionally reported as Offset/OperandOffset.
    """
    mnemonic = insn.getMnemonicString()

    if mnemonic.startswith("RET"):
        # skip things like:
        #   .text:0042250E retn 8
        return

    if capa.features.extractors.ghidra.helpers.is_sp_modified(insn):
        # skip things like:
        #   .text:00401145 add esp, 0Ch
        return

    is_add = mnemonic.startswith("ADD")

    for i in range(insn.getNumOperands()):
        if insn.getOperandType(i) == OperandType.SCALAR:
            const = insn.getScalar(i).getValue()
            addr = AbsoluteVirtualAddress(insn.getAddress().getOffset())

            yield Number(const), addr
            yield OperandNumber(i, const), addr

            if is_add and 0 < const < MAX_STRUCTURE_SIZE:
                # for pattern like:
                #
                #     add eax, 0x10
                #
                # assume 0x10 is also an offset (imagine eax is a pointer).
                yield Offset(const), addr
                yield OperandOffset(i, const), addr
        # anything else (references, void types) is skipped
|
||||
|
||||
|
||||
def extract_insn_offset_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """
    Yield Offset/OperandOffset features for structure-offset style operands.

    example:
        .text:0040112F cmp [esi+4], ebx

    Instructions that reference the stack are ignored entirely.
    """
    # ignore any stack references
    if capa.features.extractors.ghidra.helpers.is_stack_referenced(insn):
        return

    # Ghidra stores operands in 2D arrays if they contain offsets
    for i in range(insn.getNumOperands()):
        if insn.getOperandType(i) != OperandType.DYNAMIC:  # DYNAMIC is e.g. [esi + 4]
            continue

        # manual extraction, since the default api calls only work on the 1st dimension of the array
        trailing = insn.getOpObjects(i)[-1]
        if not isinstance(trailing, ghidra.program.model.scalar.Scalar):
            continue

        op_off = trailing.getValue()
        yield Offset(op_off), AbsoluteVirtualAddress(insn.getAddress().getOffset())
        yield OperandOffset(i, op_off), AbsoluteVirtualAddress(insn.getAddress().getOffset())
|
||||
|
||||
|
||||
def extract_insn_bytes_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """
    Yield a Bytes feature for the byte sequence referenced by the instruction.

    example:
        push    offset iid_004118d4_IShellLinkA ; riid

    CALLs and JMPs are skipped, as is referenced data that is a string or a pointer.
    """
    if capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
        return

    insn_addr = insn.getAddress()

    ref = insn_addr  # init to insn addr
    for i in range(insn.getNumOperands()):
        if OperandType.isScalarAsAddress(insn.getOperandType(i)):
            ref = insn.getAddress(i)  # pulls pointer if there is one; the last such operand wins

    if ref == insn_addr:
        # bail out if there's no pointer
        return

    ghidra_dat = getDataAt(ref)  # type: ignore [name-defined] # noqa: F821
    # don't extract byte features for obvious strings, and avoid data that is itself a pointer
    if not ghidra_dat or ghidra_dat.hasStringValue() or ghidra_dat.isPointer():
        return

    extracted_bytes = capa.features.extractors.ghidra.helpers.get_bytes(ref, MAX_BYTES_FEATURE_SIZE)
    if extracted_bytes and not capa.features.extractors.helpers.all_zeros(extracted_bytes):
        yield Bytes(extracted_bytes), AbsoluteVirtualAddress(insn_addr.getOffset())
|
||||
|
||||
|
||||
def extract_insn_string_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """
    Yield a String feature when the instruction references string data.

    example:
        push offset aAcr     ; "ACR  > "
    """
    insn_addr = insn.getAddress()

    # the last operand typed as scalar-as-address supplies the referenced address
    ref = insn_addr
    for i in range(insn.getNumOperands()):
        if OperandType.isScalarAsAddress(insn.getOperandType(i)):
            ref = insn.getAddress(i)

    if ref == insn_addr:
        # no operand pointed anywhere
        return

    ghidra_dat = getDataAt(ref)  # type: ignore [name-defined] # noqa: F821
    if ghidra_dat and ghidra_dat.hasStringValue():
        yield String(ghidra_dat.getValue()), AbsoluteVirtualAddress(insn_addr.getOffset())
|
||||
|
||||
|
||||
def extract_insn_mnemonic_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Yield the lower-cased instruction mnemonic as a Mnemonic feature."""
    mnemonic = insn.getMnemonicString().lower()
    location = AbsoluteVirtualAddress(insn.getAddress().getOffset())
    yield Mnemonic(mnemonic), location
|
||||
|
||||
|
||||
def extract_insn_obfs_call_plus_5_characteristic_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """
    Yield the "call $+5" characteristic when the instruction's code reference
    points five bytes past the instruction's own address.
    """
    if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
        return

    insn_addr = insn.getAddress()
    wanted_type = OperandType.ADDRESS | OperandType.CODE

    # the last operand typed ADDRESS | CODE supplies the target; default is the insn itself
    target = insn_addr
    for i in range(insn.getNumOperands()):
        if insn.getOperandType(i) == wanted_type:
            target = insn.getAddress(i)

    if insn_addr.add(5) == target:
        yield Characteristic("call $+5"), AbsoluteVirtualAddress(insn_addr.getOffset())
|
||||
|
||||
|
||||
def extract_insn_segment_access_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Yield "fs access" / "gs access" when the instruction text contains `FS:` / `GS:`."""
    text = insn.toString()

    # FS is reported before GS; both may be yielded for one instruction
    for marker, label in (("FS:", "fs access"), ("GS:", "gs access")):
        if marker in text:
            yield Characteristic(label), AbsoluteVirtualAddress(insn.getAddress().getOffset())
|
||||
|
||||
|
||||
def extract_insn_peb_access_characteristic_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Yield the "peb access" characteristic for PUSH/MOV instructions touching the PEB.

    fs:[0x30] on x86, gs:[0x60] on x64
    """
    text = insn.toString()

    # only instruction text beginning with PUSH or MOV is considered
    if not text.startswith(("PUSH", "MOV")):
        return

    if any(marker in text for marker in ("FS:[0x30]", "GS:[0x60]")):
        yield Characteristic("peb access"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
|
||||
|
||||
|
||||
def extract_insn_cross_section_cflow(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Inspect the instruction for a CALL or JMP that crosses section boundaries.

    The target is resolved from the first operand. Targets recognised as API
    addresses, and operands that cannot be resolved statically, produce nothing.
    The characteristic is yielded when the target's memory block differs from
    the memory block holding the instruction.
    """
    ghidra_helpers = capa.features.extractors.ghidra.helpers

    if not ghidra_helpers.is_call_or_jmp(insn):
        return

    op_type = insn.getOperandType(0)

    if OperandType.isRegister(op_type):
        # both OperandType flags must be present
        # bail on REGISTER alone
        if not OperandType.isAddress(op_type):
            return
        ref = insn.getAddress(0)  # Ghidra dereferences REG | ADDR
    elif op_type == (OperandType.ADDRESS | OperandType.DATA):
        # this OperandType needs dereferencing
        ref = ghidra_helpers.dereference_ptr(insn)
        if ref == insn.getAddress(0):
            # could not dereference
            return
    elif op_type == (OperandType.DYNAMIC | OperandType.ADDRESS) or op_type == OperandType.DYNAMIC:
        return  # cannot resolve dynamics statically
    elif OperandType.isIndirect(op_type):
        return  # cannot resolve the indirection statically
    else:
        # pure address does not need to get dereferenced/ handled
        ref = insn.getAddress(0)

    # targets that resolve to an api function are not reported
    if ghidra_helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs, external_locs):
        return

    this_mem_block = getMemoryBlock(insn.getAddress())  # type: ignore [name-defined] # noqa: F821
    ref_block = getMemoryBlock(ref)  # type: ignore [name-defined] # noqa: F821
    if ref_block != this_mem_block:
        yield Characteristic("cross section flow"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
|
||||
|
||||
|
||||
def extract_function_calls_from(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Yield a "calls from" characteristic located at the target of a CALL.

    most relevant at the function scope, however, its most efficient to extract at the instruction scope
    """
    if not insn.getMnemonicString().startswith("CALL"):
        return

    # This method of "dereferencing" addresses/ pointers
    # is not as robust as methods in other functions,
    # but works just fine for this one
    target = 0
    for xref in insn.getReferencesFrom():
        dest = xref.getToAddress()

        # avoid returning fake addrs
        if dest.isExternalAddress():
            continue

        # when several non-external references exist, the last one is kept
        target = dest.getOffset()

    # if a reference is < 0, then ghidra pulled an offset from a DYNAMIC | ADDR (usually a stackvar)
    # these cannot be resolved to actual addrs
    if target > 0:
        yield Characteristic("calls from"), AbsoluteVirtualAddress(target)
|
||||
|
||||
|
||||
def extract_function_indirect_call_characteristic_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Yield "indirect call" for CALLs whose first operand is indirect
    (e.g., call eax or call dword ptr [edx+4])
    does not include calls like => call ds:dword_ABD4974

    most relevant at the function or basic block scope;
    however, its most efficient to extract at the instruction scope
    """
    is_call = insn.getMnemonicString().startswith("CALL")
    if is_call and OperandType.isIndirect(insn.getOperandType(0)):
        yield Characteristic("indirect call"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
|
||||
|
||||
|
||||
def check_nzxor_security_cookie_delta(
    fh: ghidra.program.database.function.FunctionDB, insn: ghidra.program.database.code.InstructionDB
) -> bool:
    """Tell whether the instruction sits where a security-cookie XOR is expected.

    Security cookie checks are expected within SECURITY_COOKIE_BYTES_DELTA
    bytes of the start of the function's first block or of the end of its last
    block. The two blocks are checked independently, so the test also applies
    to functions made of more than one block.

    Args:
        fh: function containing the instruction.
        insn: the instruction to classify.

    Returns:
        True if `insn` lies in the first block within the delta of the
        function's minimum address, or in the last block within the delta of
        the function's maximum address; False otherwise.
    """
    model = SimpleBlockModel(currentProgram)  # type: ignore [name-defined] # noqa: F821
    insn_addr = insn.getAddress()
    func_asv = fh.getBody()

    # first block: instruction must be close to the function start AND inside that block
    first_addr = func_asv.getMinAddress()
    if insn_addr < first_addr.add(SECURITY_COOKIE_BYTES_DELTA):
        first_bb = model.getFirstCodeBlockContaining(first_addr, monitor)  # type: ignore [name-defined] # noqa: F821
        if first_bb is not None and first_bb.contains(insn_addr):
            return True

    # last block: instruction must be close to the function end AND inside that block
    last_addr = func_asv.getMaxAddress()
    if insn_addr > last_addr.add(SECURITY_COOKIE_BYTES_DELTA * -1):
        last_bb = model.getFirstCodeBlockContaining(last_addr, monitor)  # type: ignore [name-defined] # noqa: F821
        if last_bb is not None and last_bb.contains(insn_addr):
            return True

    return False
|
||||
|
||||
|
||||
def extract_insn_nzxor_characteristic_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Yield the "nzxor" characteristic for XORs that are not filtered out.

    Filtered, in this order: mnemonics without "XOR", instructions referencing
    the stack, XORs whose operands are all equal, and XORs positioned where a
    security cookie check is expected.
    """
    ghidra_helpers = capa.features.extractors.ghidra.helpers

    if "XOR" not in insn.getMnemonicString():
        return

    # `or` short-circuits, so the checks run in the documented order
    filtered_out = (
        ghidra_helpers.is_stack_referenced(insn)
        or ghidra_helpers.is_zxor(insn)
        or check_nzxor_security_cookie_delta(fh, insn)
    )
    if filtered_out:
        return

    yield Characteristic("nzxor"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
|
||||
|
||||
|
||||
def extract_features(
    fh: ghidra.program.database.function.FunctionDB,
    bb: ghidra.program.model.block.CodeBlock,
    insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
    """Run every handler in INSTRUCTION_HANDLERS on the instruction and yield all (feature, address) pairs."""
    for handler in INSTRUCTION_HANDLERS:
        yield from handler(fh, bb, insn)
|
||||
|
||||
|
||||
# handlers applied, in this order, to each instruction by `extract_features`;
# every handler is called as handler(fh, bb, insn)
INSTRUCTION_HANDLERS = (
    extract_insn_api_features,
    extract_insn_number_features,
    extract_insn_bytes_features,
    extract_insn_string_features,
    extract_insn_offset_features,
    extract_insn_nzxor_characteristic_features,
    extract_insn_mnemonic_features,
    extract_insn_obfs_call_plus_5_characteristic_features,
    extract_insn_peb_access_characteristic_features,
    extract_insn_cross_section_cflow,
    extract_insn_segment_access_features,
    extract_function_calls_from,
    extract_function_indirect_call_characteristic_features,
)
|
||||
|
||||
|
||||
def main():
    """Extract the instruction features of every function in the current program and pretty-print them."""
    import pprint

    listing = currentProgram.getListing()  # type: ignore [name-defined] # noqa: F821

    features = []
    for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols():
        blocks = SimpleBlockIterator(BasicBlockModel(currentProgram), fhandle.getBody(), monitor)  # type: ignore [name-defined] # noqa: F821
        for bab in blocks:
            # True: walk the block's instructions in forward order
            for insnh in listing.getInstructions(bab, True):
                features.extend(extract_features(fhandle, bab, insnh))

    pprint.pprint(features)  # noqa: T203


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user