Merge pull request #2208 from mandiant/vmray-extractor

dynamic: add extractor for VMRay dynamic sandbox traces
This commit is contained in:
Moritz
2024-08-27 12:11:36 +02:00
committed by GitHub
23 changed files with 1300 additions and 71 deletions

View File

@@ -2,12 +2,15 @@
## master (unreleased)
Unlock powerful malware analysis with capa's new [VMRay sandbox](https://www.vmray.com/) integration! Simply provide a VMRay analysis archive, and capa will automatically extract and match capabilities, streamlining your workflow.
### New Features
- regenerate ruleset cache automatically on source change (only in dev mode) #2133 @s-ff
- add landing page https://mandiant.github.io/capa/ @williballenthin #2310
- add rules website https://mandiant.github.io/capa/rules @DeeyaSingh #2310
- add .justfile @williballenthin #2325
- dynamic: add support for VMRay dynamic sandbox traces #2208 @mike-hunhoff @r-sm2024 @mr-tz
### Breaking Changes

View File

@@ -150,13 +150,15 @@ function @ 0x4011C0
...
```
## analyzing sandbox reports
Additionally, capa also supports analyzing sandbox reports for dynamic capability extraction.
In order to use this, you first submit your sample to one of the supported sandboxes for analysis, and then run capa against the generated report file.
capa also supports dynamic capabilities detection for multiple sandboxes including:
* [CAPE](https://github.com/kevoreilly/CAPEv2) (supported report formats: `.json`, `.json_`, `.json.gz`)
* [DRAKVUF](https://github.com/CERT-Polska/drakvuf-sandbox/) (supported report formats: `.log`, `.log.gz`)
* [VMRay](https://www.vmray.com/) (supported report formats: analysis archive `.zip`)
Currently, capa supports the [CAPE sandbox](https://github.com/kevoreilly/CAPEv2) and the [DRAKVUF sandbox](https://github.com/CERT-Polska/drakvuf-sandbox/). In order to use either, simply run capa against the generated file (JSON for CAPE or LOG for DRAKVUF sandbox) and it will automatically detect the sandbox and extract capabilities from it.
Here's an example of running capa against a packed binary, and then running capa against the CAPE report of that binary:
To use this feature, submit your file to a supported sandbox and then download and run capa against the generated report file. This feature enables capa to match capabilities against dynamic and static features that the sandbox captured during execution.
Here's an example of running capa against a packed file, and then running capa against the CAPE report generated for the same packed file:
```yaml
$ capa 05be49819139a3fdcdbddbdefd298398779521f3d68daa25275cc77508e42310.exe

View File

@@ -462,6 +462,7 @@ FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
FORMAT_DRAKVUF = "drakvuf"
FORMAT_VMRAY = "vmray"
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
STATIC_FORMATS = {
@@ -476,6 +477,7 @@ STATIC_FORMATS = {
DYNAMIC_FORMATS = {
FORMAT_CAPE,
FORMAT_DRAKVUF,
FORMAT_VMRAY,
FORMAT_FREEZE,
FORMAT_RESULT,
}

View File

@@ -0,0 +1,161 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, List, Tuple, Optional
from pathlib import Path
from zipfile import ZipFile
from collections import defaultdict
from capa.exceptions import UnsupportedFormatError
from capa.features.extractors.vmray.models import File, Flog, SummaryV2, StaticData, FunctionCall, xml_to_dict
logger = logging.getLogger(__name__)
# VMRay analysis archives are zip files protected with this well-known password
DEFAULT_ARCHIVE_PASSWORD = b"infected"
# flog.xml log_version values that this extractor understands
SUPPORTED_FLOG_VERSIONS = ("2",)
class VMRayAnalysis:
    """
    Parses a VMRay dynamic analysis archive (.zip) and exposes the data capa
    needs: the sample's static PE/ELF data, the raw sample bytes, the
    process/thread layout, and the function calls captured during execution.

    Raises:
        UnsupportedFormatError: if the flog version is unsupported, or if the
            archive lacks the sample file, its static data, or a PE/ELF image.
    """

    def __init__(self, zipfile_path: Path):
        self.zipfile = ZipFile(zipfile_path, "r")

        # summary_v2.json is the entry point to the entire VMRay archive and
        # we use its data to find everything else that we need for capa
        self.sv2 = SummaryV2.model_validate_json(
            self.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD)
        )
        self.file_type: str = self.sv2.analysis_metadata.sample_type

        # flog.xml contains all of the call information that VMRay captured during execution
        flog_xml = self.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD)
        flog_dict = xml_to_dict(flog_xml)
        self.flog = Flog.model_validate(flog_dict)

        if self.flog.analysis.log_version not in SUPPORTED_FLOG_VERSIONS:
            raise UnsupportedFormatError(
                "VMRay feature extractor does not support flog version %s" % self.flog.analysis.log_version
            )

        # lookup tables populated by the _compute_* helpers below
        self.exports: Dict[int, str] = {}  # virtual address -> export name
        self.imports: Dict[int, Tuple[str, str]] = {}  # virtual address -> (dll name, API name)
        self.sections: Dict[int, str] = {}  # virtual address -> section name
        self.process_ids: Dict[int, int] = {}  # VMRay monitor ID -> OS PID
        self.process_threads: Dict[int, List[int]] = defaultdict(list)  # OS PID -> thread IDs
        self.process_calls: Dict[int, Dict[int, List[FunctionCall]]] = defaultdict(lambda: defaultdict(list))
        # NOTE(review): only assigned by _compute_base_address when PE static data is
        # present; reading it for a non-PE sample raises AttributeError — confirm intended
        self.base_address: int

        self.sample_file_name: Optional[str] = None
        self.sample_file_analysis: Optional[File] = None
        self.sample_file_static_data: Optional[StaticData] = None

        self._find_sample_file()

        # VMRay analysis archives in various shapes and sizes and file type does not definitively tell us what data
        # we can expect to find in the archive, so to be explicit we check for the various pieces that we need at
        # minimum to run capa analysis
        if self.sample_file_name is None or self.sample_file_analysis is None:
            raise UnsupportedFormatError("VMRay archive does not contain sample file (file_type: %s)" % self.file_type)

        if not self.sample_file_static_data:
            raise UnsupportedFormatError("VMRay archive does not contain static data (file_type: %s)" % self.file_type)

        if not self.sample_file_static_data.pe and not self.sample_file_static_data.elf:
            raise UnsupportedFormatError(
                "VMRay feature extractor only supports PE and ELF at this time (file_type: %s)" % self.file_type
            )

        # VMRay does not store static strings for the sample file so we must use the source file
        # stored in the archive
        sample_sha256: str = self.sample_file_analysis.hash_values.sha256.lower()
        sample_file_path: str = f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}"

        logger.debug("file_type: %s, file_path: %s", self.file_type, sample_file_path)
        self.sample_file_buf: bytes = self.zipfile.read(sample_file_path, pwd=DEFAULT_ARCHIVE_PASSWORD)

        # order matters only for readability; each helper fills an independent table
        self._compute_base_address()
        self._compute_imports()
        self._compute_exports()
        self._compute_sections()
        self._compute_process_ids()
        self._compute_process_threads()
        self._compute_process_calls()

    def _find_sample_file(self):
        """Locate the submitted sample in summary_v2.json and record its file analysis and static data."""
        for file_name, file_analysis in self.sv2.files.items():
            if file_analysis.is_sample:
                # target the sample submitted for analysis
                self.sample_file_name = file_name
                self.sample_file_analysis = file_analysis

                if file_analysis.ref_static_data:
                    # like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data
                    # key for the file's static data
                    self.sample_file_static_data = self.sv2.static_data[file_analysis.ref_static_data.path[1]]
                break

    def _compute_base_address(self):
        """Record the preferred image base from the PE header (PE samples only)."""
        assert self.sample_file_static_data is not None
        if self.sample_file_static_data.pe:
            self.base_address = self.sample_file_static_data.pe.basic_info.image_base

    def _compute_exports(self):
        """Map export virtual addresses to export names (PE samples only)."""
        assert self.sample_file_static_data is not None
        if self.sample_file_static_data.pe:
            for export in self.sample_file_static_data.pe.exports:
                self.exports[export.address] = export.api.name

    def _compute_imports(self):
        """Map import virtual addresses to (dll, API name) pairs (PE samples only)."""
        assert self.sample_file_static_data is not None
        if self.sample_file_static_data.pe:
            for module in self.sample_file_static_data.pe.imports:
                for api in module.apis:
                    self.imports[api.address] = (module.dll, api.api.name)

    def _compute_sections(self):
        """Map section virtual addresses to section names for PE or ELF samples."""
        assert self.sample_file_static_data is not None
        if self.sample_file_static_data.pe:
            for pefile_section in self.sample_file_static_data.pe.sections:
                self.sections[pefile_section.virtual_address] = pefile_section.name
        elif self.sample_file_static_data.elf:
            for elffile_section in self.sample_file_static_data.elf.sections:
                self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name

    def _compute_process_ids(self):
        """Map VMRay monitor IDs to OS PIDs for every recorded process."""
        for process in self.sv2.processes.values():
            # we expect VMRay's monitor IDs to be unique, but OS PIDs may be reused
            assert process.monitor_id not in self.process_ids.keys()
            self.process_ids[process.monitor_id] = process.os_pid

    def _compute_process_threads(self):
        """Map OS PIDs to the thread IDs observed for that process."""
        # logs/flog.xml appears to be the only file that contains thread-related data
        # so we use it here to map processes to threads
        for function_call in self.flog.analysis.function_calls:
            pid: int = self.get_process_os_pid(function_call.process_id)  # flog.xml uses process monitor ID, not OS PID
            tid: int = function_call.thread_id

            assert isinstance(pid, int)
            assert isinstance(tid, int)

            if tid not in self.process_threads[pid]:
                self.process_threads[pid].append(tid)

    def _compute_process_calls(self):
        """Group recorded function calls by OS PID and thread ID, preserving call order."""
        for function_call in self.flog.analysis.function_calls:
            pid: int = self.get_process_os_pid(function_call.process_id)  # flog.xml uses process monitor ID, not OS PID
            tid: int = function_call.thread_id

            assert isinstance(pid, int)
            assert isinstance(tid, int)

            self.process_calls[pid][tid].append(function_call)

    def get_process_os_pid(self, monitor_id: int) -> int:
        """Translate a VMRay monitor ID to the corresponding OS PID."""
        return self.process_ids[monitor_id]

