Merge branch 'vmray-extractor' into vmray_extractor

This commit is contained in:
r-sm2024
2024-06-18 16:41:36 -05:00
committed by GitHub
10 changed files with 363 additions and 15 deletions

View File

@@ -461,6 +461,7 @@ FORMAT_AUTO = "auto"
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
FORMAT_VMRAY = "vmray"
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
STATIC_FORMATS = {
@@ -474,6 +475,7 @@ STATIC_FORMATS = {
}
DYNAMIC_FORMATS = {
FORMAT_CAPE,
FORMAT_VMRAY,
FORMAT_FREEZE,
FORMAT_RESULT,
}

View File

@@ -0,0 +1,63 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Dict
from capa.exceptions import UnsupportedFormatError
from capa.features.extractors.vmray.models import File, Analysis, SummaryV2, StaticData
class VMRayAnalysis:
    """Index the parts of a VMRay analysis that the feature extractors consume.

    Holds the parsed ``logs/summary_v2.json`` and ``logs/flog.xml`` documents,
    locates the file flagged as the sample, and pre-computes lookup tables
    (exports, sections, base address) from the sample's PE static data.

    Raises:
        UnsupportedFormatError: if no file is flagged as the sample, the sample
            has no static data reference, or the static data is not for a PE file.
    """

    def __init__(self, sv2: SummaryV2, flog: Analysis):
        self.sv2 = sv2  # logs/summary_v2.json
        self.flog = flog  # logs/flog.xml
        self.exports: Dict[int, str] = {}
        self.imports: Dict[int, str] = {}
        self.sections: Dict[int, str] = {}
        self.base_address: int

        self.sample_file_name: str
        self.sample_file_analysis: File
        self.sample_file_static_data: StaticData

        self._find_sample_file()

        # validate *before* deriving anything from the static data, so that an
        # unsupported report fails with a clear error instead of an AttributeError.
        if not self.sample_file_static_data.pe:
            raise UnsupportedFormatError("VMRay feature extractor only supports PE at this time")

        self._compute_base_address()
        self._compute_exports()
        self._compute_sections()

    def _find_sample_file(self):
        """Record the first file flagged ``is_sample`` and resolve its static data.

        Raises:
            UnsupportedFormatError: if no file is flagged as the sample, or the
                sample carries no reference to static data.
        """
        sample = next(
            ((file_name, file_analysis) for file_name, file_analysis in self.sv2.files.items() if file_analysis.is_sample),
            None,
        )
        if sample is None:
            raise UnsupportedFormatError("VMRay analysis does not contain a sample file")

        self.sample_file_name, self.sample_file_analysis = sample

        static_data_ref = self.sample_file_analysis.ref_static_data
        if not static_data_ref:
            raise UnsupportedFormatError("VMRay analysis does not contain static data for the sample file")

        # the second path element of the reference is the key into sv2.static_data
        self.sample_file_static_data = self.sv2.static_data[static_data_ref.path[1]]

    def _compute_base_address(self):
        """Store the image base from the PE basic info, when PE data is present."""
        pe = self.sample_file_static_data.pe
        if pe:
            self.base_address = pe.basic_info.image_base

    def _compute_exports(self):
        """Map each PE export address to the exported API name."""
        pe = self.sample_file_static_data.pe
        if pe:
            for export in pe.exports:
                self.exports[export.address] = export.api.name

    def _compute_imports(self):
        # TODO (meh)
        ...

    def _compute_sections(self):
        """Map each PE section virtual address to the section name."""
        pe = self.sample_file_static_data.pe
        if pe:
            for section in pe.sections:
                self.sections[section.virtual_address] = section.name

View File

@@ -0,0 +1,100 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
from typing import Tuple, Iterator
from pathlib import Path
from zipfile import ZipFile
import capa.helpers
import capa.features.extractors.vmray.file
import capa.features.extractors.vmray.global_
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.vmray import VMRayAnalysis
from capa.features.extractors.vmray.models import Process, Analysis, SummaryV2
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
ThreadHandle,
ProcessHandle,
DynamicFeatureExtractor,
)
# TODO also/or look into xmltodict?
class VMRayExtractor(DynamicFeatureExtractor):
    """Dynamic feature extractor that serves features from a VMRayAnalysis.

    Process, thread and call level extraction are still stubs that yield nothing.
    """

    def __init__(self, analysis: VMRayAnalysis):
        sample_hashes = analysis.sample_file_analysis.hash_values
        super().__init__(
            hashes=SampleHashes(
                md5=sample_hashes.md5.lower(),
                sha1=sample_hashes.sha1.lower(),
                sha256=sample_hashes.sha256.lower(),
            )
        )

        self.analysis = analysis

        # global features are yielded at *every* scope, so compute them once up front.
        self.global_features = list(capa.features.extractors.vmray.global_.extract_features(self.analysis))

    def get_base_address(self) -> Address:
        """Return the sample's image base as an absolute virtual address."""
        # value according to the PE header, the actual trace may use a different imagebase
        return AbsoluteVirtualAddress(self.analysis.base_address)

    def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
        """Yield the file-scope features produced by the vmray file module."""
        yield from capa.features.extractors.vmray.file.extract_features(self.analysis)

    def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
        """Yield the global features cached by the constructor."""
        yield from self.global_features

    def get_processes(self) -> Iterator[ProcessHandle]:
        """Yield a handle for each process found by the vmray file module."""
        yield from capa.features.extractors.vmray.file.get_processes(self.analysis)

    def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
        # TODO (meh)
        yield from ()

    def get_process_name(self, ph) -> str:
        """Return the image name of the process wrapped by the handle."""
        wrapped: Process = ph.inner
        return wrapped.image_name

    def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
        # TODO (meh)
        yield from ()

    def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
        # there are no elements to generate here,
        # but this routine still has to be a generator.
        yield from ()

    def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
        # TODO (meh)
        yield from ()

    def extract_call_features(
        self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
    ) -> Iterator[Tuple[Feature, Address]]:
        # TODO (meh)
        yield from ()

    def get_call_name(self, ph, th, ch) -> str:
        # TODO (meh)
        raise NotImplementedError()

    @classmethod
    def from_zipfile(cls, zipfile_path: Path):
        """Build an extractor from an archive holding logs/summary_v2.json and logs/flog.xml.

        Both members are read with the password ``infected``.
        """
        with ZipFile(zipfile_path, "r") as archive:
            summary_doc = json.loads(archive.read("logs/summary_v2.json", pwd=b"infected"))
            summary = SummaryV2.model_validate(summary_doc)

            function_log = Analysis.from_xml(archive.read("logs/flog.xml", pwd=b"infected"))

        return cls(VMRayAnalysis(summary, function_log))

