Ghidra: Implement GhidraFeatureExtractor (#1681)

* Implement GhidraFeatureExtractor & repo changes
This commit is contained in:
Colton Gabertan
2023-08-16 15:58:47 -07:00
committed by GitHub
parent 2de6dc7cb8
commit b3cf1129e3
13 changed files with 362 additions and 143 deletions

View File

@@ -12,15 +12,16 @@ from typing import Tuple, Iterator
import ghidra
from ghidra.program.model.lang import OperandType
from ghidra.program.model.block import BasicBlockModel, SimpleBlockIterator
import capa.features.extractors.ghidra.helpers
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.address import Address
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle
listing = currentProgram.getListing() # type: ignore [name-defined] # noqa: F821
currentProgram = currentProgram() # type: ignore # noqa: F821
listing = currentProgram.getListing() # type: ignore # noqa: F821
def get_printable_len(op: ghidra.program.model.scalar.Scalar) -> int:
@@ -98,16 +99,20 @@ def _bb_has_tight_loop(bb: ghidra.program.model.block.CodeBlock):
return False
def extract_bb_stackstring(bb: ghidra.program.model.block.CodeBlock) -> Iterator[Tuple[Feature, Address]]:
def extract_bb_stackstring(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract stackstring indicators from basic block"""
bb: ghidra.program.model.block.CodeBlock = bbh.inner
if bb_contains_stackstring(bb):
yield Characteristic("stack string"), AbsoluteVirtualAddress(bb.getMinAddress().getOffset())
yield Characteristic("stack string"), bbh.address
def extract_bb_tight_loop(bb: ghidra.program.model.block.CodeBlock) -> Iterator[Tuple[Feature, Address]]:
def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""check basic block for tight loop indicators"""
bb: ghidra.program.model.block.CodeBlock = bbh.inner
if _bb_has_tight_loop(bb):
yield Characteristic("tight loop"), AbsoluteVirtualAddress(bb.getMinAddress().getOffset())
yield Characteristic("tight loop"), bbh.address
BASIC_BLOCK_HANDLERS = (
@@ -116,7 +121,7 @@ BASIC_BLOCK_HANDLERS = (
)
def extract_features(bb: ghidra.program.model.block.CodeBlock) -> Iterator[Tuple[Feature, Address]]:
def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""
extract features from the given basic block.
@@ -126,17 +131,17 @@ def extract_features(bb: ghidra.program.model.block.CodeBlock) -> Iterator[Tuple
yields:
Tuple[Feature, int]: the features and their location found in this basic block.
"""
yield BasicBlock(), AbsoluteVirtualAddress(bb.getMinAddress().getOffset())
yield BasicBlock(), bbh.address
for bb_handler in BASIC_BLOCK_HANDLERS:
for feature, addr in bb_handler(bb):
for feature, addr in bb_handler(fh, bbh):
yield feature, addr
def main():
features = []
for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols():
for bb in SimpleBlockIterator(BasicBlockModel(currentProgram), fhandle.getBody(), monitor): # type: ignore [name-defined] # noqa: F821
features.extend(list(extract_features(bb)))
for fh in capa.features.extractors.ghidra.helpers.get_function_symbols():
for bbh in capa.features.extractors.ghidra.helpers.get_function_blocks(fh):
features.extend(list(extract_features(fh, bbh)))
import pprint

View File

@@ -5,16 +5,19 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import List, Tuple
import ghidra
from typing import List, Tuple, Iterator
import capa.features.extractors.ghidra.file
import capa.features.extractors.ghidra.insn
import capa.features.extractors.ghidra.global_
import capa.features.extractors.ghidra.function
import capa.features.extractors.ghidra.basicblock
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
currentProgram: ghidra.program.database.ProgramDB
currentProgram = currentProgram() # type: ignore # noqa: F821
currentAddress = currentAddress() # type: ignore # noqa: F821
class GhidraFeatureExtractor(FeatureExtractor):
@@ -33,3 +36,33 @@ class GhidraFeatureExtractor(FeatureExtractor):
def extract_file_features(self):
    """yield every file-scope item produced by the Ghidra file extractor module"""
    file_features = capa.features.extractors.ghidra.file.extract_features()
    for item in file_features:
        yield item
def get_functions(self) -> Iterator[FunctionHandle]:
    """yield each FunctionHandle produced by the Ghidra helpers' get_function_symbols()"""
    # the helpers module is imported at call time, not at module import time
    import capa.features.extractors.ghidra.helpers as ghidra_helpers

    for function_handle in ghidra_helpers.get_function_symbols():
        yield function_handle
@staticmethod
def get_function(addr: int) -> FunctionHandle:
    """
    build a FunctionHandle for the function that contains the given address

    args:
      addr: integer address located anywhere inside the function of interest

    returns:
      FunctionHandle: addressed by the function's entry point, with the Ghidra
        function object stored as `inner`.

    NOTE(review): getFunctionContaining() presumably returns None when no function
    covers `addr`; that case is not handled here - confirm callers only pass
    addresses that belong to a function.
    """
    # convert the integer into a Ghidra address via the exposed currentAddress object
    ghidra_addr = currentAddress.getAddress(hex(addr))  # type: ignore [name-defined] # noqa: F821
    func = getFunctionContaining(ghidra_addr)  # type: ignore [name-defined] # noqa: F821

    # a Ghidra function reports its start through getEntryPoint(), which is also
    # how the function handles yielded by the helpers module are addressed
    entry_point = AbsoluteVirtualAddress(func.getEntryPoint().getOffset())
    return FunctionHandle(address=entry_point, inner=func)
def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
    """yield the function-scope items that the Ghidra function extractor produces for `fh`"""
    function_features = capa.features.extractors.ghidra.function.extract_features(fh)
    for item in function_features:
        yield item
def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
    """yield each BBHandle produced by the Ghidra helpers' get_function_blocks() for `fh`"""
    # the helpers module is imported at call time, not at module import time
    import capa.features.extractors.ghidra.helpers as ghidra_helpers

    for block_handle in ghidra_helpers.get_function_blocks(fh):
        yield block_handle
def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
    """yield the basic-block-scope items that the Ghidra basicblock extractor produces for `bbh`"""
    block_features = capa.features.extractors.ghidra.basicblock.extract_features(fh, bbh)
    for item in block_features:
        yield item
def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]:
    """
    yield each InsnHandle produced by the Ghidra helpers' get_insn_in_range() for `bbh`

    `fh` is accepted but not consulted here.
    """
    # the helpers module is imported at call time, not at module import time
    import capa.features.extractors.ghidra.helpers as ghidra_helpers

    for insn_handle in ghidra_helpers.get_insn_in_range(bbh):
        yield insn_handle
def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle):
    """yield the instruction-scope items that the Ghidra insn extractor produces for `ih`"""
    insn_features = capa.features.extractors.ghidra.insn.extract_features(fh, bbh, ih)
    for item in insn_features:
        yield item

View File

@@ -19,6 +19,7 @@ from capa.features.file import Export, Import, Section, FunctionName
from capa.features.common import FORMAT_PE, FORMAT_ELF, Format, String, Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, AbsoluteVirtualAddress
currentProgram = currentProgram() # type: ignore # noqa: F821
MAX_OFFSET_PE_AFTER_MZ = 0x200

View File

@@ -14,19 +14,25 @@ import capa.features.extractors.ghidra.helpers
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors import loops
from capa.features.extractors.base_extractor import FunctionHandle
currentProgram = currentProgram() # type: ignore # noqa: F821
monitor = monitor() # type: ignore # noqa: F821
def extract_function_calls_to(fh: ghidra.program.database.function.FunctionDB):
def extract_function_calls_to(fh: FunctionHandle):
"""extract callers to a function"""
for ref in fh.getSymbol().getReferences():
f: ghidra.program.database.function.FunctionDB = fh.inner
for ref in f.getSymbol().getReferences():
if ref.getReferenceType().isCall():
yield Characteristic("calls to"), AbsoluteVirtualAddress(ref.getFromAddress().getOffset())
def extract_function_loop(fh: ghidra.program.database.function.FunctionDB):
edges = []
def extract_function_loop(fh: FunctionHandle):
f: ghidra.program.database.function.FunctionDB = fh.inner
for block in SimpleBlockIterator(BasicBlockModel(currentProgram), fh.getBody(), monitor): # type: ignore [name-defined] # noqa: F821
edges = []
for block in SimpleBlockIterator(BasicBlockModel(currentProgram), f.getBody(), monitor): # type: ignore [name-defined] # noqa: F821
dests = block.getDestinations(monitor) # type: ignore [name-defined] # noqa: F821
s_addrs = block.getStartAddresses()
@@ -35,16 +41,18 @@ def extract_function_loop(fh: ghidra.program.database.function.FunctionDB):
edges.append((addr.getOffset(), dests.next().getDestinationAddress().getOffset()))
if loops.has_loop(edges):
yield Characteristic("loop"), AbsoluteVirtualAddress(fh.getEntryPoint().getOffset())
yield Characteristic("loop"), AbsoluteVirtualAddress(f.getEntryPoint().getOffset())
def extract_recursive_call(fh: ghidra.program.database.function.FunctionDB):
for f in fh.getCalledFunctions(monitor): # type: ignore [name-defined] # noqa: F821
if f.getEntryPoint().getOffset() == fh.getEntryPoint().getOffset():
yield Characteristic("recursive call"), AbsoluteVirtualAddress(fh.getEntryPoint().getOffset())
def extract_recursive_call(fh: FunctionHandle):
f: ghidra.program.database.function.FunctionDB = fh.inner
for func in f.getCalledFunctions(monitor): # type: ignore [name-defined] # noqa: F821
if func.getEntryPoint().getOffset() == f.getEntryPoint().getOffset():
yield Characteristic("recursive call"), AbsoluteVirtualAddress(f.getEntryPoint().getOffset())
def extract_features(fh: ghidra.program.database.function.FunctionDB) -> Iterator[Tuple[Feature, Address]]:
def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
for func_handler in FUNCTION_HANDLERS:
for feature, addr in func_handler(fh):
yield feature, addr

View File

@@ -15,6 +15,7 @@ from capa.features.common import OS, ARCH_I386, ARCH_AMD64, OS_WINDOWS, Arch, Fe
from capa.features.address import NO_ADDRESS, Address
logger = logging.getLogger(__name__)
currentProgram = currentProgram() # type: ignore # noqa: F821
def extract_os() -> Iterator[Tuple[Feature, Address]]:

View File

@@ -9,10 +9,16 @@ from typing import Dict, List, Iterator
import ghidra
from ghidra.program.model.lang import OperandType
from ghidra.program.model.block import BasicBlockModel, SimpleBlockIterator
from ghidra.program.model.symbol import SourceType, SymbolType
from ghidra.program.model.address import AddressSpace
import capa.features.extractors.helpers
from capa.features.address import AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
monitor = monitor() # type: ignore # noqa: F821
currentProgram = currentProgram() # type: ignore # noqa: F821
def fix_byte(b: int) -> bytes:
@@ -71,10 +77,29 @@ def get_block_bytes(block: ghidra.program.model.mem.MemoryBlock) -> bytes:
return bytez
def get_function_symbols() -> Iterator[ghidra.program.database.function.FunctionDB]:
def get_function_symbols() -> Iterator[FunctionHandle]:
"""yield all non-external function symbols"""
yield from currentProgram.getFunctionManager().getFunctionsNoStubs(True) # type: ignore [name-defined] # noqa: F821
for fhandle in currentProgram.getFunctionManager().getFunctionsNoStubs(True): # type: ignore [name-defined] # noqa: F821
yield FunctionHandle(address=AbsoluteVirtualAddress(fhandle.getEntryPoint().getOffset()), inner=fhandle)
def get_function_blocks(fh: FunctionHandle) -> Iterator[BBHandle]:
    """
    walk the basic blocks of the given function and wrap each one in a BBHandle

    each handle is addressed by its block's minimum address and keeps the
    Ghidra block object as `inner`.
    """
    function: ghidra.program.database.function.FunctionDB = fh.inner

    block_model = BasicBlockModel(currentProgram)  # type: ignore [name-defined] # noqa: F821
    block_iter = SimpleBlockIterator(block_model, function.getBody(), monitor)  # type: ignore [name-defined] # noqa: F821

    for block in block_iter:
        block_start = AbsoluteVirtualAddress(block.getMinAddress().getOffset())
        yield BBHandle(address=block_start, inner=block)
def get_insn_in_range(bbh: BBHandle) -> Iterator[InsnHandle]:
    """
    yield an InsnHandle for each instruction in the given basic block

    args:
      bbh: handle whose `inner` is the Ghidra code block to enumerate

    yields:
      InsnHandle: addressed by the instruction's address, with the Ghidra
        instruction object stored as `inner`.
    """
    bb: ghidra.program.model.block.CodeBlock = bbh.inner

    # let the listing enumerate the instructions inside the block's address set,
    # rather than probing every single address of the block with getInstructionAt()
    listing = currentProgram.getListing()  # type: ignore [name-defined] # noqa: F821
    for insn in listing.getInstructions(bb, True):
        yield InsnHandle(address=AbsoluteVirtualAddress(insn.getAddress().getOffset()), inner=insn)
def get_file_imports() -> Dict[int, List[str]]:

View File

@@ -9,17 +9,20 @@ from typing import Any, Dict, Tuple, Iterator
import ghidra
from ghidra.program.model.lang import OperandType
from ghidra.program.model.block import BasicBlockModel, SimpleBlockModel, SimpleBlockIterator
from ghidra.program.model.block import SimpleBlockModel
import capa.features.extractors.helpers
import capa.features.extractors.ghidra.helpers
from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset
from capa.features.common import MAX_BYTES_FEATURE_SIZE, Bytes, String, Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40
currentProgram = currentProgram() # type: ignore # noqa: F821
monitor = monitor() # type: ignore # noqa: F821
# significantly cut down on runtime by caching api info
imports = capa.features.extractors.ghidra.helpers.get_file_imports()
@@ -49,15 +52,11 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
elif ref_type == addr_data:
# we must dereference and check if the addr is a pointer to an api function
addr_ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn)
if addr_ref != insn.getAddress(0):
if not capa.features.extractors.ghidra.helpers.check_addr_for_api(
addr_ref, mapped_fake_addrs, imports, externs, external_locs
):
return
ref = addr_ref.getOffset()
else:
# could not dereference
return
elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC:
return # cannot resolve dynamics statically
elif OperandType.isIndirect(ref_type):
@@ -82,35 +81,31 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
yield info
def extract_insn_api_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_api_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
insn: ghidra.program.database.code.InstructionDB = ih.inner
if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
return
# check calls to imported functions
for api in check_for_api_call(insn, imports):
for imp in api:
yield API(imp), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield API(imp), ih.address
# check calls to extern functions
for api in check_for_api_call(insn, externs):
for ext in api:
yield API(ext), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield API(ext), ih.address
def extract_insn_number_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_number_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse instruction number features
example:
push 3136B0h ; dwControlCode
"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
if insn.getMnemonicString().startswith("RET"):
# skip things like:
# .text:0042250E retn 8
@@ -128,7 +123,7 @@ def extract_insn_number_features(
continue
const = insn.getScalar(i).getValue()
addr = AbsoluteVirtualAddress(insn.getAddress().getOffset())
addr = ih.address
yield Number(const), addr
yield OperandNumber(i, const), addr
@@ -143,17 +138,14 @@ def extract_insn_number_features(
yield OperandOffset(i, const), addr
def extract_insn_offset_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_offset_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse instruction structure offset features
example:
.text:0040112F cmp [esi+4], ebx
"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
# ignore any stack references
if not capa.features.extractors.ghidra.helpers.is_stack_referenced(insn):
@@ -164,20 +156,17 @@ def extract_insn_offset_features(
op_objs = insn.getOpObjects(i)
if isinstance(op_objs[-1], ghidra.program.model.scalar.Scalar):
op_off = op_objs[-1].getValue()
yield Offset(op_off), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield OperandOffset(i, op_off), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Offset(op_off), ih.address
yield OperandOffset(i, op_off), ih.address
def extract_insn_bytes_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_bytes_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse referenced byte sequences
example:
push offset iid_004118d4_IShellLinkA ; riid
"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
if capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
return
@@ -195,20 +184,18 @@ def extract_insn_bytes_features(
extracted_bytes = capa.features.extractors.ghidra.helpers.get_bytes(ref, MAX_BYTES_FEATURE_SIZE)
if extracted_bytes and not capa.features.extractors.helpers.all_zeros(extracted_bytes):
# don't extract byte features for obvious strings
yield Bytes(extracted_bytes), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Bytes(extracted_bytes), ih.address
def extract_insn_string_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
) -> Iterator[Tuple[Feature, Address]]:
def extract_insn_string_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse instruction string features
example:
push offset aAcr ; "ACR > "
"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
ref = insn.getAddress()
for i in range(insn.getNumOperands()):
if OperandType.isScalarAsAddress(insn.getOperandType(i)):
@@ -217,26 +204,25 @@ def extract_insn_string_features(
if ref != insn.getAddress():
ghidra_dat = getDataAt(ref) # type: ignore [name-defined] # noqa: F821
if ghidra_dat and ghidra_dat.hasStringValue():
yield String(ghidra_dat.getValue()), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield String(ghidra_dat.getValue()), ih.address
def extract_insn_mnemonic_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle, bb: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""parse instruction mnemonic features"""
yield Mnemonic(insn.getMnemonicString().lower()), AbsoluteVirtualAddress(insn.getAddress().getOffset())
insn: ghidra.program.database.code.InstructionDB = ih.inner
yield Mnemonic(insn.getMnemonicString().lower()), ih.address
def extract_insn_obfs_call_plus_5_characteristic_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle, bb: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse call $+5 instruction from the given instruction.
"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
return
@@ -248,46 +234,45 @@ def extract_insn_obfs_call_plus_5_characteristic_features(
ref = insn.getAddress(i)
if insn.getAddress().add(5) == ref:
yield Characteristic("call $+5"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Characteristic("call $+5"), ih.address
def extract_insn_segment_access_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle, bb: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""parse instruction fs or gs access"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
insn_str = insn.toString()
if "FS:" in insn_str:
yield Characteristic("fs access"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Characteristic("fs access"), ih.address
if "GS:" in insn_str:
yield Characteristic("gs access"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Characteristic("gs access"), ih.address
def extract_insn_peb_access_characteristic_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle, bb: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""parse instruction peb access
fs:[0x30] on x86, gs:[0x60] on x64
"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
insn_str = insn.toString()
if insn_str.startswith(("PUSH", "MOV")):
if "FS:[0x30]" in insn_str or "GS:[0x60]" in insn_str:
yield Characteristic("peb access"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Characteristic("peb access"), ih.address
def extract_insn_cross_section_cflow(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle, bb: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""inspect the instruction for a CALL or JMP that crosses section boundaries"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
return
@@ -311,14 +296,10 @@ def extract_insn_cross_section_cflow(
elif ref_type == addr_data:
# we must dereference and check if the addr is a pointer to an api function
ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn)
if ref != insn.getAddress(0):
if capa.features.extractors.ghidra.helpers.check_addr_for_api(
ref, mapped_fake_addrs, imports, externs, external_locs
):
return
else:
# could not dereference
return
elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC:
return # cannot resolve dynamics statically
elif OperandType.isIndirect(ref_type):
@@ -334,18 +315,19 @@ def extract_insn_cross_section_cflow(
this_mem_block = getMemoryBlock(insn.getAddress()) # type: ignore [name-defined] # noqa: F821
ref_block = getMemoryBlock(ref) # type: ignore [name-defined] # noqa: F821
if ref_block != this_mem_block:
yield Characteristic("cross section flow"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Characteristic("cross section flow"), ih.address
def extract_function_calls_from(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle,
bb: BBHandle,
ih: InsnHandle,
) -> Iterator[Tuple[Feature, Address]]:
"""extract functions calls from features
most relevant at the function scope, however, its most efficient to extract at the instruction scope
"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
if insn.getMnemonicString().startswith("CALL"):
# This method of "dereferencing" addresses/ pointers
@@ -366,9 +348,9 @@ def extract_function_calls_from(
def extract_function_indirect_call_characteristic_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle,
bb: BBHandle,
ih: InsnHandle,
) -> Iterator[Tuple[Feature, Address]]:
"""extract indirect function calls (e.g., call eax or call dword ptr [edx+4])
does not include calls like => call ds:dword_ABD4974
@@ -376,9 +358,11 @@ def extract_function_indirect_call_characteristic_features(
most relevant at the function or basic block scope;
however, its most efficient to extract at the instruction scope
"""
insn: ghidra.program.database.code.InstructionDB = ih.inner
if insn.getMnemonicString().startswith("CALL"):
if OperandType.isIndirect(insn.getOperandType(0)):
yield Characteristic("indirect call"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Characteristic("indirect call"), ih.address
def check_nzxor_security_cookie_delta(
@@ -407,25 +391,28 @@ def check_nzxor_security_cookie_delta(
def extract_insn_nzxor_characteristic_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle,
bb: BBHandle,
ih: InsnHandle,
) -> Iterator[Tuple[Feature, Address]]:
f: ghidra.program.database.function.FunctionDB = fh.inner
insn: ghidra.program.database.code.InstructionDB = ih.inner
if "XOR" not in insn.getMnemonicString():
return
if capa.features.extractors.ghidra.helpers.is_stack_referenced(insn):
return
if capa.features.extractors.ghidra.helpers.is_zxor(insn):
return
if check_nzxor_security_cookie_delta(fh, insn):
if check_nzxor_security_cookie_delta(f, insn):
return
yield Characteristic("nzxor"), AbsoluteVirtualAddress(insn.getAddress().getOffset())
yield Characteristic("nzxor"), ih.address
def extract_features(
fh: ghidra.program.database.function.FunctionDB,
bb: ghidra.program.model.block.CodeBlock,
insn: ghidra.program.database.code.InstructionDB,
fh: FunctionHandle,
bb: BBHandle,
insn: InsnHandle,
) -> Iterator[Tuple[Feature, Address]]:
for insn_handler in INSTRUCTION_HANDLERS:
for feature, addr in insn_handler(fh, bb, insn):
@@ -451,12 +438,11 @@ INSTRUCTION_HANDLERS = (
def main():
""" """
listing = currentProgram.getListing() # type: ignore [name-defined] # noqa: F821
features = []
for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols():
for bab in SimpleBlockIterator(BasicBlockModel(currentProgram), fhandle.getBody(), monitor): # type: ignore [name-defined] # noqa: F821
for insnh in listing.getInstructions(bab, True):
features.extend(list(extract_features(fhandle, bab, insnh)))
for fh in capa.features.extractors.ghidra.helpers.get_function_symbols():
for bb in capa.features.extractors.ghidra.helpers.get_function_blocks(fh):
for insn in capa.features.extractors.ghidra.helpers.get_insn_in_range(bb):
features.extend(list(extract_features(fh, bb, insn)))
import pprint

0
capa/ghidra/__init__.py Normal file
View File

156
capa/ghidra/helpers.py Normal file
View File

@@ -0,0 +1,156 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import datetime
import contextlib
from typing import List
from pathlib import Path
import capa
import capa.version
import capa.features.common
import capa.features.freeze
import capa.render.result_document as rdoc
import capa.features.extractors.ghidra.helpers
# all messages from this module are logged under the "capa" logger name
logger = logging.getLogger("capa")

# each name is rebound to the object returned by calling it
# NOTE(review): presumably these callables are injected by the Ghidra Python runtime - confirm
currentProgram = currentProgram()  # type: ignore # noqa: F821
currentAddress = currentAddress()  # type: ignore # noqa: F821

# file type as returned by Ghidra
SUPPORTED_FILE_TYPES = ("Executable and Linking Format (ELF)", "Portable Executable (PE)", "Raw Binary")
class GHIDRAIO:
    """
    Minimal file-like wrapper around the program loaded in Ghidra.

    It tracks a current offset (set through seek) and answers read requests
    with bytes fetched at the Ghidra address built from that offset.
    Reading does not move the offset.
    """

    def __init__(self):
        super().__init__()
        self.offset = 0

    def seek(self, offset, whence=0):
        """remember `offset` for subsequent reads; only whence == 0 is accepted"""
        assert whence == 0
        self.offset = offset

    def read(self, size):
        """return the bytes fetched for `size` at the current offset, or b"" if no address can be built"""
        position = self.offset
        try:
            # ghidra.program.model.address.Address has no public constructor,
            # so the address is derived from the exposed currentAddress object
            # through its .getAddress() member function
            target = currentAddress.getAddress(hex(position))  # type: ignore [name-defined] # noqa: F821
        except RuntimeError:  # AddressFormatException to Ghidra
            logger.debug("cannot read 0x%x bytes at 0x%x (ea: BADADDR)", size, position)
            return b""
        else:
            logger.debug("reading 0x%x bytes at 0x%x (ea: 0x%x)", size, position, target.getOffset())

            # returns bytes or b""
            return capa.features.extractors.ghidra.helpers.get_bytes(target, size)

    def close(self):
        """no-op: there is nothing to release"""
        return None
def is_supported_ghidra_version():
    """
    check whether the running Ghidra is version 10.2 or newer

    returns:
      bool: True when the version is supported; otherwise False, after logging
        a warning. a version string that does not start with "<major>.<minor>"
        is reported as unsupported.
    """
    import re

    version = str(getGhidraVersion())  # type: ignore [name-defined] # noqa: F821

    # compare (major, minor) as integers: parsing the first four characters as a
    # float misreads "10.10" as 10.1 and fails outright on "9.2.3" ("9.2.")
    match = re.match(r"(\d+)\.(\d+)", version)
    parsed = (int(match.group(1)), int(match.group(2))) if match else None

    if parsed is None or parsed < (10, 2):
        warning_msg = "capa does not support this Ghidra version"
        logger.warning(warning_msg)
        logger.warning("Your Ghidra version is: %s. Supported versions are: Ghidra >= 10.2", version)
        return False

    return True
def is_running_headless():
    """return whatever Ghidra's isRunningHeadless() reports for the current session"""
    headless = isRunningHeadless()  # type: ignore [name-defined] # noqa: F821
    return headless
def is_supported_file_type():
    """
    check whether the current program's executable format is one capa can analyze

    returns:
      bool: True when the format is listed in SUPPORTED_FILE_TYPES; otherwise
        False, after logging an explanatory error banner.
    """
    # the executable format is compared directly against the supported names;
    # it is handled as a plain string elsewhere in this module, too
    file_format = currentProgram.getExecutableFormat()  # type: ignore [name-defined] # noqa: F821
    if file_format not in SUPPORTED_FILE_TYPES:
        logger.error("-" * 80)
        logger.error(" Input file does not appear to be a supported file type.")
        logger.error(" ")
        logger.error(
            " capa currently only supports analyzing PE, ELF, or binary files containing x86 (32- and 64-bit) shellcode."
        )
        logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.")
        logger.error("-" * 80)
        return False

    return True
def is_supported_arch_type():
    """
    check whether the current program targets 32- or 64-bit x86

    returns:
      bool: True when the language ID string contains "x86" and either "32" or
        "64"; otherwise False, after logging an explanatory error banner.
    """
    # the substring checks need the string form of the language ID, not the object itself
    lang_id = currentProgram.getLanguageID().toString()  # type: ignore [name-defined] # noqa: F821
    if "x86" not in lang_id or not any(bits in lang_id for bits in ("32", "64")):
        logger.error("-" * 80)
        logger.error(" Input file does not appear to target a supported architecture.")
        logger.error(" ")
        logger.error(" capa currently only supports analyzing x86 (32- and 64-bit).")
        logger.error("-" * 80)
        return False

    return True
def get_file_md5():
    """return the value of getExecutableMD5() for the current program"""
    md5 = currentProgram.getExecutableMD5()  # type: ignore [name-defined] # noqa: F821
    return md5
def get_file_sha256():
    """return the value of getExecutableSHA256() for the current program"""
    sha256 = currentProgram.getExecutableSHA256()  # type: ignore [name-defined] # noqa: F821
    return sha256
def collect_metadata(rules: List[Path]):
    """
    assemble the result-document metadata describing the current Ghidra program

    args:
      rules: rule paths to record; each is resolved and stored as a posix string

    returns:
      rdoc.Metadata: sample hashes and path, executable format, derived arch and
        os, image base, and empty layout / feature counts / library functions.
        sha1 is always recorded as "".
    """
    # detect_elf_os() lives in a module this file does not import at the top, so
    # load it explicitly instead of relying on some other module having done so.
    # this must stay the first statement: it binds `capa` as a local name.
    import capa.features.extractors.elf

    md5 = get_file_md5()
    sha256 = get_file_sha256()

    # derive the architecture from substrings of the language ID
    info = currentProgram.getLanguageID().toString()  # type: ignore [name-defined] # noqa: F821
    if "x86" in info and "64" in info:
        arch = "x86_64"
    elif "x86" in info and "32" in info:
        arch = "x86"
    else:
        arch = "unknown arch"

    # derive the operating system from the executable format
    format_name: str = currentProgram.getExecutableFormat()  # type: ignore [name-defined] # noqa: F821
    if "PE" in format_name:
        os = "windows"
    elif "ELF" in format_name:
        # ELF needs inspection of the file contents, served through the file-like wrapper
        with contextlib.closing(GHIDRAIO()) as f:
            os = capa.features.extractors.elf.detect_elf_os(f)
    else:
        os = "unknown os"

    # NOTE(review): from_capa() is handed the raw integer image base here;
    # confirm it accepts an int rather than requiring a capa Address instance
    base_address = capa.features.freeze.Address.from_capa(currentProgram.getImageBase().getOffset())  # type: ignore [name-defined] # noqa: F821

    return rdoc.Metadata(
        timestamp=datetime.datetime.now(),
        version=capa.version.__version__,
        argv=(),
        sample=rdoc.Sample(
            md5=md5,
            sha1="",
            sha256=sha256,
            path=currentProgram.getExecutablePath(),  # type: ignore [name-defined] # noqa: F821
        ),
        analysis=rdoc.Analysis(
            format=format_name,
            arch=arch,
            os=os,
            extractor="ghidra",
            rules=tuple(r.resolve().absolute().as_posix() for r in rules),
            base_address=base_address,
            layout=rdoc.Layout(
                functions=(),
            ),
            feature_counts=rdoc.FeatureCounts(file=0, functions=()),
            library_functions=(),
        ),
    )

View File

@@ -44,7 +44,7 @@ def is_runtime_ida():
def is_runtime_ghidra():
return importlib.util.find_spec("ghidra.program.flatapi") is not None
return importlib.util.find_spec("ghidra") is not None
def assert_never(value) -> NoReturn:

View File

@@ -256,6 +256,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
with redirecting_print_to_tqdm(disable_progress):
with tqdm.contrib.logging.logging_redirect_tqdm():
pbar = tqdm.tqdm
if capa.helpers.is_runtime_ghidra():
# Ghidrathon interpreter cannot properly handle
# the TMonitor thread that is created via a monitor_interval
# > 0
pbar.monitor_interval = 0
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
@@ -1340,14 +1345,9 @@ def ida_main():
def ghidra_main():
import capa.rules
import capa.features.extractors.ghidra.file
# import capa.render.default
# import capa.features.extractors.ghidra.extractor
import capa.features.extractors.ghidra.global_
import capa.features.extractors.ghidra.helpers
import capa.features.extractors.ghidra.function
from capa.features.common import Feature
import capa.ghidra.helpers
import capa.render.default
import capa.features.extractors.ghidra.extractor
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
@@ -1359,21 +1359,25 @@ def ghidra_main():
logger.debug(" https://github.com/mandiant/capa-rules")
logger.debug("-" * 80)
# rules_path = os.path.join(get_default_root(), "rules")
# logger.debug("rule path: %s", rules_path)
# rules = get_rules([rules_path])
rules_path = get_default_root() / "rules"
logger.debug("rule path: %s", rules_path)
rules = get_rules([rules_path])
# temp test for ghidra CI
ghidra_features: List[Tuple[Feature, Address]] = []
ghidra_features.extend(capa.features.extractors.ghidra.global_.extract_os())
ghidra_features.extend(capa.features.extractors.ghidra.global_.extract_arch())
ghidra_features.extend(capa.features.extractors.ghidra.file.extract_features())
for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols():
ghidra_features.extend(list(capa.features.extractors.ghidra.function.extract_features(fhandle)))
meta = capa.ghidra.helpers.collect_metadata([rules_path])
import pprint
capabilities, counts = find_capabilities(
rules,
capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor(),
not capa.ghidra.helpers.is_running_headless(),
)
pprint.pprint(ghidra_features) # noqa: T203
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
if has_file_limitation(rules, capabilities, is_standalone=False):
logger.info("capa encountered warnings during analysis")
print(capa.render.default.render(meta, rules, capabilities))
if __name__ == "__main__":

2
rules

Submodule rules updated: 149cf2d133...7685a232d9