View File

@@ -0,0 +1,53 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Tuple, Iterator
from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.extractors.vmray.models import PARAM_TYPE_INT, PARAM_TYPE_STR, Param, FunctionCall, hexint
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
def get_call_param_features(param: Param, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
    """Yield Number/String features for a single recorded call parameter.

    Pointer parameters carry the interesting value in their "deref" member
    (Param.value would only hold the raw pointer), so deref is preferred
    when present.
    """
    deref = param.deref
    if deref is not None:
        # pointer types store the deref'd value in the special "deref" member;
        # Param.value only holds the pointer itself, so we ignore it here
        if deref.value is None:
            return
        if deref.type_ in PARAM_TYPE_INT:
            yield Number(hexint(deref.value)), ch.address
        elif deref.type_ in PARAM_TYPE_STR:
            yield String(deref.value), ch.address
        else:
            logger.debug("skipping deref param type %s", deref.type_)
    elif param.value is not None:
        if param.type_ in PARAM_TYPE_INT:
            yield Number(hexint(param.value)), ch.address
def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
    """Yield features for one call: each input parameter's value, then the API name."""
    call: FunctionCall = ch.inner

    if call.params_in:
        for call_param in call.params_in.params:
            yield from get_call_param_features(call_param, ch)

    yield API(call.name), ch.address
def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
    """Run every call-scope feature handler against the given call and yield their results."""
    for call_handler in CALL_HANDLERS:
        yield from call_handler(ph, th, ch)


CALL_HANDLERS = (extract_call_features,)

View File

@@ -0,0 +1,122 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import List, Tuple, Iterator
from pathlib import Path
import capa.helpers
import capa.features.extractors.vmray.call
import capa.features.extractors.vmray.file
import capa.features.extractors.vmray.global_
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, DynamicCallAddress, AbsoluteVirtualAddress
from capa.features.extractors.vmray import VMRayAnalysis
from capa.features.extractors.vmray.models import PARAM_TYPE_STR, Process, ParamList, FunctionCall
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
ThreadHandle,
ProcessHandle,
DynamicFeatureExtractor,
)
def get_formatted_params(params: ParamList) -> List[str]:
    """Render each call parameter as a "name: value" string.

    Deref'd string values are wrapped in double quotes; a missing value
    renders as an empty string.
    """
    formatted: List[str] = []
    for param in params:
        if param.deref and param.deref.value is not None:
            if param.deref.type_ in PARAM_TYPE_STR:
                rendered = f'"{param.deref.value}"'
            else:
                rendered = param.deref.value
        else:
            rendered = param.value if param.value is not None else ""
        formatted.append(f"{param.name}: {rendered}")
    return formatted