View File

@@ -0,0 +1,88 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, Tuple, Iterator
from capa.features.file import Export, Section
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress
from capa.features.extractors.vmray import VMRayAnalysis
from capa.features.extractors.vmray.models import Process
from capa.features.extractors.base_extractor import ProcessHandle
logger = logging.getLogger(__name__)
def get_processes(analysis: VMRayAnalysis) -> Iterator[ProcessHandle]:
    """Yield a ProcessHandle for every process listed in the summary.

    The parent pid is looked up through the process's parent reference;
    a process without a parent reference gets ppid 0.
    """
    known: Dict[str, Process] = analysis.sv2.processes

    for proc in known.values():
        own_pid = proc.os_pid

        if proc.ref_parent_process:
            parent_pid = known[proc.ref_parent_process.path[1]].os_pid
        else:
            parent_pid = 0

        yield ProcessHandle(address=ProcessAddress(pid=own_pid, ppid=parent_pid), inner=proc)
def extract_export_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield an Export feature, located at its address, for each entry in analysis.exports."""
    yield from (
        (Export(export_name), AbsoluteVirtualAddress(export_va))
        for export_va, export_name in analysis.exports.items()
    )
def extract_import_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Placeholder for import extraction: currently a generator that yields nothing."""
    # TODO (meh)
    yield from ()
def extract_section_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a Section feature, located at its address, for each entry in analysis.sections."""
    yield from (
        (Section(section_name), AbsoluteVirtualAddress(section_va))
        for section_va, section_name in analysis.sections.items()
    )
def extract_referenced_filenames(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature (with no address) for each filename record in the summary."""
    for record in analysis.sv2.filenames.values():
        yield String(record.filename), NO_ADDRESS
def extract_referenced_mutex_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature (with no address) for each mutex name in the summary."""
    for record in analysis.sv2.mutexes.values():
        yield String(record.name), NO_ADDRESS
def extract_referenced_domain_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature (with no address) for each domain in the summary."""
    for record in analysis.sv2.domains.values():
        yield String(record.domain), NO_ADDRESS
