mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 07:40:38 -08:00
Compare commits
2 Commits
3687bb95e9
...
0ba5f9664a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ba5f9664a | ||
|
|
98873c8570 |
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -173,8 +173,8 @@ jobs:
|
|||||||
matrix:
|
matrix:
|
||||||
python-version: ["3.10", "3.13"]
|
python-version: ["3.10", "3.13"]
|
||||||
java-version: ["21"]
|
java-version: ["21"]
|
||||||
ghidra-version: ["11.4"]
|
ghidra-version: ["12.0"]
|
||||||
public-version: ["PUBLIC_20250620"] # for ghidra releases
|
public-version: ["PUBLIC_20251205"] # for ghidra releases
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout capa with submodules
|
- name: Checkout capa with submodules
|
||||||
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
|
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
|
||||||
|
|||||||
@@ -12,6 +12,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import weakref
|
||||||
import contextlib
|
import contextlib
|
||||||
from typing import Iterator
|
from typing import Iterator
|
||||||
|
|
||||||
@@ -58,19 +59,17 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
|
|||||||
self.externs = ghidra_helpers.get_file_externs()
|
self.externs = ghidra_helpers.get_file_externs()
|
||||||
self.fakes = ghidra_helpers.map_fake_import_addrs()
|
self.fakes = ghidra_helpers.map_fake_import_addrs()
|
||||||
|
|
||||||
|
# Register cleanup to run when the extractor is garbage collected or when the program exits.
|
||||||
|
# We use weakref.finalize instead of __del__ to avoid issues with reference cycles and
|
||||||
|
# to ensure deterministic cleanup on interpreter shutdown.
|
||||||
|
if self.ctx_manager or self.tmpdir:
|
||||||
|
weakref.finalize(self, cleanup, self.ctx_manager, self.tmpdir)
|
||||||
|
|
||||||
def get_base_address(self):
|
def get_base_address(self):
|
||||||
import capa.features.extractors.ghidra.helpers as ghidra_helpers
|
import capa.features.extractors.ghidra.helpers as ghidra_helpers
|
||||||
|
|
||||||
return AbsoluteVirtualAddress(ghidra_helpers.get_current_program().getImageBase().getOffset())
|
return AbsoluteVirtualAddress(ghidra_helpers.get_current_program().getImageBase().getOffset())
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
if hasattr(self, "ctx_manager") and self.ctx_manager:
|
|
||||||
with contextlib.suppress(Exception):
|
|
||||||
self.ctx_manager.__exit__(None, None, None)
|
|
||||||
if hasattr(self, "tmpdir") and self.tmpdir:
|
|
||||||
with contextlib.suppress(Exception):
|
|
||||||
self.tmpdir.cleanup()
|
|
||||||
|
|
||||||
def extract_global_features(self):
|
def extract_global_features(self):
|
||||||
yield from self.global_features
|
yield from self.global_features
|
||||||
|
|
||||||
@@ -113,3 +112,12 @@ class GhidraFeatureExtractor(StaticFeatureExtractor):
|
|||||||
|
|
||||||
def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle):
|
def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle):
|
||||||
yield from capa.features.extractors.ghidra.insn.extract_features(fh, bbh, ih)
|
yield from capa.features.extractors.ghidra.insn.extract_features(fh, bbh, ih)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup(ctx_manager, tmpdir):
|
||||||
|
if ctx_manager:
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
ctx_manager.__exit__(None, None, None)
|
||||||
|
if tmpdir:
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
tmpdir.cleanup()
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
|
|
||||||
## Prerequisites
|
## Prerequisites
|
||||||
|
|
||||||
- Ghidra >= 11.4 must be installed and available to PyGhidra (e.g. set `GHIDRA_INSTALL_DIR` environment variable)
|
- Ghidra >= 12.0 must be installed and available to PyGhidra (e.g. set `GHIDRA_INSTALL_DIR` environment variable)
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
|
|||||||
@@ -90,11 +90,11 @@ def is_supported_ghidra_version():
|
|||||||
try:
|
try:
|
||||||
# version format example: "11.1.2" or "11.4"
|
# version format example: "11.1.2" or "11.4"
|
||||||
major, minor = map(int, version.split(".")[:2])
|
major, minor = map(int, version.split(".")[:2])
|
||||||
if major < 11 or (major == 11 and minor < 4):
|
if major < 12:
|
||||||
logger.error("-" * 80)
|
logger.error("-" * 80)
|
||||||
logger.error(" Ghidra version %s is not supported.", version)
|
logger.error(" Ghidra version %s is not supported.", version)
|
||||||
logger.error(" ")
|
logger.error(" ")
|
||||||
logger.error(" capa requires Ghidra 11.4 or higher.")
|
logger.error(" capa requires Ghidra 12.0 or higher.")
|
||||||
logger.error("-" * 80)
|
logger.error("-" * 80)
|
||||||
return False
|
return False
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
|||||||
@@ -368,21 +368,47 @@ def get_extractor(
|
|||||||
|
|
||||||
tmpdir = tempfile.TemporaryDirectory()
|
tmpdir = tempfile.TemporaryDirectory()
|
||||||
|
|
||||||
# PyGhidra's open_program returns a context manager.
|
project_cm = pyghidra.open_project(tmpdir.name, "CapaProject", create=True)
|
||||||
# We manually enter it here and pass it to the extractor, which will exit it when done.
|
project = project_cm.__enter__()
|
||||||
cm = pyghidra.open_program(str(input_path), project_location=tmpdir.name)
|
|
||||||
flat_api = cm.__enter__()
|
|
||||||
try:
|
try:
|
||||||
from ghidra.util.task import TaskMonitor
|
from ghidra.util.task import TaskMonitor
|
||||||
|
|
||||||
monitor = TaskMonitor.DUMMY
|
monitor = TaskMonitor.DUMMY
|
||||||
program = flat_api.getCurrentProgram()
|
|
||||||
|
# Import file
|
||||||
|
loader = pyghidra.program_loader().project(project).source(str(input_path)).name(input_path.name)
|
||||||
|
with loader.load() as load_results:
|
||||||
|
load_results.save(monitor)
|
||||||
|
|
||||||
|
# Open program
|
||||||
|
program, consumer = pyghidra.consume_program(project, "/" + input_path.name)
|
||||||
|
|
||||||
|
# Analyze
|
||||||
|
pyghidra.analyze(program, monitor)
|
||||||
|
|
||||||
|
from ghidra.program.flatapi import FlatProgramAPI
|
||||||
|
|
||||||
|
flat_api = FlatProgramAPI(program)
|
||||||
|
|
||||||
import capa.features.extractors.ghidra.context as ghidra_context
|
import capa.features.extractors.ghidra.context as ghidra_context
|
||||||
|
|
||||||
ghidra_context.set_context(program, flat_api, monitor)
|
ghidra_context.set_context(program, flat_api, monitor)
|
||||||
|
|
||||||
|
# Wrapper to handle cleanup of program (consumer) and project
|
||||||
|
class GhidraContextWrapper:
|
||||||
|
def __init__(self, project_cm, program, consumer):
|
||||||
|
self.project_cm = project_cm
|
||||||
|
self.program = program
|
||||||
|
self.consumer = consumer
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
self.program.release(self.consumer)
|
||||||
|
self.project_cm.__exit__(exc_type, exc_val, exc_tb)
|
||||||
|
|
||||||
|
cm = GhidraContextWrapper(project_cm, program, consumer)
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
cm.__exit__(None, None, None)
|
project_cm.__exit__(None, None, None)
|
||||||
tmpdir.cleanup()
|
tmpdir.cleanup()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|||||||
@@ -25,17 +25,3 @@ from fixtures import _692f_dotnetfile_extractor # noqa: F401 [imported but unus
|
|||||||
from fixtures import _1c444_dotnetfile_extractor # noqa: F401 [imported but unused]
|
from fixtures import _1c444_dotnetfile_extractor # noqa: F401 [imported but unused]
|
||||||
from fixtures import _039a6_dotnetfile_extractor # noqa: F401 [imported but unused]
|
from fixtures import _039a6_dotnetfile_extractor # noqa: F401 [imported but unused]
|
||||||
from fixtures import _0953c_dotnetfile_extractor # noqa: F401 [imported but unused]
|
from fixtures import _0953c_dotnetfile_extractor # noqa: F401 [imported but unused]
|
||||||
|
|
||||||
|
|
||||||
def pytest_sessionfinish(session, exitstatus):
|
|
||||||
try:
|
|
||||||
import pyghidra
|
|
||||||
|
|
||||||
if pyghidra.started():
|
|
||||||
import os
|
|
||||||
|
|
||||||
# If PyGhidra was started, we might need to force exit to prevent hangs
|
|
||||||
# due to non-daemon JVM threads.
|
|
||||||
os._exit(exitstatus)
|
|
||||||
except ImportError:
|
|
||||||
pass
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@
|
|||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import os
|
||||||
import importlib.util
|
import importlib.util
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -18,7 +19,7 @@ import fixtures
|
|||||||
|
|
||||||
import capa.features.common
|
import capa.features.common
|
||||||
|
|
||||||
ghidra_present = importlib.util.find_spec("pyghidra") is not None
|
ghidra_present = importlib.util.find_spec("pyghidra") is not None and "GHIDRA_INSTALL_DIR" in os.environ
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(ghidra_present is False, reason="PyGhidra not installed")
|
@pytest.mark.skipif(ghidra_present is False, reason="PyGhidra not installed")
|
||||||
|
|||||||
Reference in New Issue
Block a user