class VMRayExtractor(DynamicFeatureExtractor):
    """Dynamic feature extractor backed by a parsed VMRay analysis archive (VMRayAnalysis)."""

    def __init__(self, analysis: VMRayAnalysis):
        assert analysis.sample_file_analysis is not None

        super().__init__(
            hashes=SampleHashes(
                md5=analysis.sample_file_analysis.hash_values.md5.lower(),
                sha1=analysis.sample_file_analysis.hash_values.sha1.lower(),
                sha256=analysis.sample_file_analysis.hash_values.sha256.lower(),
            )
        )

        self.analysis = analysis

        # pre-compute these because we'll yield them at *every* scope.
        self.global_features = list(capa.features.extractors.vmray.global_.extract_features(self.analysis))

    def get_base_address(self) -> Address:
        # value according to the PE header, the actual trace may use a different imagebase
        return AbsoluteVirtualAddress(self.analysis.base_address)

    def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
        """Yield file-scope features: imports, exports, sections, referenced artifacts, and strings."""
        yield from capa.features.extractors.vmray.file.extract_features(self.analysis)

    def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
        """Yield the pre-computed global features (format, OS, arch)."""
        yield from self.global_features

    def get_processes(self) -> Iterator[ProcessHandle]:
        """Yield a handle for every process recorded in summary_v2.json."""
        yield from capa.features.extractors.vmray.file.get_processes(self.analysis)

    def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
        # we have not identified process-specific features for VMRay yet
        yield from []

    def get_process_name(self, ph: ProcessHandle) -> str:
        """Return the image name VMRay recorded for the given process."""
        process: Process = ph.inner
        return process.image_name

    def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
        """Yield a handle for every thread observed in the given process (from flog.xml)."""
        for thread in self.analysis.process_threads[ph.address.pid]:
            address: ThreadAddress = ThreadAddress(process=ph.address, tid=thread)
            yield ThreadHandle(address=address, inner={})

    def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
        if False:
            # force this routine to be a generator,
            # but we don't actually have any elements to generate.
            yield Characteristic("never"), NO_ADDRESS
        return

    def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
        """Yield a handle for each call recorded for the given process/thread, in trace order."""
        for function_call in self.analysis.process_calls[ph.address.pid][th.address.tid]:
            addr = DynamicCallAddress(thread=th.address, id=function_call.fncall_id)
            yield CallHandle(address=addr, inner=function_call)

    def extract_call_features(
        self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
    ) -> Iterator[Tuple[Feature, Address]]:
        """Yield call-scope features (API name and parameter values) for the given call."""
        yield from capa.features.extractors.vmray.call.extract_features(ph, th, ch)

    def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> str:
        """Render a human-readable call summary like name(in params) -> out params."""
        call: FunctionCall = ch.inner
        call_formatted: str = call.name

        # format input parameters
        if call.params_in:
            call_formatted += f"({', '.join(get_formatted_params(call.params_in.params))})"
        else:
            call_formatted += "()"

        # format output parameters
        if call.params_out:
            call_formatted += f" -> {', '.join(get_formatted_params(call.params_out.params))}"

        return call_formatted

    @classmethod
    def from_zipfile(cls, zipfile_path: Path) -> "VMRayExtractor":
        """Construct an extractor directly from a VMRay analysis archive path."""
        return cls(VMRayAnalysis(zipfile_path))

View File

@@ -0,0 +1,101 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, Tuple, Iterator
import capa.features.extractors.common
from capa.features.file import Export, Import, Section
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress
from capa.features.extractors.vmray import VMRayAnalysis
from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.vmray.models import Process
from capa.features.extractors.base_extractor import ProcessHandle
logger = logging.getLogger(__name__)
def get_processes(analysis: VMRayAnalysis) -> Iterator[ProcessHandle]:
    """Yield a handle for every process recorded in summary_v2.json.

    VMRay's monitor IDs are mapped to OS PIDs so capa's output is easier
    for users to follow; a process without a recorded parent gets ppid 0.
    """
    processes: Dict[str, Process] = analysis.sv2.processes

    for process in processes.values():
        # we map VMRay's monitor ID to the OS PID to make it easier for users
        # to follow the processes in capa's output
        pid: int = analysis.get_process_os_pid(process.monitor_id)

        if process.ref_parent_process:
            parent = processes[process.ref_parent_process.path[1]]
            ppid: int = analysis.get_process_os_pid(parent.monitor_id)
        else:
            ppid = 0

        yield ProcessHandle(address=ProcessAddress(pid=pid, ppid=ppid), inner=process)
def extract_export_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield an Export feature for each export parsed from the sample's static data."""
    for address, export_name in analysis.exports.items():
        yield Export(export_name), AbsoluteVirtualAddress(address)
def extract_import_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield Import features, including dll-qualified symbol variants, for each parsed import."""
    for address, (dll, api_name) in analysis.imports.items():
        location = AbsoluteVirtualAddress(address)
        for symbol in generate_symbols(dll, api_name, include_dll=True):
            yield Import(symbol), location
def extract_section_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a Section feature for each section parsed from the sample's static data."""
    for address, section_name in analysis.sections.items():
        yield Section(section_name), AbsoluteVirtualAddress(address)