def extract_referenced_ip_addresses(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature (with no address) for each IP address in the summary."""
    for record in analysis.sv2.ip_addresses.values():
        yield String(record.ip_address), NO_ADDRESS
def extract_referenced_registry_key_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield a String feature (with no address) for each registry key name in the summary."""
    for record in analysis.sv2.registry_records.values():
        yield String(record.reg_key_name), NO_ADDRESS
def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Run every handler in FILE_HANDLERS, in order, and yield the (feature, address) pairs they produce."""
    for file_handler in FILE_HANDLERS:
        yield from file_handler(analysis)
# File-scope extractors; extract_features() calls them in the order listed here.
FILE_HANDLERS = (
    extract_import_names,
    extract_export_names,
    extract_section_names,
    extract_referenced_filenames,
    extract_referenced_mutex_names,
    extract_referenced_domain_names,
    extract_referenced_ip_addresses,
    extract_referenced_registry_key_names,
    # extract_file_strings,
)

View File

@@ -0,0 +1,61 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Tuple, Iterator
from capa.features.common import OS, ARCH_I386, FORMAT_PE, ARCH_AMD64, OS_WINDOWS, Arch, Format, Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.vmray import VMRayAnalysis
logger = logging.getLogger(__name__)
def extract_arch(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield the Arch feature matching the sample type string in the analysis metadata.

    Raises:
        ValueError: if the sample type mentions neither "x86-32" nor "x86-64".
    """
    sample_type: str = analysis.sv2.analysis_metadata.sample_type

    # checked in order; the first marker found in the sample type wins
    known_arches = (
        ("x86-32", ARCH_I386),
        ("x86-64", ARCH_AMD64),
    )
    for marker, arch in known_arches:
        if marker in sample_type:
            yield Arch(arch), NO_ADDRESS
            return

    logger.warning("unrecognized arch: %s", sample_type)
    raise ValueError(f"unrecognized arch from the VMRay report: {sample_type}")
def extract_format(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield Format(FORMAT_PE) when the sample's static data contains PE information.

    Raises:
        ValueError: if the static data has no PE information.
    """
    if not analysis.sample_file_static_data.pe:
        reported_type = analysis.sv2.analysis_metadata.sample_type
        logger.warning("unrecognized file format: %s", reported_type)
        raise ValueError(f"unrecognized file format from the VMRay report: {reported_type}")

    yield Format(FORMAT_PE), NO_ADDRESS
def extract_os(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield OS(OS_WINDOWS) when the sample type mentions "windows" (case-insensitive).

    Raises:
        ValueError: if the sample type does not mention "windows".
    """
    sample_type: str = analysis.sv2.analysis_metadata.sample_type

    if "windows" not in sample_type.lower():
        logger.warning("unrecognized OS: %s", sample_type)
        raise ValueError(f"unrecognized OS from the VMRay report: {sample_type}")

    yield OS(OS_WINDOWS), NO_ADDRESS
def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Run every handler in GLOBAL_HANDLER, in order, and yield the (feature, address) pairs they produce."""
    for handler in GLOBAL_HANDLER:
        yield from handler(analysis)
# Global-scope extractors; extract_features() calls them in the order listed here.
GLOBAL_HANDLER = (
    extract_format,
    extract_os,
    extract_arch,
)

View File

@@ -63,5 +63,4 @@ class Analysis(BaseXmlModel, tag="analysis"):
new_regions: List[NewRegion] = element(tag="new_region")
remove_regions: List[RemoveRegion] = element(tag="remove_region")
fncalls: List[FunctionCall] = element(tag="fncall")
fnrets: List[FunctionReturn] = element(tag="fnret")
fnrets: List[FunctionReturn] = element(tag="fnret")

View File

@@ -12,8 +12,9 @@ import inspect
import logging
import contextlib
import importlib.util
from typing import NoReturn
from typing import List, NoReturn
from pathlib import Path
from zipfile import ZipFile
import tqdm
@@ -23,6 +24,7 @@ from capa.features.common import (
FORMAT_CAPE,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_VMRAY,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_UNKNOWN,
@@ -31,7 +33,7 @@ from capa.features.common import (
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz")
EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz", ".zip")
EXTENSIONS_ELF = "elf_"
EXTENSIONS_FREEZE = "frz"
@@ -83,14 +85,21 @@ def load_json_from_path(json_path: Path):
def get_format_from_report(sample: Path) -> str:
report = load_json_from_path(sample)
if "CAPE" in report:
return FORMAT_CAPE
if not sample.name.endswith(".zip"):
report = load_json_from_path(sample)
if "CAPE" in report:
return FORMAT_CAPE
if "target" in report and "info" in report and "behavior" in report:
# CAPE report that's missing the "CAPE" key,
# which is not going to be much use, but it's correct.
return FORMAT_CAPE
if "target" in report and "info" in report and "behavior" in report:
# CAPE report that's missing the "CAPE" key,
# which is not going to be much use, but it's correct.
return FORMAT_CAPE
else:
with ZipFile(sample, "r") as zipfile:
namelist: List[str] = zipfile.namelist()
if "logs/summary_v2.json" in namelist and "logs/flog.xml" in namelist:
# assume VMRay zipfile at a minimum has these files
return FORMAT_VMRAY
return FORMAT_UNKNOWN

View File

@@ -44,6 +44,7 @@ from capa.features.common import (
FORMAT_CAPE,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_VMRAY,
FORMAT_DOTNET,
)
from capa.features.address import Address
@@ -61,6 +62,7 @@ BACKEND_DOTNET = "dotnet"
BACKEND_BINJA = "binja"
BACKEND_PEFILE = "pefile"
BACKEND_CAPE = "cape"
BACKEND_VMRAY = "vmray"
BACKEND_FREEZE = "freeze"
@@ -199,6 +201,11 @@ def get_extractor(
report = capa.helpers.load_json_from_path(input_path)
return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)
elif backend == BACKEND_VMRAY:
import capa.features.extractors.vmray.extractor
return capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_path)
elif backend == BACKEND_DOTNET:
import capa.features.extractors.dnfile.extractor
@@ -316,6 +323,11 @@ def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtr
report = capa.helpers.load_json_from_path(input_file)
file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report))
elif input_format == FORMAT_VMRAY:
import capa.features.extractors.vmray.extractor
file_extractors.append(capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_file))
return file_extractors

View File

@@ -42,7 +42,15 @@ import capa.render.result_document as rdoc
import capa.features.extractors.common
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.loader import BACKEND_VIV, BACKEND_CAPE, BACKEND_BINJA, BACKEND_DOTNET, BACKEND_FREEZE, BACKEND_PEFILE
from capa.loader import (
BACKEND_VIV,
BACKEND_CAPE,
BACKEND_BINJA,
BACKEND_VMRAY,
BACKEND_DOTNET,
BACKEND_FREEZE,
BACKEND_PEFILE,
)
from capa.helpers import (
get_file_taste,
get_auto_format,
@@ -70,6 +78,7 @@ from capa.features.common import (
FORMAT_CAPE,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_VMRAY,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_RESULT,
@@ -232,6 +241,7 @@ def install_common_args(parser, wanted=None):
(FORMAT_SC32, "32-bit shellcode"),
(FORMAT_SC64, "64-bit shellcode"),
(FORMAT_CAPE, "CAPE sandbox report"),
(FORMAT_VMRAY, "VMRay sandbox report"),
(FORMAT_FREEZE, "features previously frozen by capa"),
]
format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats])
@@ -253,6 +263,7 @@ def install_common_args(parser, wanted=None):
(BACKEND_DOTNET, ".NET"),
(BACKEND_FREEZE, "capa freeze"),
(BACKEND_CAPE, "CAPE"),
(BACKEND_VMRAY, "VMRay"),
]
backend_help = ", ".join([f"{f[0]}: {f[1]}" for f in backends])
parser.add_argument(
@@ -505,6 +516,9 @@ def get_backend_from_cli(args, input_format: str) -> str:
if input_format == FORMAT_CAPE:
return BACKEND_CAPE
elif input_format == FORMAT_VMRAY:
return BACKEND_VMRAY
elif input_format == FORMAT_DOTNET:
return BACKEND_DOTNET
@@ -529,7 +543,7 @@ def get_sample_path_from_cli(args, backend: str) -> Optional[Path]:
raises:
ShouldExitError: if the program is invoked incorrectly and should exit.
"""
if backend == BACKEND_CAPE:
if backend in (BACKEND_CAPE, BACKEND_VMRAY):
return None
else:
return args.input_file

View File

@@ -227,13 +227,13 @@ def print_static_features(functions, extractor: StaticFeatureExtractor):
def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
for p in processes:
print(f"proc: {p.inner.process_name} (ppid={p.address.ppid}, pid={p.address.pid})")
print(f"proc: {extractor.get_process_name(p)} (ppid={p.address.ppid}, pid={p.address.pid})")
for feature, addr in extractor.extract_process_features(p):
if is_global_feature(feature):
continue
print(f" proc: {p.inner.process_name}: {feature}")
print(f" proc: {extractor.get_process_name(p)}: {feature}")
for t in extractor.get_threads(p):
print(f" thread: {t.address.tid}")