mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
Ghidra: Function Feature Extraction (#1597)
* save progress * implement loop detection * implement recursive call detection * lint repo * fix python/java import errors * simplify recursion detection * streamline loop detection, fix helper function signature
This commit is contained in:
@@ -126,8 +126,6 @@ def extract_file_strings() -> Iterator[Tuple[Feature, Address]]:
|
||||
for block in currentProgram.getMemory().getBlocks():
|
||||
if block.isInitialized():
|
||||
p_bytes = capa.features.extractors.ghidra.helpers.get_block_bytes(block)
|
||||
if len(p_bytes) == 0:
|
||||
break
|
||||
|
||||
for s in capa.features.extractors.strings.extract_ascii_strings(p_bytes):
|
||||
offset = block.getStart().getOffset() + s.offset
|
||||
|
||||
72
capa/features/extractors/ghidra/function.py
Normal file
72
capa/features/extractors/ghidra/function.py
Normal file
@@ -0,0 +1,72 @@
|
||||
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
from typing import Tuple, Iterator
|
||||
|
||||
import ghidra
|
||||
from ghidra.program.model.block import BasicBlockModel, SimpleBlockIterator
|
||||
|
||||
import capa.features.extractors.ghidra.helpers
|
||||
from capa.features.common import Feature, Characteristic
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress
|
||||
from capa.features.extractors import loops
|
||||
from capa.features.extractors.base_extractor import FunctionHandle
|
||||
|
||||
currentProgram: ghidra.program.database.ProgramDB
|
||||
monitor: ghidra.util.task.TaskMonitor
|
||||
|
||||
|
||||
def extract_function_calls_to(fh: ghidra.program.database.function.FunctionDB):
    """yield a "calls to" characteristic for each call reference to a function

    args:
      fh: function whose symbol references are inspected

    yields:
      (Characteristic("calls to"), AbsoluteVirtualAddress) pairs, where the address
      is the offset of the reference's "from" address
    """
    for reference in fh.getSymbol().getReferences():
        # only call-type references count as callers; skip everything else
        if not reference.getReferenceType().isCall():
            continue

        caller_offset = reference.getFromAddress().getOffset()
        yield Characteristic("calls to"), AbsoluteVirtualAddress(caller_offset)
|
||||
|
||||
|
||||
def extract_function_loop(fh: ghidra.program.database.function.FunctionDB):
    """yield a "loop" characteristic if the function's basic-block graph has a loop

    Collects (source offset, destination offset) edges for every basic block in the
    function body and hands them to `loops.has_loop`.

    args:
      fh: function whose body is walked block by block

    yields:
      (Characteristic("loop"), AbsoluteVirtualAddress) at the function entry point,
      at most once
    """
    edges = []

    for block in SimpleBlockIterator(BasicBlockModel(currentProgram), fh.getBody(), monitor):
        dests = block.getDestinations(monitor)
        s_addrs = block.getStartAddresses()

        # the destination iterator cannot be used in a Python for loop (raises TypeError),
        # so it is driven manually with hasNext()/next()
        while dests.hasNext():
            # advance the iterator exactly once per hasNext() check; calling next() inside
            # the start-address loop would skip destinations (and could run past the end
            # of the iterator) whenever a block reports more than one start address
            dest_offset = dests.next().getDestinationAddress().getOffset()
            for addr in s_addrs:
                edges.append((addr.getOffset(), dest_offset))

    if loops.has_loop(edges):
        yield Characteristic("loop"), AbsoluteVirtualAddress(fh.getEntryPoint().getOffset())
|
||||
|
||||
|
||||
def extract_recursive_call(fh: ghidra.program.database.function.FunctionDB):
    """yield a "recursive call" characteristic when a function calls itself

    A called function is treated as the function itself when both entry points
    have the same offset.

    args:
      fh: function whose called functions are inspected

    yields:
      (Characteristic("recursive call"), AbsoluteVirtualAddress) at the function entry point
    """
    own_entry = fh.getEntryPoint().getOffset()

    for callee in fh.getCalledFunctions(monitor):
        if callee.getEntryPoint().getOffset() == own_entry:
            yield Characteristic("recursive call"), AbsoluteVirtualAddress(own_entry)
|
||||
|
||||
|
||||
def extract_features(fh: ghidra.program.database.function.FunctionDB) -> Iterator[Tuple[Feature, Address]]:
    """run every handler in FUNCTION_HANDLERS against a function and yield their results

    args:
      fh: function passed unchanged to each handler

    yields:
      the (feature, address) pairs produced by each handler, in handler order
    """
    for handler in FUNCTION_HANDLERS:
        yield from handler(fh)
|
||||
|
||||
|
||||
# function-scope extractors that extract_features() invokes, in this order, for each function
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call)
|
||||
|
||||
|
||||
def main():
    """extract the features of every function yielded by the helpers module and pretty-print them"""
    import pprint

    collected = []
    for function in capa.features.extractors.ghidra.helpers.get_function_symbols():
        # extend() consumes the generator directly; no intermediate list is needed
        collected.extend(extract_features(function))

    pprint.pprint(collected)


