Add a Feature Extractor for the Drakvuf Sandbox (#2143)

* initial commit

* update changelog

* Update CHANGELOG.md

* Update pyproject.toml

* Apply suggestions from code review: Typos

Co-authored-by: Vasco Schiavo <115561717+VascoSch92@users.noreply.github.com>

* capa/helpers.py: update if/else statement

Co-authored-by: Vasco Schiavo <115561717+VascoSch92@users.noreply.github.com>

* loader.py: replace print() statement with log.info()

* Update capa/features/extractors/drakvuf/models.py

Co-authored-by: Moritz <mr-tz@users.noreply.github.com>

* extractors/drakvuf/call.py: yield arguments right to left

* extractors/drakvuf/file.py: add a TODO comment for extracting more file features

* extractors/drakvuf/global_.py: add arch extraction

* extractors/drakvuf/helpers.py: ignore null pids

* capa/helpers.py: mention msgspec.json explicitely

* capa/helpers.py: generalize empty sandbox reports error logging

* capa/loader.py: log jsonl garbage collection into debug

* features/extractors/drakvuf/models.py: add documentation for SystemCall class

* capa/main.py: fix erroneous imports

* drakvuf extractor: fixed faulty type annotations

* fix black formatting

* fix flake8 issues

* drakvuf file extraction: add link to tracking issue

* drakvuf reports: add the ability to read gzip-compressed report files

* capa/helpers.py: fix mypy issues

* apply review comments

* drakvuf/helpers.py: add more information about null pid

* drakvuf/file.py: remove discovered_dlls file strings extraction

* capa/helpers.py: add comments for the dynamic extensions

* capa/helpers.py: log bad lines

* capa/helpers.py: add gzip support for reading one jsonl line

* drakvuf/helpers.py: add comment for sort_calls()

* tests/fixtures.py: add TODO for unifying CAPE and Drakvuf tests

* drakvuf/models.py: add TODO comment for supporting more drakvuf plugins

* tests/fixtures.py: remove obsolete file strings tests

* Update capa/main.py

Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>

* Update capa/features/extractors/drakvuf/models.py

Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>

* Update capa/features/extractors/drakvuf/models.py

Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>

* Update capa/features/extractors/drakvuf/call.py

Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>

* Update CHANGELOG.md

Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>

* Update capa/features/extractors/drakvuf/helpers.py

Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>

* review comments

* Update capa/features/extractors/drakvuf/extractor.py

Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>

* Update capa/features/extractors/drakvuf/models.py

Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>

* styling

* drakvuf/extractor.py: black linting

* drakvuf/models.py: remove need to empty report checking

* tests: add drakvuf models test

* Update capa/features/extractors/drakvuf/global_.py

Co-authored-by: msm-cert <156842376+msm-cert@users.noreply.github.com>

* Update tests/test_cape_features.py

Co-authored-by: msm-cert <156842376+msm-cert@users.noreply.github.com>

* Update capa/features/extractors/drakvuf/models.py

Co-authored-by: msm-cert <156842376+msm-cert@users.noreply.github.com>

* Apply suggestions from code review: rename Drakvuf to DRAKVUF

Co-authored-by: msm-cert <156842376+msm-cert@users.noreply.github.com>

* drakvuf/call.py: use int(..., 0) instead of str_to_number()

* remove str_to_number

* drakvuf/call.py: yield argument memory address value as well

* Update call.py: remove verbosity in yield statement

* Update call.py: yield missing address as well

* drakvuf/call.py: yield entire argument string only

* update readme.md

* Update README.md: typo

* Update CHANGELOG.md

Co-authored-by: msm-cert <156842376+msm-cert@users.noreply.github.com>

---------

Co-authored-by: Vasco Schiavo <115561717+VascoSch92@users.noreply.github.com>
Co-authored-by: Moritz <mr-tz@users.noreply.github.com>
Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>
Co-authored-by: msm-cert <156842376+msm-cert@users.noreply.github.com>
This commit is contained in:
Yacine
2024-07-24 13:22:21 +01:00
committed by GitHub
parent e2e84f7f50
commit cf3494d427
20 changed files with 843 additions and 94 deletions

View File

@@ -3,6 +3,7 @@
## master (unreleased)
### New Features
- support analyzing DRAKVUF traces #2143 @yelhamer
### Breaking Changes

View File

@@ -126,8 +126,10 @@ function @ 0x4011C0
...
```
Additionally, capa also supports analyzing [CAPE](https://github.com/kevoreilly/CAPEv2) sandbox reports for dynamic capability extraction.
In order to use this, you first submit your sample to CAPE for analysis, and then run capa against the generated report (JSON).
Additionally, capa also supports analyzing sandbox reports for dynamic capability extraction.
In order to use this, you first submit your sample to one of supported sandboxes for analysis, and then run capa against the generated report file.
Currently, capa supports the [CAPE sandbox](https://github.com/kevoreilly/CAPEv2) and the [DRAKVUF sandbox](https://github.com/CERT-Polska/drakvuf-sandbox/). In order to use either, simply run capa against the generated file (JSON for CAPE or LOG for DRAKVUF sandbox) and it will automatically detect the sandbox and extract capabilities from it.
Here's an example of running capa against a packed binary, and then running capa against the CAPE report of that binary:

View File

@@ -461,6 +461,7 @@ FORMAT_AUTO = "auto"
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
FORMAT_DRAKVUF = "drakvuf"
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
STATIC_FORMATS = {
@@ -474,6 +475,7 @@ STATIC_FORMATS = {
}
DYNAMIC_FORMATS = {
FORMAT_CAPE,
FORMAT_DRAKVUF,
FORMAT_FREEZE,
FORMAT_RESULT,
}

View File

@@ -0,0 +1,56 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Tuple, Iterator
from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
from capa.features.extractors.drakvuf.models import Call
logger = logging.getLogger(__name__)
def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
"""
This method extracts the given call's features (such as API name and arguments),
and returns them as API, Number, and String features.
args:
ph: process handle (for defining the extraction scope)
th: thread handle (for defining the extraction scope)
ch: call handle (for defining the extraction scope)
yields:
Feature, address; where Feature is either: API, Number, or String.
"""
call: Call = ch.inner
# list similar to disassembly: arguments right-to-left, call
for arg_value in reversed(call.arguments.values()):
try:
yield Number(int(arg_value, 0)), ch.address
except ValueError:
# DRAKVUF automatically resolves the contents of memory addresses, (e.g. Arg1="0xc6f217efe0:\"ntdll.dll\"").
# For those cases we yield the entire string as it, since yielding the address only would
# likely not provide any matches, and yielding just the memory contentswould probably be misleading,
# but yielding the entire string would be helpful for an analyst looking at the verbose output
yield String(arg_value), ch.address
yield API(call.name), ch.address
def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in CALL_HANDLERS:
for feature, addr in handler(ph, th, ch):
yield feature, addr
CALL_HANDLERS = (extract_call_features,)

View File

@@ -0,0 +1,96 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, List, Tuple, Union, Iterator
import capa.features.extractors.drakvuf.call
import capa.features.extractors.drakvuf.file
import capa.features.extractors.drakvuf.thread
import capa.features.extractors.drakvuf.global_
import capa.features.extractors.drakvuf.process
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
ThreadHandle,
ProcessHandle,
DynamicFeatureExtractor,
)
from capa.features.extractors.drakvuf.models import Call, DrakvufReport
from capa.features.extractors.drakvuf.helpers import index_calls
logger = logging.getLogger(__name__)
class DrakvufExtractor(DynamicFeatureExtractor):
def __init__(self, report: DrakvufReport):
super().__init__(
# DRAKVUF currently does not yield hash information about the sample in its output
hashes=SampleHashes(md5="", sha1="", sha256="")
)
self.report: DrakvufReport = report
# sort the api calls to prevent going through the entire list each time
self.sorted_calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]] = index_calls(report)
# pre-compute these because we'll yield them at *every* scope.
self.global_features = list(capa.features.extractors.drakvuf.global_.extract_features(self.report))
def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
# DRAKVUF currently does not yield information about the PE's address
return NO_ADDRESS
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features
def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.file.extract_features(self.report)
def get_processes(self) -> Iterator[ProcessHandle]:
yield from capa.features.extractors.drakvuf.file.get_processes(self.sorted_calls)
def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.process.extract_features(ph)
def get_process_name(self, ph: ProcessHandle) -> str:
return ph.inner["process_name"]
def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield from capa.features.extractors.drakvuf.process.get_threads(self.sorted_calls, ph)
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
if False:
# force this routine to be a generator,
# but we don't actually have any elements to generate.
yield Characteristic("never"), NO_ADDRESS
return
def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
yield from capa.features.extractors.drakvuf.thread.get_calls(self.sorted_calls, ph, th)
def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> str:
call: Call = ch.inner
call_name = "{}({}){}".format(
call.name,
", ".join(f"{arg_name}={arg_value}" for arg_name, arg_value in call.arguments.items()),
(f" -> {getattr(call, 'return_value', '')}"), # SysCalls don't have a return value, while WinApi calls do
)
return call_name
def extract_call_features(
self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.call.extract_features(ph, th, ch)
@classmethod
def from_report(cls, report: Iterator[Dict]) -> "DrakvufExtractor":
dr = DrakvufReport.from_raw_report(report)
return DrakvufExtractor(report=dr)

View File

@@ -0,0 +1,56 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, List, Tuple, Iterator
from capa.features.file import Import
from capa.features.common import Feature
from capa.features.address import Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress
from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.base_extractor import ProcessHandle
from capa.features.extractors.drakvuf.models import Call, DrakvufReport
logger = logging.getLogger(__name__)
def get_processes(calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]]) -> Iterator[ProcessHandle]:
"""
Get all the created processes for a sample.
"""
for proc_addr, calls_per_thread in calls.items():
sample_call = next(iter(calls_per_thread.values()))[0] # get process name
yield ProcessHandle(proc_addr, inner={"process_name": sample_call.process_name})
def extract_import_names(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
"""
Extract imported function names.
"""
if report.loaded_dlls is None:
return
dlls = report.loaded_dlls
for dll in dlls:
dll_base_name = dll.name.split("\\")[-1]
for function_name, function_address in dll.imports.items():
for name in generate_symbols(dll_base_name, function_name, include_dll=True):
yield Import(name), AbsoluteVirtualAddress(function_address)
def extract_features(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
for handler in FILE_HANDLERS:
for feature, addr in handler(report):
yield feature, addr
FILE_HANDLERS = (
# TODO(yelhamer): extract more file features from other DRAKVUF plugins
# https://github.com/mandiant/capa/issues/2169
extract_import_names,
)

View File

@@ -0,0 +1,44 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Tuple, Iterator
from capa.features.common import OS, FORMAT_PE, ARCH_AMD64, OS_WINDOWS, Arch, Format, Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.drakvuf.models import DrakvufReport
logger = logging.getLogger(__name__)
def extract_format(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only Windows as the guest: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield Format(FORMAT_PE), NO_ADDRESS
def extract_os(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only PE files: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield OS(OS_WINDOWS), NO_ADDRESS
def extract_arch(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only x64 Windows as the guest: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield Arch(ARCH_AMD64), NO_ADDRESS
def extract_features(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
for global_handler in GLOBAL_HANDLER:
for feature, addr in global_handler(report):
yield feature, addr
GLOBAL_HANDLER = (
extract_format,
extract_os,
extract_arch,
)

View File

@@ -0,0 +1,39 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import itertools
from typing import Dict, List
from capa.features.address import ThreadAddress, ProcessAddress
from capa.features.extractors.drakvuf.models import Call, DrakvufReport
def index_calls(report: DrakvufReport) -> Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]]:
# this method organizes calls into processes and threads, and then sorts them based on
# timestamp so that we can address individual calls per index (CallAddress requires call index)
result: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]] = {}
for call in itertools.chain(report.syscalls, report.apicalls):
if call.pid == 0:
# DRAKVUF captures api/native calls from all processes running on the system.
# we ignore the pid 0 since it's a system process and it's unlikely for it to
# be hijacked or so on, in addition to capa addresses not supporting null pids
continue
proc_addr = ProcessAddress(pid=call.pid, ppid=call.ppid)
thread_addr = ThreadAddress(process=proc_addr, tid=call.tid)
if proc_addr not in result:
result[proc_addr] = {}
if thread_addr not in result[proc_addr]:
result[proc_addr][thread_addr] = []
result[proc_addr][thread_addr].append(call)
for proc, threads in result.items():
for thread in threads:
result[proc][thread].sort(key=lambda call: call.timestamp)
return result

View File

@@ -0,0 +1,137 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Iterator
from pydantic import Field, BaseModel, ConfigDict, model_validator
logger = logging.getLogger(__name__)
REQUIRED_SYSCALL_FIELD_NAMES = {
"Plugin",
"TimeStamp",
"PID",
"PPID",
"TID",
"UserName",
"UserId",
"ProcessName",
"Method",
"EventUID",
"Module",
"vCPU",
"CR3",
"Syscall",
"NArgs",
}
class ConciseModel(BaseModel):
ConfigDict(extra="ignore")
class DiscoveredDLL(ConciseModel):
plugin_name: str = Field(alias="Plugin")
event: str = Field(alias="Event")
name: str = Field(alias="DllName")
pid: int = Field(alias="PID")
class LoadedDLL(ConciseModel):
plugin_name: str = Field(alias="Plugin")
event: str = Field(alias="Event")
name: str = Field(alias="DllName")
imports: Dict[str, int] = Field(alias="Rva")
class Call(ConciseModel):
plugin_name: str = Field(alias="Plugin")
timestamp: str = Field(alias="TimeStamp")
process_name: str = Field(alias="ProcessName")
ppid: int = Field(alias="PPID")
pid: int = Field(alias="PID")
tid: int = Field(alias="TID")
name: str = Field(alias="Method")
arguments: Dict[str, str]
class WinApiCall(Call):
# This class models Windows API calls captured by DRAKVUF (DLLs, etc.).
arguments: Dict[str, str] = Field(alias="Arguments")
event: str = Field(alias="Event")
return_value: str = Field(alias="ReturnValue")
@model_validator(mode="before")
@classmethod
def build_arguments(cls, values: Dict[str, Any]) -> Dict[str, Any]:
args = values["Arguments"]
values["Arguments"] = dict(arg.split("=", 1) for arg in args)
return values
class SystemCall(Call):
# This class models native Windows API calls captured by DRAKVUF.
# Schema: {
# "Plugin": "syscall",
# "TimeStamp": "1716999134.582553",
# "PID": 3888, "PPID": 2852, "TID": 368, "UserName": "SessionID", "UserId": 2,
# "ProcessName": "\\Device\\HarddiskVolume2\\Windows\\explorer.exe",
# "Method": "NtSetIoCompletionEx",
# "EventUID": "0x27",
# "Module": "nt",
# "vCPU": 0,
# "CR3": "0x119b1002",
# "Syscall": 419,
# "NArgs": 6,
# "IoCompletionHandle": "0xffffffff80001ac0", "IoCompletionReserveHandle": "0xffffffff8000188c",
# "KeyContext": "0x0", "ApcContext": "0x2", "IoStatus": "0x7ffb00000000", "IoStatusInformation": "0x0"
# }
# The keys up until "NArgs" are common to all the native calls that DRAKVUF reports, with
# the remaining keys representing the call's specific arguments.
syscall_number: int = Field(alias="Syscall")
module: str = Field(alias="Module")
nargs: int = Field(alias="NArgs")
@model_validator(mode="before")
@classmethod
def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
# DRAKVUF stores argument names and values as entries in the syscall's entry.
# This model validator collects those arguments into a list in the model.
values["arguments"] = {
name: value for name, value in values.items() if name not in REQUIRED_SYSCALL_FIELD_NAMES
}
return values
class DrakvufReport(ConciseModel):
syscalls: List[SystemCall] = []
apicalls: List[WinApiCall] = []
discovered_dlls: List[DiscoveredDLL] = []
loaded_dlls: List[LoadedDLL] = []
@classmethod
def from_raw_report(cls, entries: Iterator[Dict]) -> "DrakvufReport":
report = cls()
for entry in entries:
plugin = entry.get("Plugin")
# TODO(yelhamer): add support for more DRAKVUF plugins
# https://github.com/mandiant/capa/issues/2181
if plugin == "syscall":
report.syscalls.append(SystemCall(**entry))
elif plugin == "apimon":
event = entry.get("Event")
if event == "api_called":
report.apicalls.append(WinApiCall(**entry))
elif event == "dll_loaded":
report.loaded_dlls.append(LoadedDLL(**entry))
elif event == "dll_discovered":
report.discovered_dlls.append(DiscoveredDLL(**entry))
return report

View File

@@ -0,0 +1,40 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, List, Tuple, Iterator
from capa.features.common import String, Feature
from capa.features.address import Address, ThreadAddress, ProcessAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
from capa.features.extractors.drakvuf.models import Call
logger = logging.getLogger(__name__)
def get_threads(
calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]], ph: ProcessHandle
) -> Iterator[ThreadHandle]:
"""
Get the threads associated with a given process.
"""
for thread_addr in calls[ph.address]:
yield ThreadHandle(address=thread_addr, inner={})
def extract_process_name(ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
yield String(ph.inner["process_name"]), ph.address
def extract_features(ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in PROCESS_HANDLERS:
for feature, addr in handler(ph):
yield feature, addr
PROCESS_HANDLERS = (extract_process_name,)

View File

@@ -0,0 +1,24 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, List, Iterator
from capa.features.address import ThreadAddress, ProcessAddress, DynamicCallAddress
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
from capa.features.extractors.drakvuf.models import Call
logger = logging.getLogger(__name__)
def get_calls(
sorted_calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]], ph: ProcessHandle, th: ThreadHandle
) -> Iterator[CallHandle]:
for i, call in enumerate(sorted_calls[ph.address][th.address]):
call_addr = DynamicCallAddress(thread=th.address, id=i)
yield CallHandle(address=call_addr, inner=call)

View File

@@ -7,15 +7,15 @@
# See the License for the specific language governing permissions and limitations under the License.
import sys
import gzip
import json
import inspect
import logging
import contextlib
import importlib.util
from typing import NoReturn
from typing import Dict, Union, BinaryIO, Iterator, NoReturn
from pathlib import Path
import tqdm
import msgspec.json
from capa.exceptions import UnsupportedFormatError
from capa.features.common import (
@@ -25,13 +25,16 @@ from capa.features.common import (
FORMAT_SC64,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_DRAKVUF,
FORMAT_UNKNOWN,
Format,
)
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz")
# CAPE extensions: .json, .json_, .json.gz
# DRAKVUF Sandbox extensions: .log, .log.gz
EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz", "log", ".log.gz")
EXTENSIONS_ELF = "elf_"
EXTENSIONS_FREEZE = "frz"
@@ -76,13 +79,52 @@ def load_json_from_path(json_path: Path):
try:
report_json = compressed_report.read()
except gzip.BadGzipFile:
report = json.load(json_path.open(encoding="utf-8"))
report = msgspec.json.decode(json_path.read_text(encoding="utf-8"))
else:
report = json.loads(report_json)
report = msgspec.json.decode(report_json)
return report
def decode_json_lines(fd: Union[BinaryIO, gzip.GzipFile]):
for line in fd:
try:
line_s = line.strip().decode()
obj = msgspec.json.decode(line_s)
yield obj
except (msgspec.DecodeError, UnicodeDecodeError):
# sometimes DRAKVUF reports bad method names and/or malformed JSON
logger.debug("bad DRAKVUF log line: %s", line)
def load_jsonl_from_path(jsonl_path: Path) -> Iterator[Dict]:
try:
with gzip.open(jsonl_path, "rb") as fg:
yield from decode_json_lines(fg)
except gzip.BadGzipFile:
with jsonl_path.open(mode="rb") as f:
yield from decode_json_lines(f)
def load_one_jsonl_from_path(jsonl_path: Path):
# this loads one json line to avoid the overhead of loading the entire file
try:
with gzip.open(jsonl_path, "rb") as f:
line = next(iter(f))
except gzip.BadGzipFile:
with jsonl_path.open(mode="rb") as f:
line = next(iter(f))
finally:
line = msgspec.json.decode(line.decode(errors="ignore"))
return line
def get_format_from_report(sample: Path) -> str:
if sample.name.endswith((".log", "log.gz")):
line = load_one_jsonl_from_path(sample)
if "Plugin" in line:
return FORMAT_DRAKVUF
return FORMAT_UNKNOWN
report = load_json_from_path(sample)
if "CAPE" in report:
return FORMAT_CAPE
@@ -189,9 +231,20 @@ def log_unsupported_cape_report_error(error: str):
logger.error("-" * 80)
def log_empty_cape_report_error(error: str):
def log_unsupported_drakvuf_report_error(error: str):
logger.error("-" * 80)
logger.error(" CAPE report is empty or only contains little useful data: %s", error)
logger.error(" Input file is not a valid DRAKVUF output file: %s", error)
logger.error(" ")
logger.error(" capa currently only supports analyzing standard DRAKVUF outputs in JSONL format.")
logger.error(
" Please make sure your report file is in the standard format and contains both the static and dynamic sections."
)
logger.error("-" * 80)
def log_empty_sandbox_report_error(error: str, sandbox_name: str):
logger.error("-" * 80)
logger.error(" %s report is empty or only contains little useful data: %s", sandbox_name, error)
logger.error(" ")
logger.error(" Please make sure the sandbox run captures useful behaviour of your sample.")
logger.error("-" * 80)

View File

@@ -45,6 +45,7 @@ from capa.features.common import (
FORMAT_SC32,
FORMAT_SC64,
FORMAT_DOTNET,
FORMAT_DRAKVUF,
)
from capa.features.address import Address
from capa.features.extractors.base_extractor import (
@@ -61,6 +62,7 @@ BACKEND_DOTNET = "dotnet"
BACKEND_BINJA = "binja"
BACKEND_PEFILE = "pefile"
BACKEND_CAPE = "cape"
BACKEND_DRAKVUF = "drakvuf"
BACKEND_FREEZE = "freeze"
@@ -210,6 +212,12 @@ def get_extractor(
report = capa.helpers.load_json_from_path(input_path)
return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)
elif backend == BACKEND_DRAKVUF:
import capa.features.extractors.drakvuf.extractor
report = capa.helpers.load_jsonl_from_path(input_path)
return capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report)
elif backend == BACKEND_DOTNET:
import capa.features.extractors.dnfile.extractor
@@ -327,6 +335,13 @@ def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtr
report = capa.helpers.load_json_from_path(input_file)
file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report))
elif input_format == FORMAT_DRAKVUF:
import capa.helpers
import capa.features.extractors.drakvuf.extractor
report = capa.helpers.load_jsonl_from_path(input_file)
file_extractors.append(capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report))
return file_extractors

View File

@@ -42,15 +42,24 @@ import capa.render.result_document as rdoc
import capa.features.extractors.common
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.loader import BACKEND_VIV, BACKEND_CAPE, BACKEND_BINJA, BACKEND_DOTNET, BACKEND_FREEZE, BACKEND_PEFILE
from capa.loader import (
BACKEND_VIV,
BACKEND_CAPE,
BACKEND_BINJA,
BACKEND_DOTNET,
BACKEND_FREEZE,
BACKEND_PEFILE,
BACKEND_DRAKVUF,
)
from capa.helpers import (
get_file_taste,
get_auto_format,
log_unsupported_os_error,
log_unsupported_arch_error,
log_empty_cape_report_error,
log_unsupported_format_error,
log_empty_sandbox_report_error,
log_unsupported_cape_report_error,
log_unsupported_drakvuf_report_error,
)
from capa.exceptions import (
EmptyReportError,
@@ -73,6 +82,7 @@ from capa.features.common import (
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_RESULT,
FORMAT_DRAKVUF,
)
from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
@@ -232,6 +242,7 @@ def install_common_args(parser, wanted=None):
(FORMAT_SC32, "32-bit shellcode"),
(FORMAT_SC64, "64-bit shellcode"),
(FORMAT_CAPE, "CAPE sandbox report"),
(FORMAT_DRAKVUF, "DRAKVUF sandbox report"),
(FORMAT_FREEZE, "features previously frozen by capa"),
]
format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats])
@@ -253,6 +264,7 @@ def install_common_args(parser, wanted=None):
(BACKEND_DOTNET, ".NET"),
(BACKEND_FREEZE, "capa freeze"),
(BACKEND_CAPE, "CAPE"),
(BACKEND_DRAKVUF, "DRAKVUF"),
]
backend_help = ", ".join([f"{f[0]}: {f[1]}" for f in backends])
parser.add_argument(
@@ -505,6 +517,9 @@ def get_backend_from_cli(args, input_format: str) -> str:
if input_format == FORMAT_CAPE:
return BACKEND_CAPE
if input_format == FORMAT_DRAKVUF:
return BACKEND_DRAKVUF
elif input_format == FORMAT_DOTNET:
return BACKEND_DOTNET
@@ -529,7 +544,7 @@ def get_sample_path_from_cli(args, backend: str) -> Optional[Path]:
raises:
ShouldExitError: if the program is invoked incorrectly and should exit.
"""
if backend == BACKEND_CAPE:
if backend in (BACKEND_CAPE, BACKEND_DRAKVUF):
return None
else:
return args.input_file
@@ -632,12 +647,17 @@ def get_file_extractors_from_cli(args, input_format: str) -> List[FeatureExtract
except UnsupportedFormatError as e:
if input_format == FORMAT_CAPE:
log_unsupported_cape_report_error(str(e))
elif input_format == FORMAT_DRAKVUF:
log_unsupported_drakvuf_report_error(str(e))
else:
log_unsupported_format_error()
raise ShouldExitError(E_INVALID_FILE_TYPE) from e
except EmptyReportError as e:
if input_format == FORMAT_CAPE:
log_empty_cape_report_error(str(e))
log_empty_sandbox_report_error(str(e), sandbox_name="CAPE")
raise ShouldExitError(E_EMPTY_REPORT) from e
elif input_format == FORMAT_DRAKVUF:
log_empty_sandbox_report_error(str(e), sandbox_name="DRAKVUF")
raise ShouldExitError(E_EMPTY_REPORT) from e
else:
log_unsupported_format_error()
@@ -744,6 +764,8 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
except UnsupportedFormatError as e:
if input_format == FORMAT_CAPE:
log_unsupported_cape_report_error(str(e))
elif input_format == FORMAT_DRAKVUF:
log_unsupported_drakvuf_report_error(str(e))
else:
log_unsupported_format_error()
raise ShouldExitError(E_INVALID_FILE_TYPE) from e

View File

@@ -1918,7 +1918,6 @@ class RuleSet:
# This strategy is described here:
# https://github.com/mandiant/capa/issues/2129
if feature_index.string_rules:
# This is a FeatureSet that contains only String features.
# Since we'll only be evaluating String/Regex features below, we don't care about
# other sorts of features (Mnemonic, Number, etc.) and therefore can save some time

View File

@@ -79,6 +79,7 @@ dependencies = [
"rich>=13",
"humanize>=4",
"protobuf>=5",
"msgspec>=0.18.6",
# ---------------------------------------
# Dependencies that we develop

View File

@@ -199,6 +199,16 @@ def get_cape_extractor(path):
return CapeExtractor.from_report(report)
@lru_cache(maxsize=1)
def get_drakvuf_extractor(path):
from capa.helpers import load_jsonl_from_path
from capa.features.extractors.drakvuf.extractor import DrakvufExtractor
report = load_jsonl_from_path(path)
return DrakvufExtractor.from_report(report)
@lru_cache(maxsize=1)
def get_ghidra_extractor(path: Path):
import capa.features.extractors.ghidra.extractor
@@ -385,6 +395,14 @@ def get_data_path_by_name(name) -> Path:
/ "v2.2"
/ "d46900384c78863420fb3e297d0a2f743cd2b6b3f7f82bf64059a168e07aceb7.json.gz"
)
elif name.startswith("93b2d1"):
return (
CD
/ "data"
/ "dynamic"
/ "drakvuf"
/ "93b2d1840566f45fab674ebc79a9d19c88993bcb645e0357f3cb584d16e7c795.log.gz"
)
elif name.startswith("ea2876"):
return CD / "data" / "ea2876e9175410b6f6719f80ee44b9553960758c7d0f7bed73c0fe9a78d8e669.dll_"
elif name.startswith("1038a2"):
@@ -680,84 +698,6 @@ def parametrize(params, values, **kwargs):
return pytest.mark.parametrize(params, values, ids=ids, **kwargs)
DYNAMIC_FEATURE_PRESENCE_TESTS = sorted(
[
# file/string
("0000a657", "file", capa.features.common.String("T_Ba?.BcRJa"), True),
("0000a657", "file", capa.features.common.String("GetNamedPipeClientSessionId"), True),
("0000a657", "file", capa.features.common.String("nope"), False),
# file/sections
("0000a657", "file", capa.features.file.Section(".rdata"), True),
("0000a657", "file", capa.features.file.Section(".nope"), False),
# file/imports
("0000a657", "file", capa.features.file.Import("NdrSimpleTypeUnmarshall"), True),
("0000a657", "file", capa.features.file.Import("Nope"), False),
# file/exports
("0000a657", "file", capa.features.file.Export("Nope"), False),
# process/environment variables
(
"0000a657",
"process=(1180:3052)",
capa.features.common.String("C:\\Users\\comp\\AppData\\Roaming\\Microsoft\\Jxoqwnx\\jxoqwn.exe"),
True,
),
("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), False),
# thread/api calls
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), False),
# thread/number call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(110173), False),
# thread/string call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("SetThreadUILanguage"), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("nope"), False),
("0000a657", "process=(2852:3052),thread=2804,call=56", capa.features.insn.API("NtQueryValueKey"), True),
("0000a657", "process=(2852:3052),thread=2804,call=1958", capa.features.insn.API("nope"), False),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
DYNAMIC_FEATURE_COUNT_TESTS = sorted(
[
# file/string
("0000a657", "file", capa.features.common.String("T_Ba?.BcRJa"), 1),
("0000a657", "file", capa.features.common.String("GetNamedPipeClientSessionId"), 1),
("0000a657", "file", capa.features.common.String("nope"), 0),
# file/sections
("0000a657", "file", capa.features.file.Section(".rdata"), 1),
("0000a657", "file", capa.features.file.Section(".nope"), 0),
# file/imports
("0000a657", "file", capa.features.file.Import("NdrSimpleTypeUnmarshall"), 1),
("0000a657", "file", capa.features.file.Import("Nope"), 0),
# file/exports
("0000a657", "file", capa.features.file.Export("Nope"), 0),
# process/environment variables
(
"0000a657",
"process=(1180:3052)",
capa.features.common.String("C:\\Users\\comp\\AppData\\Roaming\\Microsoft\\Jxoqwnx\\jxoqwn.exe"),
2,
),
("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), 0),
# thread/api calls
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), 7),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), 0),
# thread/number call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), 1),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(110173), 0),
# thread/string call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("SetThreadUILanguage"), 1),
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("nope"), 0),
("0000a657", "process=(2852:3052),thread=2804,call=56", capa.features.insn.API("NtQueryValueKey"), 1),
("0000a657", "process=(2852:3052),thread=2804,call=1958", capa.features.insn.API("nope"), 0),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
FEATURE_PRESENCE_TESTS = sorted(
[
# file/characteristic("embedded pe")

View File

@@ -8,10 +8,96 @@
import fixtures
import capa.main
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
DYNAMIC_CAPE_FEATURE_PRESENCE_TESTS = sorted(
[
# file/string
("0000a657", "file", capa.features.common.String("T_Ba?.BcRJa"), True),
("0000a657", "file", capa.features.common.String("GetNamedPipeClientSessionId"), True),
("0000a657", "file", capa.features.common.String("nope"), False),
# file/sections
("0000a657", "file", capa.features.file.Section(".rdata"), True),
("0000a657", "file", capa.features.file.Section(".nope"), False),
# file/imports
("0000a657", "file", capa.features.file.Import("NdrSimpleTypeUnmarshall"), True),
("0000a657", "file", capa.features.file.Import("Nope"), False),
# file/exports
("0000a657", "file", capa.features.file.Export("Nope"), False),
# process/environment variables
(
"0000a657",
"process=(1180:3052)",
capa.features.common.String("C:\\Users\\comp\\AppData\\Roaming\\Microsoft\\Jxoqwnx\\jxoqwn.exe"),
True,
),
("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), False),
# thread/api calls
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), False),
# thread/number call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(110173), False),
# thread/string call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("SetThreadUILanguage"), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("nope"), False),
("0000a657", "process=(2852:3052),thread=2804,call=56", capa.features.insn.API("NtQueryValueKey"), True),
("0000a657", "process=(2852:3052),thread=2804,call=1958", capa.features.insn.API("nope"), False),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
DYNAMIC_CAPE_FEATURE_COUNT_TESTS = sorted(
# TODO(yelhamer): use the same sample for testing CAPE and DRAKVUF extractors
# https://github.com/mandiant/capa/issues/2180
[
# file/string
("0000a657", "file", capa.features.common.String("T_Ba?.BcRJa"), 1),
("0000a657", "file", capa.features.common.String("GetNamedPipeClientSessionId"), 1),
("0000a657", "file", capa.features.common.String("nope"), 0),
# file/sections
("0000a657", "file", capa.features.file.Section(".rdata"), 1),
("0000a657", "file", capa.features.file.Section(".nope"), 0),
# file/imports
("0000a657", "file", capa.features.file.Import("NdrSimpleTypeUnmarshall"), 1),
("0000a657", "file", capa.features.file.Import("Nope"), 0),
# file/exports
("0000a657", "file", capa.features.file.Export("Nope"), 0),
# process/environment variables
(
"0000a657",
"process=(1180:3052)",
capa.features.common.String("C:\\Users\\comp\\AppData\\Roaming\\Microsoft\\Jxoqwnx\\jxoqwn.exe"),
2,
),
("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), 0),
# thread/api calls
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), 7),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), 0),
# thread/number call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), 1),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(110173), 0),
# thread/string call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("SetThreadUILanguage"), 1),
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("nope"), 0),
("0000a657", "process=(2852:3052),thread=2804,call=56", capa.features.insn.API("NtQueryValueKey"), 1),
("0000a657", "process=(2852:3052),thread=2804,call=1958", capa.features.insn.API("nope"), 0),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
@fixtures.parametrize(
"sample,scope,feature,expected",
fixtures.DYNAMIC_FEATURE_PRESENCE_TESTS,
DYNAMIC_CAPE_FEATURE_PRESENCE_TESTS,
indirect=["sample", "scope"],
)
def test_cape_features(sample, scope, feature, expected):
@@ -20,7 +106,7 @@ def test_cape_features(sample, scope, feature, expected):
@fixtures.parametrize(
"sample,scope,feature,expected",
fixtures.DYNAMIC_FEATURE_COUNT_TESTS,
DYNAMIC_CAPE_FEATURE_COUNT_TESTS,
indirect=["sample", "scope"],
)
def test_cape_feature_counts(sample, scope, feature, expected):

View File

@@ -0,0 +1,88 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import fixtures
import capa.main
import capa.features.file
import capa.features.insn
import capa.features.common
DYNAMIC_DRAKVUF_FEATURE_PRESENCE_TESTS = sorted(
[
("93b2d1", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False),
# file/imports
("93b2d1", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), True),
# thread/api calls
("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), True),
("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False),
# call/api
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), True),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), False),
# call/string argument
(
"93b2d1",
"process=(3564:4852),thread=6592,call=1",
capa.features.common.String('0x667e2beb40:"api-ms-win-core-fibers-l1-1-1"'),
True,
),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.common.String("non_existant"), False),
# call/number argument
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), True),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), False),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
DYNAMIC_DRAKVUF_FEATURE_COUNT_TESTS = sorted(
[
("93b2d1", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False),
# file/imports
("93b2d1", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), 1),
# thread/api calls
("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), 9),
("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False),
# call/api
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), 1),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), 0),
# call/string argument
(
"93b2d1",
"process=(3564:4852),thread=6592,call=1",
capa.features.common.String('0x667e2beb40:"api-ms-win-core-fibers-l1-1-1"'),
1,
),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.common.String("non_existant"), 0),
# call/number argument
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), 1),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), 0),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
@fixtures.parametrize(
"sample,scope,feature,expected",
DYNAMIC_DRAKVUF_FEATURE_PRESENCE_TESTS,
indirect=["sample", "scope"],
)
def test_drakvuf_features(sample, scope, feature, expected):
fixtures.do_test_feature_presence(fixtures.get_drakvuf_extractor, sample, scope, feature, expected)
@fixtures.parametrize(
"sample,scope,feature,expected",
DYNAMIC_DRAKVUF_FEATURE_COUNT_TESTS,
indirect=["sample", "scope"],
)
def test_drakvuf_feature_counts(sample, scope, feature, expected):
fixtures.do_test_feature_count(fixtures.get_drakvuf_extractor, sample, scope, feature, expected)

View File

@@ -0,0 +1,48 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
from capa.features.extractors.drakvuf.models import SystemCall
def test_syscall_argument_construction():
call_dictionary = json.loads(
r"""
{
"Plugin": "syscall",
"TimeStamp": "1716999134.581449",
"PID": 3888,
"PPID": 2852,
"TID": 368,
"UserName": "SessionID",
"UserId": 2,
"ProcessName": "\\Device\\HarddiskVolume2\\Windows\\explorer.exe",
"Method": "NtRemoveIoCompletionEx",
"EventUID": "0x1f",
"Module": "nt",
"vCPU": 0,
"CR3": "0x119b1002",
"Syscall": 369,
"NArgs": 6,
"IoCompletionHandle": "0xffffffff80001ac0",
"IoCompletionInformation": "0xfffff506a0284898",
"Count": "0x1",
"NumEntriesRemoved": "0xfffff506a02846bc",
"Timeout": "0xfffff506a02846d8",
"Alertable": "0x0"
}
"""
)
call = SystemCall(**call_dictionary)
assert len(call.arguments) == call.nargs
assert call.arguments["IoCompletionHandle"] == "0xffffffff80001ac0"
assert call.arguments["IoCompletionInformation"] == "0xfffff506a0284898"
assert call.arguments["Count"] == "0x1"
assert call.arguments["NumEntriesRemoved"] == "0xfffff506a02846bc"
assert call.arguments["Timeout"] == "0xfffff506a02846d8"
assert call.arguments["Alertable"] == "0x0"