def extract_referenced_filenames(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature for every filename artifact recorded by VMRay."""
    for filename_record in analysis.sv2.filenames.values():
        yield String(filename_record.filename), NO_ADDRESS
def extract_referenced_mutex_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature for every mutex name artifact recorded by VMRay."""
    for mutex_record in analysis.sv2.mutexes.values():
        yield String(mutex_record.name), NO_ADDRESS
def extract_referenced_domain_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature for every domain artifact recorded by VMRay."""
    for domain_record in analysis.sv2.domains.values():
        yield String(domain_record.domain), NO_ADDRESS
def extract_referenced_ip_addresses(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature for every IP address artifact recorded by VMRay."""
    for ip_record in analysis.sv2.ip_addresses.values():
        yield String(ip_record.ip_address), NO_ADDRESS
def extract_referenced_registry_key_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature for every registry key artifact recorded by VMRay."""
    for registry_record in analysis.sv2.registry_records.values():
        yield String(registry_record.reg_key_name), NO_ADDRESS
def extract_file_strings(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield String features extracted from the raw sample file bytes via the shared helper."""
    yield from capa.features.extractors.common.extract_file_strings(analysis.sample_file_buf)
def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Run every file-scope feature handler against the analysis and yield their results."""
    for file_handler in FILE_HANDLERS:
        yield from file_handler(analysis)


FILE_HANDLERS = (
    extract_import_names,
    extract_export_names,
    extract_section_names,
    extract_referenced_filenames,
    extract_referenced_mutex_names,
    extract_referenced_domain_names,
    extract_referenced_ip_addresses,
    extract_referenced_registry_key_names,
    extract_file_strings,
)

View File

@@ -0,0 +1,72 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Tuple, Iterator
from capa.features.common import (
OS,
OS_LINUX,
ARCH_I386,
FORMAT_PE,
ARCH_AMD64,
FORMAT_ELF,
OS_WINDOWS,
Arch,
Format,
Feature,
)
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.vmray import VMRayAnalysis
logger = logging.getLogger(__name__)
def extract_arch(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Derive the architecture feature from VMRay's sample_type string.

    Raises ValueError if the sample_type names neither x86-32 nor x86-64.
    """
    sample_type: str = analysis.file_type

    if "x86-32" in sample_type:
        yield Arch(ARCH_I386), NO_ADDRESS
    elif "x86-64" in sample_type:
        yield Arch(ARCH_AMD64), NO_ADDRESS
    else:
        raise ValueError("unrecognized arch from the VMRay report: %s" % sample_type)
def extract_format(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Derive the file format feature (PE or ELF) from the sample's static data.

    Raises ValueError when neither PE nor ELF static data is present.
    """
    static_data = analysis.sample_file_static_data
    assert static_data is not None

    if static_data.pe:
        yield Format(FORMAT_PE), NO_ADDRESS
    elif static_data.elf:
        yield Format(FORMAT_ELF), NO_ADDRESS
    else:
        raise ValueError("unrecognized file format from the VMRay report: %s" % analysis.file_type)
def extract_os(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Derive the OS feature from VMRay's sample_type string (case-insensitive match).

    Raises ValueError if the sample_type names neither Windows nor Linux.
    """
    file_type: str = analysis.file_type
    lowered = file_type.lower()

    if "windows" in lowered:
        yield OS(OS_WINDOWS), NO_ADDRESS
    elif "linux" in lowered:
        yield OS(OS_LINUX), NO_ADDRESS
    else:
        raise ValueError("unrecognized OS from the VMRay report: %s" % file_type)
def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Run every global-scope feature handler against the analysis and yield their results."""
    for global_handler in GLOBAL_HANDLER:
        yield from global_handler(analysis)


GLOBAL_HANDLER = (
    extract_format,
    extract_os,
    extract_arch,
)

View File

@@ -0,0 +1,334 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Dict, List, Union, Optional
import xmltodict
from pydantic import Field, BaseModel
from typing_extensions import Annotated
from pydantic.functional_validators import BeforeValidator
"""
# possible param types, included for documentation
PARAM_TYPE = (
"signed_8bit",
"unsigned_8bit",
"signed_16bit",
"unsigned_16bit",
"signed_32bit",
"unsigned_32bit",
"signed_64bit",
"unsigned_64bit",
"double",
"void_ptr",
"bool",
"unknown",
"ptr",
"void",
"str",
"array",
"container",
"bindata",
"undefined_type",
)
"""
# param types whose interesting value is stored in Param.deref (pointer types)
PARAM_TYPE_PTR = ("void_ptr", "ptr")
# param types emitted as string features
PARAM_TYPE_STR = ("str",)
# param types whose values are parsed as integers via hexint
# NOTE(review): "double", "bool", and "unknown" are also routed through the
# integer path — confirm their values always parse as int
PARAM_TYPE_INT = (
    "signed_8bit",
    "unsigned_8bit",
    "signed_16bit",
    "unsigned_16bit",
    "signed_32bit",
    "unsigned_32bit",
    "signed_64bit",
    "unsigned_64bit",
    "double",
    "bool",
    "unknown",
)
def xml_to_dict(xml):
    """Parse flog.xml bytes into nested dicts; attr_prefix="" drops the "@" prefix from XML attributes."""
    return xmltodict.parse(xml, attr_prefix="")
def hexint(value: Union[str, int]) -> int:
    """Convert a VMRay numeric field to int.

    Strings prefixed with "0x" are parsed as hexadecimal, all other strings
    as decimal; values that are already ints are returned unchanged.
    """
    if isinstance(value, int):
        return value
    return int(value, 16 if value.startswith("0x") else 10)
def validate_hex_int(value: Union[str, int]) -> int:
    """Pydantic BeforeValidator hook that normalizes hex/decimal strings to int via hexint."""
    return hexint(value)


# convert the input value to a Python int type before inner validation (int) is called
HexInt = Annotated[int, BeforeValidator(validate_hex_int)]
# models flog.xml file, certain fields left as comments for documentation purposes


class ParamDeref(BaseModel):
    # dereferenced pointer parameter: the pointed-to type and its value
    type_: str = Field(alias="type")
    value: Optional[str] = None


class Param(BaseModel):
    # a single function call parameter as recorded in flog.xml
    name: str
    type_: str = Field(alias="type")
    value: Optional[str] = None
    # set for pointer params; holds the deref'd value (see ParamDeref)
    deref: Optional[ParamDeref] = None


def validate_param_list(value: Union[List[Param], Param]) -> List[Param]:
    """Normalize xmltodict output: a single <param> element parses as one Param, not a list."""
    if isinstance(value, list):
        return value
    else:
        return [value]


# params may be stored as a list of Param or a single Param so we convert
# the input value to Python list type before the inner validation (List[Param])
# is called
ParamList = Annotated[List[Param], BeforeValidator(validate_param_list)]


class Params(BaseModel):
    # container for a call's "in" or "out" parameter list
    params: ParamList = Field(alias="param")
def validate_call_name(value: str) -> str:
    """Normalize a recorded call name for rule matching.

    VMRay appears to log Linux kernel calls with a "sys_" prefix; strip it so
    capa rules can match the plain API name.
    """
    prefix = "sys_"
    if not value.startswith(prefix):
        return value
    return value[len(prefix):]
# function call names may need to be reformatted (e.g. stripping the "sys_" prefix)
# so we reformat before calling the inner validation (str)
CallName = Annotated[str, BeforeValidator(validate_call_name)]
class FunctionCall(BaseModel):
    # one <fncall> record from flog.xml
    # ts: HexInt
    fncall_id: HexInt
    process_id: HexInt  # VMRay monitor ID, not the OS PID
    thread_id: HexInt
    name: CallName
    # addr: HexInt
    # from_addr: HexInt = Field(alias="from")
    params_in: Optional[Params] = Field(alias="in", default=None)
    params_out: Optional[Params] = Field(alias="out", default=None)


class FunctionReturn(BaseModel):
    # one <fnret> record from flog.xml; see Analysis.function_returns (commented out)
    ts: HexInt
    fncall_id: HexInt
    addr: HexInt
    from_addr: HexInt = Field(alias="from")


class Analysis(BaseModel):
    # root <analysis> element of flog.xml
    log_version: str  # tested 2
    analyzer_version: str  # tested 2024.2.1
    # analysis_date: str

    function_calls: List[FunctionCall] = Field(alias="fncall", default=[])
    # function_returns: List[FunctionReturn] = Field(alias="fnret", default=[])


class Flog(BaseModel):
    # top-level model for logs/flog.xml
    analysis: Analysis
# models for summary_v2.json file, certain fields left as comments for documentation purposes


class GenericReference(BaseModel):
    # summary_v2 cross-reference; path addresses another object in the document,
    # e.g. ["static_data", "static_data_0"]
    path: List[str]
    source: str


# reference into SummaryV2.static_data
class StaticDataReference(GenericReference): ...


class PEFileBasicInfo(BaseModel):
    # compile_time: str
    # file_type: str
    image_base: int
    # machine_type: str
    # size_of_code: int
    # size_of_initialized_data: int
    # size_of_uninitialized_data: int
    # subsystem: str
    # entry_point: int
    # imphash: Optional[str] = None


class API(BaseModel):
    # an imported or exported API symbol
    name: str
    ordinal: Optional[int] = None


class PEFileExport(BaseModel):
    address: int
    api: API


class PEFileImport(BaseModel):
    address: int
    api: API
    # thunk_offset: int
    # hint: Optional[int] = None
    # thunk_rva: int


class PEFileImportModule(BaseModel):
    # imports grouped by source dll
    dll: str
    apis: List[PEFileImport]


class PEFileSection(BaseModel):
    # entropy: float
    # flags: List[str] = []
    name: str
    # raw_data_offset: int
    # raw_data_size: int
    virtual_address: int
    # virtual_size: int


class PEFile(BaseModel):
    # static PE data parsed by VMRay
    basic_info: PEFileBasicInfo
    exports: List[PEFileExport] = []
    imports: List[PEFileImportModule] = []
    sections: List[PEFileSection] = []
class ElfFileSectionHeader(BaseModel):
    # ELF section header: name and virtual address
    sh_name: str
    sh_addr: int


class ElfFileSection(BaseModel):
    header: ElfFileSectionHeader


"""
class ElfFileHeader(BaseModel):
    file_class: str
    endianness: str
    file_type: str
    architecture: str
    architecture_human_str: str
    entry_point: int
"""


class ElfFile(BaseModel):
    # static ELF data parsed by VMRay
    # file_header: ElfFileHeader
    sections: List[ElfFileSection]


class StaticData(BaseModel):
    # static analysis results for one file; pe/elf is set when VMRay parsed
    # that format (capa requires at least one of them for the sample file)
    pe: Optional[PEFile] = None
    elf: Optional[ElfFile] = None


class FileHashes(BaseModel):
    md5: str
    sha1: str
    sha256: str
    # ssdeep: str
class File(BaseModel):
    # a file artifact recorded by VMRay
    # categories: List[str]
    hash_values: FileHashes
    # is_artifact: bool
    # is_ioc: bool
    is_sample: bool  # True for the file submitted for analysis
    # size: int
    # is_truncated: bool
    # mime_type: Optional[str] = None
    # operations: List[str] = []
    # ref_filenames: List[GenericReference] = []
    # ref_gfncalls: List[GenericReference] = []
    ref_static_data: Optional[StaticDataReference] = None
    # ref_vti_matches: List[GenericReference] = []
    # verdict: str


class Process(BaseModel):
    # a process observed during the analysis
    # bitness: int
    # is_artifact: bool
    # is_ioc: bool
    monitor_id: int  # VMRay's internal process ID; flog.xml records refer to this, not os_pid
    # monitor_reason: str
    os_pid: int
    filename: str
    image_name: str
    ref_parent_process: Optional[GenericReference] = None


class Filename(BaseModel):
    filename: str
    # is_artifact: bool
    # is_ioc: bool
    # verdict: str


class Mutex(BaseModel):
    name: str
    # is_artifact: bool
    # is_ioc: bool
    # verdict: str


class Registry(BaseModel):
    reg_key_name: str
    # reg_key_value_type: Optional[str] = None
    # is_artifact: bool
    # is_ioc: bool
    # verdict: str


class Domain(BaseModel):
    domain: str
    # is_artifact: bool
    # is_ioc: bool
    # verdict: str


class IPAddress(BaseModel):
    ip_address: str
    # is_artifact: bool
    # is_ioc: bool
    # verdict: str


class AnalysisMetadata(BaseModel):
    # sample_type is a file type string; also used to derive OS/arch features
    sample_type: str
    submission_filename: str


class SummaryV2(BaseModel):
    # top-level model for logs/summary_v2.json
    analysis_metadata: AnalysisMetadata

    static_data: Dict[str, StaticData] = {}

    # recorded artifacts
    files: Dict[str, File] = {}
    processes: Dict[str, Process] = {}
    filenames: Dict[str, Filename] = {}
    mutexes: Dict[str, Mutex] = {}
    domains: Dict[str, Domain] = {}
    ip_addresses: Dict[str, IPAddress] = {}
    registry_records: Dict[str, Registry] = {}

View File

@@ -12,8 +12,9 @@ import inspect
import logging
import contextlib
import importlib.util
from typing import Dict, Union, BinaryIO, Iterator, NoReturn
from typing import Dict, List, Union, BinaryIO, Iterator, NoReturn
from pathlib import Path
from zipfile import ZipFile
from datetime import datetime
import tqdm
@@ -25,6 +26,7 @@ from capa.features.common import (
FORMAT_CAPE,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_VMRAY,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_DRAKVUF,
@@ -34,9 +36,10 @@ from capa.features.common import (
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
# CAPE extensions: .json, .json_, .json.gz
# DRAKVUF Sandbox extensions: .log, .log.gz
EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz", "log", ".log.gz")
# CAPE (.json, .json_, .json.gz)
# DRAKVUF (.log, .log.gz)
# VMRay (.zip)
EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz", "log", ".log.gz", ".zip")
EXTENSIONS_ELF = "elf_"
EXTENSIONS_FREEZE = "frz"
@@ -125,16 +128,20 @@ def get_format_from_report(sample: Path) -> str:
line = load_one_jsonl_from_path(sample)
if "Plugin" in line:
return FORMAT_DRAKVUF
return FORMAT_UNKNOWN
report = load_json_from_path(sample)
if "CAPE" in report:
return FORMAT_CAPE
if "target" in report and "info" in report and "behavior" in report:
# CAPE report that's missing the "CAPE" key,
# which is not going to be much use, but its correct.
return FORMAT_CAPE
elif sample.name.endswith(".zip"):
with ZipFile(sample, "r") as zipfile:
namelist: List[str] = zipfile.namelist()
if "logs/summary_v2.json" in namelist and "logs/flog.xml" in namelist:
# assume VMRay zipfile at a minimum has these files
return FORMAT_VMRAY
elif sample.name.endswith(("json", "json_", "json.gz")):
report = load_json_from_path(sample)
if "CAPE" in report:
return FORMAT_CAPE
if "target" in report and "info" in report and "behavior" in report:
# CAPE report that's missing the "CAPE" key,
# which is not going to be much use, but its correct.
return FORMAT_CAPE
return FORMAT_UNKNOWN
@@ -244,6 +251,17 @@ def log_unsupported_drakvuf_report_error(error: str):
logger.error("-" * 80)
def log_unsupported_vmray_report_error(error: str):
    """Log a formatted, user-facing explanation of why the input is not a usable VMRay archive."""
    logger.error("-" * 80)
    logger.error(" Input file is not a valid VMRay analysis archive: %s", error)
    logger.error(" ")
    logger.error(
        " capa only supports analyzing VMRay dynamic analysis archives containing summary_v2.json and flog.xml log files."
    )
    logger.error(" Please make sure you have downloaded a dynamic analysis archive from VMRay.")
    logger.error("-" * 80)
def log_empty_sandbox_report_error(error: str, sandbox_name: str):
logger.error("-" * 80)
logger.error(" %s report is empty or only contains little useful data: %s", sandbox_name, error)

View File

@@ -44,6 +44,7 @@ from capa.features.common import (
FORMAT_CAPE,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_VMRAY,
FORMAT_DOTNET,
FORMAT_DRAKVUF,
)
@@ -63,6 +64,7 @@ BACKEND_BINJA = "binja"
BACKEND_PEFILE = "pefile"
BACKEND_CAPE = "cape"
BACKEND_DRAKVUF = "drakvuf"
BACKEND_VMRAY = "vmray"
BACKEND_FREEZE = "freeze"
@@ -218,6 +220,11 @@ def get_extractor(
report = capa.helpers.load_jsonl_from_path(input_path)
return capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report)
elif backend == BACKEND_VMRAY:
import capa.features.extractors.vmray.extractor
return capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_path)
elif backend == BACKEND_DOTNET:
import capa.features.extractors.dnfile.extractor
@@ -342,6 +349,11 @@ def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtr
report = capa.helpers.load_jsonl_from_path(input_file)
file_extractors.append(capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report))
elif input_format == FORMAT_VMRAY:
import capa.features.extractors.vmray.extractor
file_extractors.append(capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_file))
return file_extractors

View File

@@ -46,6 +46,7 @@ from capa.loader import (
BACKEND_VIV,
BACKEND_CAPE,
BACKEND_BINJA,
BACKEND_VMRAY,
BACKEND_DOTNET,
BACKEND_FREEZE,
BACKEND_PEFILE,
@@ -59,6 +60,7 @@ from capa.helpers import (
log_unsupported_format_error,
log_empty_sandbox_report_error,
log_unsupported_cape_report_error,
log_unsupported_vmray_report_error,
log_unsupported_drakvuf_report_error,
)
from capa.exceptions import (
@@ -80,6 +82,7 @@ from capa.features.common import (
FORMAT_CAPE,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_VMRAY,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_RESULT,
@@ -259,6 +262,7 @@ def install_common_args(parser, wanted=None):
(FORMAT_SC64, "64-bit shellcode"),
(FORMAT_CAPE, "CAPE sandbox report"),
(FORMAT_DRAKVUF, "DRAKVUF sandbox report"),
(FORMAT_VMRAY, "VMRay sandbox report"),
(FORMAT_FREEZE, "features previously frozen by capa"),
]
format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats])
@@ -281,6 +285,7 @@ def install_common_args(parser, wanted=None):
(BACKEND_FREEZE, "capa freeze"),
(BACKEND_CAPE, "CAPE"),
(BACKEND_DRAKVUF, "DRAKVUF"),
(BACKEND_VMRAY, "VMRay"),
]
backend_help = ", ".join([f"{f[0]}: {f[1]}" for f in backends])
parser.add_argument(
@@ -552,6 +557,9 @@ def get_backend_from_cli(args, input_format: str) -> str:
if input_format == FORMAT_DRAKVUF:
return BACKEND_DRAKVUF
elif input_format == FORMAT_VMRAY:
return BACKEND_VMRAY
elif input_format == FORMAT_DOTNET:
return BACKEND_DOTNET
@@ -576,7 +584,7 @@ def get_sample_path_from_cli(args, backend: str) -> Optional[Path]:
raises:
ShouldExitError: if the program is invoked incorrectly and should exit.
"""
if backend in (BACKEND_CAPE, BACKEND_DRAKVUF):
if backend in (BACKEND_CAPE, BACKEND_DRAKVUF, BACKEND_VMRAY):
return None
else:
return args.input_file
@@ -690,6 +698,8 @@ def get_file_extractors_from_cli(args, input_format: str) -> List[FeatureExtract
log_unsupported_cape_report_error(str(e))
elif input_format == FORMAT_DRAKVUF:
log_unsupported_drakvuf_report_error(str(e))
elif input_format == FORMAT_VMRAY:
log_unsupported_vmray_report_error(str(e))
else:
log_unsupported_format_error()
raise ShouldExitError(E_INVALID_FILE_TYPE) from e
@@ -809,6 +819,8 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
log_unsupported_cape_report_error(str(e))
elif input_format == FORMAT_DRAKVUF:
log_unsupported_drakvuf_report_error(str(e))
elif input_format == FORMAT_VMRAY:
log_unsupported_vmray_report_error(str(e))
else:
log_unsupported_format_error()
raise ShouldExitError(E_INVALID_FILE_TYPE) from e

View File

@@ -22,7 +22,7 @@ import capa.features.address
import capa.features.freeze.features as frzf
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.helpers import assert_never
from capa.helpers import assert_never, load_json_from_path
class FrozenModel(BaseModel):
@@ -668,4 +668,5 @@ class ResultDocument(FrozenModel):
@classmethod
def from_file(cls, path: Path) -> "ResultDocument":
return cls.model_validate_json(path.read_text(encoding="utf-8"))
report = load_json_from_path(path)
return cls.model_validate(report)

View File

@@ -80,6 +80,7 @@ dependencies = [
"humanize>=4",
"protobuf>=5",
"msgspec>=0.18.6",
"xmltodict>=0.13.0",
# ---------------------------------------
# Dependencies that we develop

View File

@@ -28,6 +28,7 @@ pyasn1-modules==0.2.8
pycparser==2.22
pydantic==2.7.3
pydantic-core==2.18.4
xmltodict==0.13.0
pyelftools==0.31
pygments==2.18.0
python-flirt==0.8.10

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python
"""
Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
Extract files relevant to capa analysis from VMRay Analysis Archive and create a new ZIP file.
"""
import sys
import logging
import zipfile
import argparse
from pathlib import Path
from capa.features.extractors.vmray import DEFAULT_ARCHIVE_PASSWORD, VMRayAnalysis
logger = logging.getLogger(__name__)
def main(argv=None):
    """Extract the files capa needs (summary_v2.json, flog.xml, and the sample
    buffer) from a VMRay Analysis Archive and repackage them into a minimized
    ``<name>_min.zip`` written next to the input archive.
    """
    if argv is None:
        argv = sys.argv[1:]
    parser = argparse.ArgumentParser(
        description="Minimize VMRay Analysis Archive to ZIP file only containing relevant files"
    )
    parser.add_argument(
        "analysis_archive",
        type=Path,
        help="path to VMRay Analysis Archive downloaded from Dynamic Analysis Report page",
    )
    parser.add_argument(
        "-p", "--password", type=str, default="infected", help="password used to unzip and zip protected archives"
    )
    args = parser.parse_args(args=argv)
    analysis_archive = args.analysis_archive
    vmra = VMRayAnalysis(analysis_archive)
    # NOTE(review): extraction always uses DEFAULT_ARCHIVE_PASSWORD, never args.password,
    # although the --password help text says it is "used to unzip" — confirm whether a
    # user-supplied password should be applied when reading the source archive.
    sv2_json = vmra.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD)
    flog_xml = vmra.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD)
    sample_file_buf = vmra.sample_file_buf
    # the sample's static analysis must be present to recover its SHA-256, which names
    # the file paths inside the minimized archive below
    assert vmra.sample_file_analysis is not None
    sample_sha256: str = vmra.sample_file_analysis.hash_values.sha256.lower()
    new_zip_name = f"{analysis_archive.parent / analysis_archive.stem}_min.zip"
    with zipfile.ZipFile(new_zip_name, "w") as new_zip:
        new_zip.writestr("logs/summary_v2.json", sv2_json)
        new_zip.writestr("logs/flog.xml", flog_xml)
        new_zip.writestr(f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}", sample_file_buf)
        # NOTE(review): ZipFile.setpassword() only affects extraction (reading); Python's
        # zipfile module cannot create encrypted archives, so the minimized ZIP is written
        # unprotected despite the message printed below — confirm whether this is intended.
        new_zip.setpassword(args.password.encode("ascii"))
    # ensure capa loads the minimized archive
    assert isinstance(VMRayAnalysis(Path(new_zip_name)), VMRayAnalysis)
    print(f"Created minimized VMRay archive '{new_zip_name}' with password '{args.password}'.")
if __name__ == "__main__":
sys.exit(main())

View File

@@ -229,40 +229,37 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
for p in processes:
print(f"proc: {extractor.get_process_name(p)} (ppid={p.address.ppid}, pid={p.address.pid})")
for feature, addr in extractor.extract_process_features(p):
for feature, _ in extractor.extract_process_features(p):
if is_global_feature(feature):
continue
print(f" proc: {extractor.get_process_name(p)}: {feature}")
for t in extractor.get_threads(p):
print(f" thread: {t.address.tid}")
for feature, addr in extractor.extract_thread_features(p, t):
for t in extractor.get_threads(p):
print(f" thread: {t.address.tid}")
for feature, addr in extractor.extract_thread_features(p, t):
if is_global_feature(feature):
continue
if feature != Feature(0):
print(f" {format_address(addr)}: {feature}")
for call in extractor.get_calls(p, t):
apis = []
arguments = []
for feature, addr in extractor.extract_call_features(p, t, call):
if is_global_feature(feature):
continue
if feature != Feature(0):
print(f" {format_address(addr)}: {feature}")
if isinstance(feature, API):
assert isinstance(addr, capa.features.address.DynamicCallAddress)
apis.append((addr.id, str(feature.value)))
for call in extractor.get_calls(p, t):
apis = []
arguments = []
for feature, addr in extractor.extract_call_features(p, t, call):
if is_global_feature(feature):
continue
if isinstance(feature, (Number, String)):
arguments.append(str(feature.value))
if isinstance(feature, API):
assert isinstance(addr, capa.features.address.DynamicCallAddress)
apis.append((addr.id, str(feature.value)))
if isinstance(feature, (Number, String)):
arguments.append(str(feature.value))
if not apis:
print(f" arguments=[{', '.join(arguments)}]")
for cid, api in apis:
print(f" call {cid}: {api}({', '.join(arguments)})")
for cid, api in apis:
print(f" call {cid}: {api}({', '.join(arguments)})")
def ida_main():

View File

@@ -209,6 +209,13 @@ def get_drakvuf_extractor(path):
return DrakvufExtractor.from_report(report)
@lru_cache(maxsize=1)
def get_vmray_extractor(path):
    # import lazily, matching the sibling fixtures, so VMRay support is only
    # loaded when a VMRay-based test actually runs
    import capa.features.extractors.vmray.extractor

    return capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(path)
@lru_cache(maxsize=1)
def get_ghidra_extractor(path: Path):
import capa.features.extractors.ghidra.extractor
@@ -395,7 +402,7 @@ def get_data_path_by_name(name) -> Path:
/ "v2.2"
/ "d46900384c78863420fb3e297d0a2f743cd2b6b3f7f82bf64059a168e07aceb7.json.gz"
)
elif name.startswith("93b2d1"):
elif name.startswith("93b2d1-drakvuf"):
return (
CD
/ "data"
@@ -403,6 +410,14 @@ def get_data_path_by_name(name) -> Path:
/ "drakvuf"
/ "93b2d1840566f45fab674ebc79a9d19c88993bcb645e0357f3cb584d16e7c795.log.gz"
)
elif name.startswith("93b2d1-vmray"):
return (
CD
/ "data"
/ "dynamic"
/ "vmray"
/ "93b2d1840566f45fab674ebc79a9d19c88993bcb645e0357f3cb584d16e7c795_min_archive.zip"
)
elif name.startswith("ea2876"):
return CD / "data" / "ea2876e9175410b6f6719f80ee44b9553960758c7d0f7bed73c0fe9a78d8e669.dll_"
elif name.startswith("1038a2"):
@@ -1537,4 +1552,7 @@ def a076114_rd():
@pytest.fixture
def dynamic_a0000a6_rd():
# python -m capa.main tests/data/dynamic/cape/v2.2/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json --json > tests/data/rd/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json
return get_result_doc(CD / "data" / "rd" / "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json")
# gzip tests/data/rd/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json
return get_result_doc(
CD / "data" / "rd" / "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz"
)

View File

@@ -15,26 +15,31 @@ import capa.features.common
DYNAMIC_DRAKVUF_FEATURE_PRESENCE_TESTS = sorted(
[
("93b2d1", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False),
("93b2d1-drakvuf", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False),
# file/imports
("93b2d1", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), True),
("93b2d1-drakvuf", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), True),
# thread/api calls
("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), True),
("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), True),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False),
# call/api
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), True),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), False),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), True),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), False),
# call/string argument
(
"93b2d1",
"93b2d1-drakvuf",
"process=(3564:4852),thread=6592,call=1",
capa.features.common.String('0x667e2beb40:"api-ms-win-core-fibers-l1-1-1"'),
True,
),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.common.String("non_existant"), False),
(
"93b2d1-drakvuf",
"process=(3564:4852),thread=6592,call=1",
capa.features.common.String("non_existant"),
False,
),
# call/number argument
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), True),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), False),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), True),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), False),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
@@ -43,26 +48,26 @@ DYNAMIC_DRAKVUF_FEATURE_PRESENCE_TESTS = sorted(
DYNAMIC_DRAKVUF_FEATURE_COUNT_TESTS = sorted(
[
("93b2d1", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False),
("93b2d1-drakvuf", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False),
# file/imports
("93b2d1", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), 1),
("93b2d1-drakvuf", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), 1),
# thread/api calls
("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), 9),
("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), 9),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False),
# call/api
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), 1),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), 0),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), 1),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), 0),
# call/string argument
(
"93b2d1",
"93b2d1-drakvuf",
"process=(3564:4852),thread=6592,call=1",
capa.features.common.String('0x667e2beb40:"api-ms-win-core-fibers-l1-1-1"'),
1,
),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.common.String("non_existant"), 0),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.common.String("non_existant"), 0),
# call/number argument
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), 1),
("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), 0),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), 1),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), 0),
],
# order tests by (file, item)
# so that our LRU cache is most effective.