if __name__ == "__main__":
    main()
|
||||
@@ -8,10 +8,20 @@
|
||||
from typing import Any, Dict, Tuple, Iterator, Optional
|
||||
|
||||
import ghidra
|
||||
from ghidra.program.model.symbol import SymbolType
|
||||
|
||||
currentProgram: ghidra.program.database.ProgramDB
|
||||
|
||||
|
||||
def fix_byte(b: int) -> bytes:
    """Convert a signed int coming from Java into a single Python byte

    The value is masked to its low 8 bits, so negative inputs map onto their
    unsigned equivalent (e.g. -1 becomes b"\\xff").

    args:
      b: signed int returned from Java processing

    returns:
      a bytes object of length one
    """
    unsigned = b & 0xFF
    return bytes((unsigned,))
|
||||
|
||||
|
||||
def find_byte_sequence(seq: bytes) -> Iterator[int]:
|
||||
"""yield all ea of a given byte sequence
|
||||
|
||||
@@ -37,7 +47,7 @@ def get_bytes(addr: ghidra.program.model.address.Address, length: int) -> bytes:
|
||||
try:
|
||||
signed_ints = getBytes(addr, length) # type: ignore [name-defined]
|
||||
for b in signed_ints:
|
||||
bytez = bytez + (b & 0xFF).to_bytes(1, "little")
|
||||
bytez = bytez + fix_byte(b)
|
||||
return bytez
|
||||
except RuntimeError:
|
||||
return bytez
|
||||
@@ -54,7 +64,14 @@ def get_block_bytes(block: ghidra.program.model.mem.MemoryBlock) -> bytes:
|
||||
try:
|
||||
signed_ints = getBytes(block.getStart(), block.getEnd().getOffset() - block.getStart().getOffset()) # type: ignore [name-defined]
|
||||
for b in signed_ints:
|
||||
bytez = bytez + (b & 0xFF).to_bytes(1, "little")
|
||||
bytez = bytez + fix_byte(b)
|
||||
return bytez
|
||||
except RuntimeError:
|
||||
return bytez
|
||||
|
||||
|
||||
def get_function_symbols() -> Iterator[ghidra.program.database.function.FunctionDB]:
    """yield all non-external function symbols

    Delegates to the current program's function manager via getFunctionsNoStubs(True)
    and yields each function it returns.
    """
    function_manager = currentProgram.getFunctionManager()
    yield from function_manager.getFunctionsNoStubs(True)
|
||||
|
||||
22
capa/main.py
22
capa/main.py
@@ -1002,7 +1002,7 @@ def handle_common_args(args):
|
||||
# if isinstance(sys.stdout, io.TextIOWrapper):
|
||||
# sys.stdout.reconfigure(...)
|
||||
sys.stdout.reconfigure(encoding="utf-8")
|
||||
colorama.just_fix_windows_console() # type: ignore [attr-defined]
|
||||
colorama.just_fix_windows_console()
|
||||
|
||||
if args.color == "always":
|
||||
colorama.init(strip=False)
|
||||
@@ -1344,6 +1344,8 @@ def ghidra_main():
|
||||
# import capa.render.default
|
||||
# import capa.features.extractors.ghidra.extractor
|
||||
import capa.features.extractors.ghidra.global_
|
||||
import capa.features.extractors.ghidra.helpers
|
||||
import capa.features.extractors.ghidra.function
|
||||
from capa.features.common import Feature
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
@@ -1360,15 +1362,17 @@ def ghidra_main():
|
||||
# logger.debug("rule path: %s", rules_path)
|
||||
# rules = get_rules([rules_path])
|
||||
|
||||
# temp test for OS & ARCH extractions
|
||||
globl_features: List[Tuple[Feature, Address]] = []
|
||||
globl_features.extend(capa.features.extractors.ghidra.global_.extract_os())
|
||||
globl_features.extend(capa.features.extractors.ghidra.global_.extract_arch())
|
||||
print(globl_features)
|
||||
# temp test for ghidra CI
|
||||
ghidra_features: List[Tuple[Feature, Address]] = []
|
||||
ghidra_features.extend(capa.features.extractors.ghidra.global_.extract_os())
|
||||
ghidra_features.extend(capa.features.extractors.ghidra.global_.extract_arch())
|
||||
ghidra_features.extend(capa.features.extractors.ghidra.file.extract_features())
|
||||
for fhandle in capa.features.extractors.ghidra.helpers.get_function_symbols():
|
||||
ghidra_features.extend(list(capa.features.extractors.ghidra.function.extract_features(fhandle)))
|
||||
|
||||
file_features: List[Tuple[Feature, Address]] = []
|
||||
file_features.extend(capa.features.extractors.ghidra.file.extract_features())
|
||||
print(file_features)
|
||||
import pprint
|
||||
|
||||
pprint.pprint(ghidra_features)
|
||||
|
||||
|
||||
def is_runtime_ida():
|
||||
|
||||
Reference in New Issue
Block a user