mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 07:40:38 -08:00
Compare commits
2 Commits
3687bb95e9
...
0ba5f9664a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ba5f9664a | ||
|
|
98873c8570 |
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -173,8 +173,8 @@ jobs:
|
||||
matrix:
|
||||
python-version: ["3.10", "3.13"]
|
||||
java-version: ["21"]
|
||||
ghidra-version: ["11.4"]
|
||||
public-version: ["PUBLIC_20250620"] # for ghidra releases
|
||||
ghidra-version: ["12.0"]
|
||||
public-version: ["PUBLIC_20251205"] # for ghidra releases
|
||||
steps:
|
||||
- name: Checkout capa with submodules
|
||||
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import weakref
|
||||
import contextlib
|
||||
from typing import Iterator
|
||||
|
||||
@@ -58,19 +59,17 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
|
||||
self.externs = ghidra_helpers.get_file_externs()
|
||||
self.fakes = ghidra_helpers.map_fake_import_addrs()
|
||||
|
||||
# Register cleanup to run when the extractor is garbage collected or when the program exits.
|
||||
# We use weakref.finalize instead of __del__ to avoid issues with reference cycles and
|
||||
# to ensure deterministic cleanup on interpreter shutdown.
|
||||
if self.ctx_manager or self.tmpdir:
|
||||
weakref.finalize(self, cleanup, self.ctx_manager, self.tmpdir)
|
||||
|
||||
def get_base_address(self):
|
||||
import capa.features.extractors.ghidra.helpers as ghidra_helpers
|
||||
|
||||
return AbsoluteVirtualAddress(ghidra_helpers.get_current_program().getImageBase().getOffset())
|
||||
|
||||
def __del__(self):
|
||||
if hasattr(self, "ctx_manager") and self.ctx_manager:
|
||||
with contextlib.suppress(Exception):
|
||||
self.ctx_manager.__exit__(None, None, None)
|
||||
if hasattr(self, "tmpdir") and self.tmpdir:
|
||||
with contextlib.suppress(Exception):
|
||||
self.tmpdir.cleanup()
|
||||
|
||||
def extract_global_features(self):
|
||||
yield from self.global_features
|
||||
|
||||
@@ -113,3 +112,12 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
|
||||
|
||||
def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle):
|
||||
yield from capa.features.extractors.ghidra.insn.extract_features(fh, bbh, ih)
|
||||
|
||||
|
||||
def cleanup(ctx_manager, tmpdir):
|
||||
if ctx_manager:
|
||||
with contextlib.suppress(Exception):
|
||||
ctx_manager.__exit__(None, None, None)
|
||||
if tmpdir:
|
||||
with contextlib.suppress(Exception):
|
||||
tmpdir.cleanup()
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Ghidra >= 11.4 must be installed and available to PyGhidra (e.g. set `GHIDRA_INSTALL_DIR` environment variable)
|
||||
- Ghidra >= 12.0 must be installed and available to PyGhidra (e.g. set `GHIDRA_INSTALL_DIR` environment variable)
|
||||
|
||||
## Usage
|
||||
|
||||
|
||||
@@ -90,11 +90,11 @@ def is_supported_ghidra_version():
|
||||
try:
|
||||
# version format example: "11.1.2" or "11.4"
|
||||
major, minor = map(int, version.split(".")[:2])
|
||||
if major < 11 or (major == 11 and minor < 4):
|
||||
if major < 12:
|
||||
logger.error("-" * 80)
|
||||
logger.error(" Ghidra version %s is not supported.", version)
|
||||
logger.error(" ")
|
||||
logger.error(" capa requires Ghidra 11.4 or higher.")
|
||||
logger.error(" capa requires Ghidra 12.0 or higher.")
|
||||
logger.error("-" * 80)
|
||||
return False
|
||||
except ValueError:
|
||||
|
||||
@@ -368,21 +368,47 @@ def get_extractor(
|
||||
|
||||
tmpdir = tempfile.TemporaryDirectory()
|
||||
|
||||
# PyGhidra's open_program returns a context manager.
|
||||
# We manually enter it here and pass it to the extractor, which will exit it when done.
|
||||
cm = pyghidra.open_program(str(input_path), project_location=tmpdir.name)
|
||||
flat_api = cm.__enter__()
|
||||
project_cm = pyghidra.open_project(tmpdir.name, "CapaProject", create=True)
|
||||
project = project_cm.__enter__()
|
||||
try:
|
||||
from ghidra.util.task import TaskMonitor
|
||||
|
||||
monitor = TaskMonitor.DUMMY
|
||||
program = flat_api.getCurrentProgram()
|
||||
|
||||
# Import file
|
||||
loader = pyghidra.program_loader().project(project).source(str(input_path)).name(input_path.name)
|
||||
with loader.load() as load_results:
|
||||
load_results.save(monitor)
|
||||
|
||||
# Open program
|
||||
program, consumer = pyghidra.consume_program(project, "/" + input_path.name)
|
||||
|
||||
# Analyze
|
||||
pyghidra.analyze(program, monitor)
|
||||
|
||||
from ghidra.program.flatapi import FlatProgramAPI
|
||||
|
||||
flat_api = FlatProgramAPI(program)
|
||||
|
||||
import capa.features.extractors.ghidra.context as ghidra_context
|
||||
|
||||
ghidra_context.set_context(program, flat_api, monitor)
|
||||
|
||||
# Wrapper to handle cleanup of program (consumer) and project
|
||||
class GhidraContextWrapper:
|
||||
def __init__(self, project_cm, program, consumer):
|
||||
self.project_cm = project_cm
|
||||
self.program = program
|
||||
self.consumer = consumer
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.program.release(self.consumer)
|
||||
self.project_cm.__exit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
cm = GhidraContextWrapper(project_cm, program, consumer)
|
||||
|
||||
except Exception:
|
||||
cm.__exit__(None, None, None)
|
||||
project_cm.__exit__(None, None, None)
|
||||
tmpdir.cleanup()
|
||||
raise
|
||||
|
||||
|
||||
@@ -25,17 +25,3 @@ from fixtures import _692f_dotnetfile_extractor # noqa: F401 [imported but unus
|
||||
from fixtures import _1c444_dotnetfile_extractor # noqa: F401 [imported but unused]
|
||||
from fixtures import _039a6_dotnetfile_extractor # noqa: F401 [imported but unused]
|
||||
from fixtures import _0953c_dotnetfile_extractor # noqa: F401 [imported but unused]
|
||||
|
||||
|
||||
def pytest_sessionfinish(session, exitstatus):
|
||||
try:
|
||||
import pyghidra
|
||||
|
||||
if pyghidra.started():
|
||||
import os
|
||||
|
||||
# If PyGhidra was started, we might need to force exit to prevent hangs
|
||||
# due to non-daemon JVM threads.
|
||||
os._exit(exitstatus)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
import importlib.util
|
||||
|
||||
import pytest
|
||||
@@ -18,7 +19,7 @@ import fixtures
|
||||
|
||||
import capa.features.common
|
||||
|
||||
ghidra_present = importlib.util.find_spec("pyghidra") is not None
|
||||
ghidra_present = importlib.util.find_spec("pyghidra") is not None and "GHIDRA_INSTALL_DIR" in os.environ
|
||||
|
||||
|
||||
@pytest.mark.skipif(ghidra_present is False, reason="PyGhidra not installed")
|
||||
|
||||
Reference in New Issue
Block a user