View File

@@ -27,7 +27,7 @@ def get_binary_file_path():
return str(CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_")
def get_report_file_path():
def get_cape_report_file_path():
return str(
CD
/ "data"
@@ -63,9 +63,10 @@ def get_rule_path():
pytest.param("show-capabilities-by-function.py", [get_binary_file_path()]),
pytest.param("show-features.py", [get_binary_file_path()]),
pytest.param("show-features.py", ["-F", "0x407970", get_binary_file_path()]),
pytest.param("show-features.py", ["-P", "MicrosoftEdgeUpdate.exe", get_report_file_path()]),
pytest.param("show-features.py", ["-P", "MicrosoftEdgeUpdate.exe", get_cape_report_file_path()]),
pytest.param("show-unused-features.py", [get_binary_file_path()]),
pytest.param("capa_as_library.py", [get_binary_file_path()]),
pytest.param("capa-as-library.py", [get_binary_file_path()]),
# not testing "minimize-vmray-results.py" as we don't currently upload full VMRay analysis archives
],
)
def test_scripts(script, args):

View File

@@ -0,0 +1,89 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import fixtures
import capa.main
import capa.features.file
import capa.features.insn
import capa.features.common
# each entry: (test-data id, scope selector string, feature, expected presence)
DYNAMIC_VMRAY_FEATURE_PRESENCE_TESTS = sorted(
    [
        ("93b2d1-vmray", "file", capa.features.common.String("api.%x%x.%s"), True),
        ("93b2d1-vmray", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False),
        # file/imports
        ("93b2d1-vmray", "file", capa.features.file.Import("GetAddrInfoW"), True),
        # thread/api calls
        ("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("GetAddrInfoW"), True),
        ("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("DoesNotExist"), False),
        # call/api
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=2361", capa.features.insn.API("GetAddrInfoW"), True),
        # call/string argument
        (
            "93b2d1-vmray",
            "process=(2176:0),thread=7,call=10323",
            capa.features.common.String("raw.githubusercontent.com"),
            True,
        ),
        # call/number argument
        # VirtualAlloc(4096, 4)
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=2358", capa.features.insn.Number(4096), True),
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=2358", capa.features.insn.Number(4), True),
    ],
    # order tests by (file, item)
    # so that our LRU cache is most effective.
    key=lambda t: (t[0], t[1]),
)
# each entry: (test-data id, scope selector string, feature, expected match count)
DYNAMIC_VMRAY_FEATURE_COUNT_TESTS = sorted(
    [
        # file/imports
        ("93b2d1-vmray", "file", capa.features.file.Import("GetAddrInfoW"), 1),
        # thread/api calls
        ("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("free"), 1),
        ("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("GetAddrInfoW"), 5),
        # call/api
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=2345", capa.features.insn.API("free"), 1),
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=2345", capa.features.insn.API("GetAddrInfoW"), 0),
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=2361", capa.features.insn.API("GetAddrInfoW"), 1),
        # call/string argument
        (
            "93b2d1-vmray",
            "process=(2176:0),thread=7,call=10323",
            capa.features.common.String("raw.githubusercontent.com"),
            1,
        ),
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=10323", capa.features.common.String("non_existant"), 0),
        # call/number argument
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(4096), 1),
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(4), 1),
        ("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(404), 0),
    ],
    # order tests by (file, item)
    # so that our LRU cache is most effective.
    key=lambda t: (t[0], t[1]),
)
@fixtures.parametrize(
    "sample,scope,feature,expected",
    DYNAMIC_VMRAY_FEATURE_PRESENCE_TESTS,
    indirect=["sample", "scope"],
)
def test_vmray_features(sample, scope, feature, expected):
    """Assert that each feature is present (or absent) in the given scope of the VMRay test archive."""
    fixtures.do_test_feature_presence(fixtures.get_vmray_extractor, sample, scope, feature, expected)
@fixtures.parametrize(
    "sample,scope,feature,expected",
    DYNAMIC_VMRAY_FEATURE_COUNT_TESTS,
    indirect=["sample", "scope"],
)
def test_vmray_feature_counts(sample, scope, feature, expected):
    """Assert that each feature occurs the expected number of times in the given scope of the VMRay test archive."""
    fixtures.do_test_feature_count(fixtures.get_vmray_extractor, sample, scope, feature, expected)

160
tests/test_vmray_model.py Normal file
View File

@@ -0,0 +1,160 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import textwrap
from capa.features.extractors.vmray.models import (
Param,
PEFile,
ElfFile,
FunctionCall,
AnalysisMetadata,
hexint,
xml_to_dict,
)
def test_vmray_model_param():
    """Param should parse a simple <param/> element; hexint should convert its
    decimal value string ("16") to the integer 16.
    """
    param_str = textwrap.dedent(
        """
        <param name="addrlen" type="signed_32bit" value="16"/>
        """
    )
    param: Param = Param.model_validate(xml_to_dict(param_str)["param"])
    assert param.value is not None
    assert hexint(param.value) == 16
def test_vmray_model_param_deref():
    """Param should parse a pointer <param> with a nested <deref> child and expose
    the dereferenced string value.
    """
    param_str = textwrap.dedent(
        """
        <param name="buf" type="ptr" value="0xaaaaaaaa">
        <deref type="str" value="Hello world"/>
        </param>
        """
    )
    param: Param = Param.model_validate(xml_to_dict(param_str)["param"])
    assert param.deref is not None
    assert param.deref.value == "Hello world"
def test_vmray_model_function_call():
    """FunctionCall should parse a flog <fncall> element, including its numeric
    id/process/thread attributes and the <in>/<out> parameter lists.
    """
    function_call_str = textwrap.dedent(
        """
        <fncall fncall_id="18" process_id="1" thread_id="1" name="sys_time">
        <in>
        <param name="tloc" type="unknown" value="0x0"/>
        </in>
        <out>
        <param name="ret_val" type="unknown" value="0xaaaaaaaa"/>
        </out>
        </fncall>
        """
    )
    function_call: FunctionCall = FunctionCall.model_validate(xml_to_dict(function_call_str)["fncall"])
    assert function_call.fncall_id == 18
    assert function_call.process_id == 1
    assert function_call.thread_id == 1
    # the XML attribute is name="sys_time" but the model exposes "time" — presumably
    # FunctionCall normalizes Linux syscall names by stripping the "sys_" prefix;
    # confirm against models.FunctionCall
    assert function_call.name == "time"
    assert function_call.params_in is not None
    assert function_call.params_in.params[0].value is not None
    assert hexint(function_call.params_in.params[0].value) == 0
    assert function_call.params_out is not None
    assert function_call.params_out.params[0].value is not None
    # 0xaaaaaaaa == 2863311530
    assert hexint(function_call.params_out.params[0].value) == 2863311530
def test_vmray_model_analysis_metadata():
    """AnalysisMetadata should deserialize the sample type and submission filename
    from summary_v2.json-style JSON.
    """
    analysis_metadata: AnalysisMetadata = AnalysisMetadata.model_validate_json(
        """
        {
            "sample_type": "Linux ELF Executable (x86-64)",
            "submission_filename": "abcd1234"
        }
        """
    )
    assert analysis_metadata.sample_type == "Linux ELF Executable (x86-64)"
    assert analysis_metadata.submission_filename == "abcd1234"
def test_vmray_model_elffile():
    """ElfFile should deserialize section headers (name and address) from
    summary_v2.json-style JSON.
    """
    elffile: ElfFile = ElfFile.model_validate_json(
        """
        {
            "sections": [
                {
                    "header": {
                        "sh_name": "abcd1234",
                        "sh_addr": 2863311530
                    }
                }
            ]
        }
        """
    )
    assert elffile.sections[0].header.sh_name == "abcd1234"
    # 2863311530 == 0xaaaaaaaa
    assert elffile.sections[0].header.sh_addr == 2863311530
def test_vmray_model_pefile():
    """PEFile should deserialize the image base, imports (per-DLL API lists),
    sections, and exports from summary_v2.json-style JSON.
    """
    pefile: PEFile = PEFile.model_validate_json(
        """
        {
            "basic_info": {
                "image_base": 2863311530
            },
            "imports": [
                {
                    "apis": [
                        {
                            "address": 2863311530,
                            "api": {
                                "name": "Sleep"
                            }
                        }
                    ],
                    "dll": "KERNEL32.dll"
                }
            ],
            "sections": [
                {
                    "name": ".text",
                    "virtual_address": 2863311530
                }
            ],
            "exports": [
                {
                    "api": {
                        "name": "HelloWorld",
                        "ordinal": 10
                    },
                    "address": 2863311530
                }
            ]
        }
        """
    )
    # 2863311530 == 0xaaaaaaaa
    assert pefile.basic_info.image_base == 2863311530
    assert pefile.imports[0].dll == "KERNEL32.dll"
    assert pefile.imports[0].apis[0].address == 2863311530
    assert pefile.imports[0].apis[0].api.name == "Sleep"
    assert pefile.sections[0].name == ".text"
    assert pefile.sections[0].virtual_address == 2863311530
    assert pefile.exports[0].address == 2863311530
    assert pefile.exports[0].api.name == "HelloWorld"
    assert pefile.exports[0].api.ordinal == 10