diff --git a/capa/features/extractors/ghidra/basicblock.py b/capa/features/extractors/ghidra/basicblock.py index ddd24165..f27ad6ed 100644 --- a/capa/features/extractors/ghidra/basicblock.py +++ b/capa/features/extractors/ghidra/basicblock.py @@ -12,15 +12,16 @@ from typing import Tuple, Iterator import ghidra from ghidra.program.model.lang import OperandType -from ghidra.program.model.block import BasicBlockModel, SimpleBlockIterator import capa.features.extractors.ghidra.helpers from capa.features.common import Feature, Characteristic -from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.address import Address from capa.features.basicblock import BasicBlock from capa.features.extractors.helpers import MIN_STACKSTRING_LEN +from capa.features.extractors.base_extractor import BBHandle, FunctionHandle -listing = currentProgram.getListing() # type: ignore [name-defined] # noqa: F821 +currentProgram = currentProgram() # type: ignore # noqa: F821 +listing = currentProgram.getListing() # type: ignore # noqa: F821 def get_printable_len(op: ghidra.program.model.scalar.Scalar) -> int: @@ -98,16 +99,20 @@ def _bb_has_tight_loop(bb: ghidra.program.model.block.CodeBlock): return False -def extract_bb_stackstring(bb: ghidra.program.model.block.CodeBlock) -> Iterator[Tuple[Feature, Address]]: +def extract_bb_stackstring(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: """extract stackstring indicators from basic block""" + bb: ghidra.program.model.block.CodeBlock = bbh.inner + if bb_contains_stackstring(bb): - yield Characteristic("stack string"), AbsoluteVirtualAddress(bb.getMinAddress().getOffset()) + yield Characteristic("stack string"), bbh.address -def extract_bb_tight_loop(bb: ghidra.program.model.block.CodeBlock) -> Iterator[Tuple[Feature, Address]]: +def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: """check basic block for tight loop indicators""" + bb: 
ghidra.program.model.block.CodeBlock = bbh.inner + if _bb_has_tight_loop(bb): - yield Characteristic("tight loop"), AbsoluteVirtualAddress(bb.getMinAddress().getOffset()) + yield Characteristic("tight loop"), bbh.address BASIC_BLOCK_HANDLERS = ( @@ -116,7 +121,7 @@ BASIC_BLOCK_HANDLERS = ( ) -def extract_features(bb: ghidra.program.model.block.CodeBlock) -> Iterator[Tuple[Feature, Address]]: +def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: """ extract features from the given basic block. @@ -126,17 +131,17 @@ def extract_features(bb: ghidra.program.model.block.CodeBlock) -> Iterator[Tuple yields: Tuple[Feature, int]: the features and their location found in this basic block. """ - yield BasicBlock(), AbsoluteVirtualAddress(bb.getMinAddress().getOffset()) + yield BasicBlock(), bbh.address for bb_handler in BASIC_BLOCK_HANDLERS: - for feature, addr in bb_handler(bb): + for feature, addr in bb_handler(fh, bbh): yield feature, addr def main(): features = [] - for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols(): - for bb in SimpleBlockIterator(BasicBlockModel(currentProgram), fhandle.getBody(), monitor): # type: ignore [name-defined] # noqa: F821 - features.extend(list(extract_features(bb))) + for fh in capa.features.extractors.ghidra.helpers.get_function_symbols(): + for bbh in capa.features.extractors.ghidra.helpers.get_function_blocks(fh): + features.extend(list(extract_features(fh, bbh))) import pprint diff --git a/capa/features/extractors/ghidra/extractor.py b/capa/features/extractors/ghidra/extractor.py index af504737..83a6716a 100644 --- a/capa/features/extractors/ghidra/extractor.py +++ b/capa/features/extractors/ghidra/extractor.py @@ -5,16 +5,19 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and limitations under the License. -from typing import List, Tuple - -import ghidra +from typing import List, Tuple, Iterator +import capa.features.extractors.ghidra.file +import capa.features.extractors.ghidra.insn import capa.features.extractors.ghidra.global_ +import capa.features.extractors.ghidra.function +import capa.features.extractors.ghidra.basicblock from capa.features.common import Feature from capa.features.address import Address, AbsoluteVirtualAddress -from capa.features.extractors.base_extractor import FeatureExtractor +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor -currentProgram: ghidra.program.database.ProgramDB +currentProgram = currentProgram() # type: ignore # noqa: F821 +currentAddress = currentAddress() # type: ignore # noqa: F821 class GhidraFeatureExtractor(FeatureExtractor): @@ -33,3 +36,33 @@ class GhidraFeatureExtractor(FeatureExtractor): def extract_file_features(self): yield from capa.features.extractors.ghidra.file.extract_features() + + def get_functions(self) -> Iterator[FunctionHandle]: + import capa.features.extractors.ghidra.helpers as ghidra_helpers + + yield from ghidra_helpers.get_function_symbols() + + @staticmethod + def get_function(addr: int) -> FunctionHandle: + get_addr = currentAddress.getAddress(hex(addr)) # type: ignore [name-defined] # noqa: F821 + func = getFunctionContaining(get_addr) # type: ignore [name-defined] # noqa: F821 + return FunctionHandle(address=AbsoluteVirtualAddress(func.getAddress().getOffset()), inner=func) + + def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.ghidra.function.extract_features(fh) + + def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]: + import capa.features.extractors.ghidra.helpers as ghidra_helpers + + yield from ghidra_helpers.get_function_blocks(fh) + + 
def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.ghidra.basicblock.extract_features(fh, bbh) + + def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]: + import capa.features.extractors.ghidra.helpers as ghidra_helpers + + yield from ghidra_helpers.get_insn_in_range(bbh) + + def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle): + yield from capa.features.extractors.ghidra.insn.extract_features(fh, bbh, ih) diff --git a/capa/features/extractors/ghidra/file.py b/capa/features/extractors/ghidra/file.py index 8845354f..f3da41f9 100644 --- a/capa/features/extractors/ghidra/file.py +++ b/capa/features/extractors/ghidra/file.py @@ -19,6 +19,7 @@ from capa.features.file import Export, Import, Section, FunctionName from capa.features.common import FORMAT_PE, FORMAT_ELF, Format, String, Feature, Characteristic from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, AbsoluteVirtualAddress +currentProgram = currentProgram() # type: ignore # noqa: F821 MAX_OFFSET_PE_AFTER_MZ = 0x200 diff --git a/capa/features/extractors/ghidra/function.py b/capa/features/extractors/ghidra/function.py index 98d4ee9b..3f9c956c 100644 --- a/capa/features/extractors/ghidra/function.py +++ b/capa/features/extractors/ghidra/function.py @@ -14,19 +14,25 @@ import capa.features.extractors.ghidra.helpers from capa.features.common import Feature, Characteristic from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.extractors import loops +from capa.features.extractors.base_extractor import FunctionHandle + +currentProgram = currentProgram() # type: ignore # noqa: F821 +monitor = monitor() # type: ignore # noqa: F821 -def extract_function_calls_to(fh: ghidra.program.database.function.FunctionDB): +def extract_function_calls_to(fh: FunctionHandle): """extract callers to a function""" - for ref in 
fh.getSymbol().getReferences(): + f: ghidra.program.database.function.FunctionDB = fh.inner + for ref in f.getSymbol().getReferences(): if ref.getReferenceType().isCall(): yield Characteristic("calls to"), AbsoluteVirtualAddress(ref.getFromAddress().getOffset()) -def extract_function_loop(fh: ghidra.program.database.function.FunctionDB): - edges = [] +def extract_function_loop(fh: FunctionHandle): + f: ghidra.program.database.function.FunctionDB = fh.inner - for block in SimpleBlockIterator(BasicBlockModel(currentProgram), fh.getBody(), monitor): # type: ignore [name-defined] # noqa: F821 + edges = [] + for block in SimpleBlockIterator(BasicBlockModel(currentProgram), f.getBody(), monitor): # type: ignore [name-defined] # noqa: F821 dests = block.getDestinations(monitor) # type: ignore [name-defined] # noqa: F821 s_addrs = block.getStartAddresses() @@ -35,16 +41,18 @@ def extract_function_loop(fh: ghidra.program.database.function.FunctionDB): edges.append((addr.getOffset(), dests.next().getDestinationAddress().getOffset())) if loops.has_loop(edges): - yield Characteristic("loop"), AbsoluteVirtualAddress(fh.getEntryPoint().getOffset()) + yield Characteristic("loop"), AbsoluteVirtualAddress(f.getEntryPoint().getOffset()) -def extract_recursive_call(fh: ghidra.program.database.function.FunctionDB): - for f in fh.getCalledFunctions(monitor): # type: ignore [name-defined] # noqa: F821 - if f.getEntryPoint().getOffset() == fh.getEntryPoint().getOffset(): - yield Characteristic("recursive call"), AbsoluteVirtualAddress(fh.getEntryPoint().getOffset()) +def extract_recursive_call(fh: FunctionHandle): + f: ghidra.program.database.function.FunctionDB = fh.inner + + for func in f.getCalledFunctions(monitor): # type: ignore [name-defined] # noqa: F821 + if func.getEntryPoint().getOffset() == f.getEntryPoint().getOffset(): + yield Characteristic("recursive call"), AbsoluteVirtualAddress(f.getEntryPoint().getOffset()) -def extract_features(fh: 
ghidra.program.database.function.FunctionDB) -> Iterator[Tuple[Feature, Address]]: +def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: for func_handler in FUNCTION_HANDLERS: for feature, addr in func_handler(fh): yield feature, addr diff --git a/capa/features/extractors/ghidra/global_.py b/capa/features/extractors/ghidra/global_.py index bf78baec..59797dec 100644 --- a/capa/features/extractors/ghidra/global_.py +++ b/capa/features/extractors/ghidra/global_.py @@ -15,6 +15,7 @@ from capa.features.common import OS, ARCH_I386, ARCH_AMD64, OS_WINDOWS, Arch, Fe from capa.features.address import NO_ADDRESS, Address logger = logging.getLogger(__name__) +currentProgram = currentProgram() # type: ignore # noqa: F821 def extract_os() -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/ghidra/helpers.py b/capa/features/extractors/ghidra/helpers.py index 5bf6b9a8..6f520b09 100644 --- a/capa/features/extractors/ghidra/helpers.py +++ b/capa/features/extractors/ghidra/helpers.py @@ -9,10 +9,16 @@ from typing import Dict, List, Iterator import ghidra from ghidra.program.model.lang import OperandType +from ghidra.program.model.block import BasicBlockModel, SimpleBlockIterator from ghidra.program.model.symbol import SourceType, SymbolType from ghidra.program.model.address import AddressSpace import capa.features.extractors.helpers +from capa.features.address import AbsoluteVirtualAddress +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle + +monitor = monitor() # type: ignore # noqa: F821 +currentProgram = currentProgram() # type: ignore # noqa: F821 def fix_byte(b: int) -> bytes: @@ -71,10 +77,29 @@ def get_block_bytes(block: ghidra.program.model.mem.MemoryBlock) -> bytes: return bytez -def get_function_symbols() -> Iterator[ghidra.program.database.function.FunctionDB]: +def get_function_symbols() -> Iterator[FunctionHandle]: """yield all non-external function symbols""" - yield from 
currentProgram.getFunctionManager().getFunctionsNoStubs(True) # type: ignore [name-defined] # noqa: F821 + for fhandle in currentProgram.getFunctionManager().getFunctionsNoStubs(True): # type: ignore [name-defined] # noqa: F821 + yield FunctionHandle(address=AbsoluteVirtualAddress(fhandle.getEntryPoint().getOffset()), inner=fhandle) + + +def get_function_blocks(fh: FunctionHandle) -> Iterator[BBHandle]: + """yield BBHandle for each bb in a given function""" + + func: ghidra.program.database.function.FunctionDB = fh.inner + for bb in SimpleBlockIterator(BasicBlockModel(currentProgram), func.getBody(), monitor): # type: ignore [name-defined] # noqa: F821 + yield BBHandle(address=AbsoluteVirtualAddress(bb.getMinAddress().getOffset()), inner=bb) + + +def get_insn_in_range(bbh: BBHandle) -> Iterator[InsnHandle]: + """yield InshHandle for each insn in a given basicblock""" + + bb: ghidra.program.model.block.CodeBlock = bbh.inner + for addr in bb.getAddresses(True): + insn = getInstructionAt(addr) # type: ignore [name-defined] # noqa: F821 + if insn: + yield InsnHandle(address=AbsoluteVirtualAddress(insn.getAddress().getOffset()), inner=insn) def get_file_imports() -> Dict[int, List[str]]: diff --git a/capa/features/extractors/ghidra/insn.py b/capa/features/extractors/ghidra/insn.py index 4002ae28..9c77286f 100644 --- a/capa/features/extractors/ghidra/insn.py +++ b/capa/features/extractors/ghidra/insn.py @@ -9,17 +9,20 @@ from typing import Any, Dict, Tuple, Iterator import ghidra from ghidra.program.model.lang import OperandType -from ghidra.program.model.block import BasicBlockModel, SimpleBlockModel, SimpleBlockIterator +from ghidra.program.model.block import SimpleBlockModel import capa.features.extractors.helpers import capa.features.extractors.ghidra.helpers from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset from capa.features.common import MAX_BYTES_FEATURE_SIZE, Bytes, String, Feature, Characteristic from 
capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle # security cookie checks may perform non-zeroing XORs, these are expected within a certain # byte range within the first and returning basic blocks, this helps to reduce FP features SECURITY_COOKIE_BYTES_DELTA = 0x40 +currentProgram = currentProgram() # type: ignore # noqa: F821 +monitor = monitor() # type: ignore # noqa: F821 # significantly cut down on runtime by caching api info imports = capa.features.extractors.ghidra.helpers.get_file_imports() @@ -49,15 +52,11 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]: elif ref_type == addr_data: # we must dereference and check if the addr is a pointer to an api function addr_ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn) - if addr_ref != insn.getAddress(0): - if not capa.features.extractors.ghidra.helpers.check_addr_for_api( - addr_ref, mapped_fake_addrs, imports, externs, external_locs - ): - return - ref = addr_ref.getOffset() - else: - # could not dereference + if not capa.features.extractors.ghidra.helpers.check_addr_for_api( + addr_ref, mapped_fake_addrs, imports, externs, external_locs + ): return + ref = addr_ref.getOffset() elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC: return # cannot resolve dynamics statically elif OperandType.isIndirect(ref_type): @@ -82,35 +81,31 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]: yield info -def extract_insn_api_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, -) -> Iterator[Tuple[Feature, Address]]: +def extract_insn_api_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: + insn: ghidra.program.database.code.InstructionDB = ih.inner + if not 
capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn): return # check calls to imported functions for api in check_for_api_call(insn, imports): for imp in api: - yield API(imp), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield API(imp), ih.address # check calls to extern functions for api in check_for_api_call(insn, externs): for ext in api: - yield API(ext), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield API(ext), ih.address -def extract_insn_number_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, -) -> Iterator[Tuple[Feature, Address]]: +def extract_insn_number_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: """ parse instruction number features example: push 3136B0h ; dwControlCode """ + insn: ghidra.program.database.code.InstructionDB = ih.inner + if insn.getMnemonicString().startswith("RET"): # skip things like: # .text:0042250E retn 8 @@ -128,7 +123,7 @@ def extract_insn_number_features( continue const = insn.getScalar(i).getValue() - addr = AbsoluteVirtualAddress(insn.getAddress().getOffset()) + addr = ih.address yield Number(const), addr yield OperandNumber(i, const), addr @@ -143,17 +138,14 @@ def extract_insn_number_features( yield OperandOffset(i, const), addr -def extract_insn_offset_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, -) -> Iterator[Tuple[Feature, Address]]: +def extract_insn_offset_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: """ parse instruction structure offset features example: .text:0040112F cmp [esi+4], ebx """ + insn: ghidra.program.database.code.InstructionDB = ih.inner # ignore any stack references if not capa.features.extractors.ghidra.helpers.is_stack_referenced(insn): @@ -164,20 
+156,17 @@ def extract_insn_offset_features( op_objs = insn.getOpObjects(i) if isinstance(op_objs[-1], ghidra.program.model.scalar.Scalar): op_off = op_objs[-1].getValue() - yield Offset(op_off), AbsoluteVirtualAddress(insn.getAddress().getOffset()) - yield OperandOffset(i, op_off), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield Offset(op_off), ih.address + yield OperandOffset(i, op_off), ih.address -def extract_insn_bytes_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, -) -> Iterator[Tuple[Feature, Address]]: +def extract_insn_bytes_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: """ parse referenced byte sequences example: push offset iid_004118d4_IShellLinkA ; riid """ + insn: ghidra.program.database.code.InstructionDB = ih.inner if capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn): return @@ -195,20 +184,18 @@ def extract_insn_bytes_features( extracted_bytes = capa.features.extractors.ghidra.helpers.get_bytes(ref, MAX_BYTES_FEATURE_SIZE) if extracted_bytes and not capa.features.extractors.helpers.all_zeros(extracted_bytes): # don't extract byte features for obvious strings - yield Bytes(extracted_bytes), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield Bytes(extracted_bytes), ih.address -def extract_insn_string_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, -) -> Iterator[Tuple[Feature, Address]]: +def extract_insn_string_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: """ parse instruction string features example: push offset aAcr ; "ACR > " """ + insn: ghidra.program.database.code.InstructionDB = ih.inner + ref = insn.getAddress() for i in range(insn.getNumOperands()): if 
OperandType.isScalarAsAddress(insn.getOperandType(i)): @@ -217,26 +204,25 @@ def extract_insn_string_features( if ref != insn.getAddress(): ghidra_dat = getDataAt(ref) # type: ignore [name-defined] # noqa: F821 if ghidra_dat and ghidra_dat.hasStringValue(): - yield String(ghidra_dat.getValue()), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield String(ghidra_dat.getValue()), ih.address def extract_insn_mnemonic_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, bb: BBHandle, ih: InsnHandle ) -> Iterator[Tuple[Feature, Address]]: """parse instruction mnemonic features""" - yield Mnemonic(insn.getMnemonicString().lower()), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + insn: ghidra.program.database.code.InstructionDB = ih.inner + + yield Mnemonic(insn.getMnemonicString().lower()), ih.address def extract_insn_obfs_call_plus_5_characteristic_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, bb: BBHandle, ih: InsnHandle ) -> Iterator[Tuple[Feature, Address]]: """ parse call $+5 instruction from the given instruction. 
""" + insn: ghidra.program.database.code.InstructionDB = ih.inner if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn): return @@ -248,46 +234,45 @@ def extract_insn_obfs_call_plus_5_characteristic_features( ref = insn.getAddress(i) if insn.getAddress().add(5) == ref: - yield Characteristic("call $+5"), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield Characteristic("call $+5"), ih.address def extract_insn_segment_access_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, bb: BBHandle, ih: InsnHandle ) -> Iterator[Tuple[Feature, Address]]: """parse instruction fs or gs access""" + insn: ghidra.program.database.code.InstructionDB = ih.inner + insn_str = insn.toString() if "FS:" in insn_str: - yield Characteristic("fs access"), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield Characteristic("fs access"), ih.address if "GS:" in insn_str: - yield Characteristic("gs access"), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield Characteristic("gs access"), ih.address def extract_insn_peb_access_characteristic_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, bb: BBHandle, ih: InsnHandle ) -> Iterator[Tuple[Feature, Address]]: """parse instruction peb access fs:[0x30] on x86, gs:[0x60] on x64 """ + insn: ghidra.program.database.code.InstructionDB = ih.inner + insn_str = insn.toString() if insn_str.startswith(("PUSH", "MOV")): if "FS:[0x30]" in insn_str or "GS:[0x60]" in insn_str: - yield Characteristic("peb access"), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield Characteristic("peb access"), ih.address def extract_insn_cross_section_cflow( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: 
ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, bb: BBHandle, ih: InsnHandle ) -> Iterator[Tuple[Feature, Address]]: """inspect the instruction for a CALL or JMP that crosses section boundaries""" + insn: ghidra.program.database.code.InstructionDB = ih.inner if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn): return @@ -311,13 +296,9 @@ def extract_insn_cross_section_cflow( elif ref_type == addr_data: # we must dereference and check if the addr is a pointer to an api function ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn) - if ref != insn.getAddress(0): - if capa.features.extractors.ghidra.helpers.check_addr_for_api( - ref, mapped_fake_addrs, imports, externs, external_locs - ): - return - else: - # could not dereference + if capa.features.extractors.ghidra.helpers.check_addr_for_api( + ref, mapped_fake_addrs, imports, externs, external_locs + ): return elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC: return # cannot resolve dynamics statically @@ -334,18 +315,19 @@ def extract_insn_cross_section_cflow( this_mem_block = getMemoryBlock(insn.getAddress()) # type: ignore [name-defined] # noqa: F821 ref_block = getMemoryBlock(ref) # type: ignore [name-defined] # noqa: F821 if ref_block != this_mem_block: - yield Characteristic("cross section flow"), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield Characteristic("cross section flow"), ih.address def extract_function_calls_from( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, + bb: BBHandle, + ih: InsnHandle, ) -> Iterator[Tuple[Feature, Address]]: """extract functions calls from features most relevant at the function scope, however, its most efficient to extract at the instruction scope """ + insn: ghidra.program.database.code.InstructionDB = ih.inner if 
insn.getMnemonicString().startswith("CALL"): # This method of "dereferencing" addresses/ pointers @@ -366,9 +348,9 @@ def extract_function_calls_from( def extract_function_indirect_call_characteristic_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, + bb: BBHandle, + ih: InsnHandle, ) -> Iterator[Tuple[Feature, Address]]: """extract indirect function calls (e.g., call eax or call dword ptr [edx+4]) does not include calls like => call ds:dword_ABD4974 @@ -376,9 +358,11 @@ def extract_function_indirect_call_characteristic_features( most relevant at the function or basic block scope; however, its most efficient to extract at the instruction scope """ + insn: ghidra.program.database.code.InstructionDB = ih.inner + if insn.getMnemonicString().startswith("CALL"): if OperandType.isIndirect(insn.getOperandType(0)): - yield Characteristic("indirect call"), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + yield Characteristic("indirect call"), ih.address def check_nzxor_security_cookie_delta( @@ -407,25 +391,28 @@ def check_nzxor_security_cookie_delta( def extract_insn_nzxor_characteristic_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, + bb: BBHandle, + ih: InsnHandle, ) -> Iterator[Tuple[Feature, Address]]: + f: ghidra.program.database.function.FunctionDB = fh.inner + insn: ghidra.program.database.code.InstructionDB = ih.inner + if "XOR" not in insn.getMnemonicString(): return if capa.features.extractors.ghidra.helpers.is_stack_referenced(insn): return if capa.features.extractors.ghidra.helpers.is_zxor(insn): return - if check_nzxor_security_cookie_delta(fh, insn): + if check_nzxor_security_cookie_delta(f, insn): return - yield Characteristic("nzxor"), AbsoluteVirtualAddress(insn.getAddress().getOffset()) + 
yield Characteristic("nzxor"), ih.address def extract_features( - fh: ghidra.program.database.function.FunctionDB, - bb: ghidra.program.model.block.CodeBlock, - insn: ghidra.program.database.code.InstructionDB, + fh: FunctionHandle, + bb: BBHandle, + insn: InsnHandle, ) -> Iterator[Tuple[Feature, Address]]: for insn_handler in INSTRUCTION_HANDLERS: for feature, addr in insn_handler(fh, bb, insn): @@ -451,12 +438,11 @@ INSTRUCTION_HANDLERS = ( def main(): """ """ - listing = currentProgram.getListing() # type: ignore [name-defined] # noqa: F821 features = [] - for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols(): - for bab in SimpleBlockIterator(BasicBlockModel(currentProgram), fhandle.getBody(), monitor): # type: ignore [name-defined] # noqa: F821 - for insnh in listing.getInstructions(bab, True): - features.extend(list(extract_features(fhandle, bab, insnh))) + for fh in capa.features.extractors.ghidra.helpers.get_function_symbols(): + for bb in capa.features.extractors.ghidra.helpers.get_function_blocks(fh): + for insn in capa.features.extractors.ghidra.helpers.get_insn_in_range(bb): + features.extend(list(extract_features(fh, bb, insn))) import pprint diff --git a/capa/ghidra/__init__.py b/capa/ghidra/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/capa/ghidra/helpers.py b/capa/ghidra/helpers.py new file mode 100644 index 00000000..ffe0d373 --- /dev/null +++ b/capa/ghidra/helpers.py @@ -0,0 +1,156 @@ +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and limitations under the License. +import logging +import datetime +import contextlib +from typing import List +from pathlib import Path + +import capa +import capa.version +import capa.features.common +import capa.features.freeze +import capa.render.result_document as rdoc +import capa.features.extractors.ghidra.helpers + +logger = logging.getLogger("capa") + +currentProgram = currentProgram() # type: ignore # noqa: F821 +currentAddress = currentAddress() # type: ignore # noqa: F821 + +# file type as returned by Ghidra +SUPPORTED_FILE_TYPES = ("Executable and Linking Format (ELF)", "Portable Executable (PE)", "Raw Binary") + + +class GHIDRAIO: + """ + An object that acts as a file-like object, + using bytes from the current Ghidra listing. + """ + + def __init__(self): + super().__init__() + self.offset = 0 + + def seek(self, offset, whence=0): + assert whence == 0 + self.offset = offset + + def read(self, size): + try: + # ghidra.program.model.address.Address has no public constructor, + # so we have to use the exposed currentAddress object for its + # member function .getAddress() + ea = currentAddress.getAddress(hex(self.offset)) # type: ignore [name-defined] # noqa: F821 + except RuntimeError: # AddressFormatException to Ghidra + logger.debug("cannot read 0x%x bytes at 0x%x (ea: BADADDR)", size, self.offset) + return b"" + + logger.debug("reading 0x%x bytes at 0x%x (ea: 0x%x)", size, self.offset, ea.getOffset()) + + # returns bytes or b"" + return capa.features.extractors.ghidra.helpers.get_bytes(ea, size) + + def close(self): + return + + +def is_supported_ghidra_version(): + version = float(getGhidraVersion()[:4]) # type: ignore [name-defined] # noqa: F821 + if version < 10.2: + warning_msg = "capa does not support this Ghidra version" + logger.warning(warning_msg) + logger.warning("Your Ghidra version is: %s. 
Supported versions are: Ghidra >= 10.2", version) + return False + return True + + +def is_running_headless(): + return isRunningHeadless() # type: ignore [name-defined] # noqa: F821 + + +def is_supported_file_type(): + file_info = currentProgram.getExecutableFormat() # type: ignore [name-defined] # noqa: F821 + if file_info.filetype not in SUPPORTED_FILE_TYPES: + logger.error("-" * 80) + logger.error(" Input file does not appear to be a supported file type.") + logger.error(" ") + logger.error( + " capa currently only supports analyzing PE, ELF, or binary files containing x86 (32- and 64-bit) shellcode." + ) + logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.") + logger.error("-" * 80) + return False + return True + + +def is_supported_arch_type(): + file_info = currentProgram.getLanguageID() # type: ignore [name-defined] # noqa: F821 + if "x86" not in file_info or not any(arch in file_info for arch in ["32", "64"]): + logger.error("-" * 80) + logger.error(" Input file does not appear to target a supported architecture.") + logger.error(" ") + logger.error(" capa currently only supports analyzing x86 (32- and 64-bit).") + logger.error("-" * 80) + return False + return True + + +def get_file_md5(): + return currentProgram.getExecutableMD5() # type: ignore [name-defined] # noqa: F821 + + +def get_file_sha256(): + return currentProgram.getExecutableSHA256() # type: ignore [name-defined] # noqa: F821 + + +def collect_metadata(rules: List[Path]): + md5 = get_file_md5() + sha256 = get_file_sha256() + + info = currentProgram.getLanguageID().toString() # type: ignore [name-defined] # noqa: F821 + if "x86" in info and "64" in info: + arch = "x86_64" + elif "x86" in info and "32" in info: + arch = "x86" + else: + arch = "unknown arch" + + format_name: str = currentProgram.getExecutableFormat() # type: ignore [name-defined] # noqa: F821 + if "PE" in format_name: + os = "windows" + elif "ELF" in format_name: + with 
def collect_metadata(rules: List[Path]):
    """
    build the result-document metadata for the program currently open in Ghidra.

    args:
      rules: paths of the rule sources used for the analysis;
        recorded as resolved, absolute POSIX paths.

    returns:
      rdoc.Metadata: sample hashes and path plus analysis format, arch, os and
        image base. layout, feature counts and library functions are left
        empty here for the caller to fill in.
    """
    md5 = get_file_md5()
    sha256 = get_file_sha256()

    info = currentProgram.getLanguageID().toString()  # type: ignore [name-defined] # noqa: F821
    if "x86" in info and "64" in info:
        arch = "x86_64"
    elif "x86" in info and "32" in info:
        arch = "x86"
    else:
        arch = "unknown arch"

    format_name: str = currentProgram.getExecutableFormat()  # type: ignore [name-defined] # noqa: F821
    if "PE" in format_name:
        os_name = "windows"
    elif "ELF" in format_name:
        # import explicitly: nothing at the top of this module loads the `elf`
        # submodule, so attribute access on the package would only work if some
        # other module happened to import it first.
        from capa.features.extractors.elf import detect_elf_os

        # GHIDRAIO is defined in this module; reference it directly.
        with contextlib.closing(GHIDRAIO()) as f:
            os_name = detect_elf_os(f)
    else:
        os_name = "unknown os"

    return rdoc.Metadata(
        timestamp=datetime.datetime.now(),
        version=capa.version.__version__,
        argv=(),
        sample=rdoc.Sample(
            md5=md5,
            sha1="",
            sha256=sha256,
            path=currentProgram.getExecutablePath(),  # type: ignore [name-defined] # noqa: F821
        ),
        analysis=rdoc.Analysis(
            # reuse the value fetched above instead of querying Ghidra again
            format=format_name,
            arch=arch,
            os=os_name,
            extractor="ghidra",
            rules=tuple(r.resolve().absolute().as_posix() for r in rules),
            base_address=capa.features.freeze.Address.from_capa(currentProgram.getImageBase().getOffset()),  # type: ignore [name-defined] # noqa: F821
            layout=rdoc.Layout(
                functions=(),
            ),
            feature_counts=rdoc.FeatureCounts(file=0, functions=()),
            library_functions=(),
        ),
    )
capa.features.extractors.ghidra.file - - # import capa.render.default - # import capa.features.extractors.ghidra.extractor - import capa.features.extractors.ghidra.global_ - import capa.features.extractors.ghidra.helpers - import capa.features.extractors.ghidra.function - from capa.features.common import Feature + import capa.ghidra.helpers + import capa.render.default + import capa.features.extractors.ghidra.extractor logging.basicConfig(level=logging.INFO) logging.getLogger().setLevel(logging.INFO) @@ -1359,21 +1359,25 @@ def ghidra_main(): logger.debug(" https://github.com/mandiant/capa-rules") logger.debug("-" * 80) - # rules_path = os.path.join(get_default_root(), "rules") - # logger.debug("rule path: %s", rules_path) - # rules = get_rules([rules_path]) + rules_path = get_default_root() / "rules" + logger.debug("rule path: %s", rules_path) + rules = get_rules([rules_path]) - # temp test for ghidra CI - ghidra_features: List[Tuple[Feature, Address]] = [] - ghidra_features.extend(capa.features.extractors.ghidra.global_.extract_os()) - ghidra_features.extend(capa.features.extractors.ghidra.global_.extract_arch()) - ghidra_features.extend(capa.features.extractors.ghidra.file.extract_features()) - for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols(): - ghidra_features.extend(list(capa.features.extractors.ghidra.function.extract_features(fhandle))) + meta = capa.ghidra.helpers.collect_metadata([rules_path]) - import pprint + capabilities, counts = find_capabilities( + rules, + capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor(), + not capa.ghidra.helpers.is_running_headless(), + ) - pprint.pprint(ghidra_features) # noqa: T203 + meta.analysis.feature_counts = counts["feature_counts"] + meta.analysis.library_functions = counts["library_functions"] + + if has_file_limitation(rules, capabilities, is_standalone=False): + logger.info("capa encountered warnings during analysis") + + print(capa.render.default.render(meta, rules, 
capabilities)) if __name__ == "__main__": diff --git a/rules b/rules index 149cf2d1..7685a232 160000 --- a/rules +++ b/rules @@ -1 +1 @@ -Subproject commit 149cf2d133a0ea08b4eb250388e9f93c67b83cbf +Subproject commit 7685a232d94acbe7e69addb8bd89d752c9fa27a2 diff --git a/tests/data b/tests/data index cf965117..faf741a5 160000 --- a/tests/data +++ b/tests/data @@ -1 +1 @@ -Subproject commit cf965117cbb3d7391bf01ab8dfb049262a3ad4fd +Subproject commit faf741a538224f52d4412468f910d52a70911662