mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
vmray: emit string file features
This commit is contained in:
@@ -5,17 +5,35 @@
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, List
|
||||
from pathlib import Path
|
||||
from zipfile import ZipFile
|
||||
from collections import defaultdict
|
||||
|
||||
import xmltodict
|
||||
|
||||
from capa.exceptions import UnsupportedFormatError
|
||||
from capa.features.extractors.vmray.models import File, Flog, SummaryV2, StaticData, FunctionCall
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# TODO (meh): is default password "infected" good enough?? https://github.com/mandiant/capa/issues/2148
|
||||
DEFAULT_ARCHIVE_PASSWORD = b"infected"
|
||||
|
||||
|
||||
class VMRayAnalysis:
|
||||
def __init__(self, sv2: SummaryV2, flog: Flog):
|
||||
self.sv2 = sv2 # logs/summary_v2.json
|
||||
self.flog = flog # logs/flog.xml
|
||||
def __init__(self, zipfile_path: Path):
|
||||
self.zipfile = ZipFile(zipfile_path, "r")
|
||||
|
||||
sv2_json = json.loads(self.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD))
|
||||
self.sv2 = SummaryV2.model_validate(sv2_json)
|
||||
|
||||
flog_xml = self.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD)
|
||||
flog_json = xmltodict.parse(flog_xml, attr_prefix="")
|
||||
self.flog = Flog.model_validate(flog_json)
|
||||
|
||||
self.exports: Dict[int, str] = {}
|
||||
self.imports: Dict[int, str] = {}
|
||||
self.sections: Dict[int, str] = {}
|
||||
@@ -37,6 +55,13 @@ class VMRayAnalysis:
|
||||
if not self.sample_file_static_data.pe:
|
||||
raise UnsupportedFormatError("VMRay feature extractor only supports PE at this time")
|
||||
|
||||
sample_sha256: str = self.sample_file_analysis.hash_values.sha256.lower()
|
||||
sample_file_path: str = f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}"
|
||||
|
||||
logger.debug("sample file path: %s", sample_file_path)
|
||||
|
||||
self.sample_file_buf: bytes = self.zipfile.read(sample_file_path, pwd=DEFAULT_ARCHIVE_PASSWORD)
|
||||
|
||||
def _find_sample_file(self):
|
||||
for file_name, file_analysis in self.sv2.files.items():
|
||||
if file_analysis.is_sample:
|
||||
|
||||
@@ -6,12 +6,9 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
from typing import Tuple, Iterator
|
||||
from pathlib import Path
|
||||
from zipfile import ZipFile
|
||||
|
||||
import xmltodict
|
||||
|
||||
import capa.helpers
|
||||
import capa.features.extractors.vmray.call
|
||||
@@ -20,7 +17,7 @@ import capa.features.extractors.vmray.global_
|
||||
from capa.features.common import Feature, Characteristic
|
||||
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, DynamicCallAddress, AbsoluteVirtualAddress
|
||||
from capa.features.extractors.vmray import VMRayAnalysis
|
||||
from capa.features.extractors.vmray.models import Flog, Process, SummaryV2
|
||||
from capa.features.extractors.vmray.models import Process
|
||||
from capa.features.extractors.base_extractor import (
|
||||
CallHandle,
|
||||
SampleHashes,
|
||||
@@ -94,13 +91,4 @@ class VMRayExtractor(DynamicFeatureExtractor):
|
||||
|
||||
@classmethod
def from_zipfile(cls, zipfile_path: Path):
    """Create an extractor from a VMRay analysis archive.

    The archive is not opened here: VMRayAnalysis receives the path and is
    responsible for reading the archive contents itself. Building the
    analysis from pre-parsed summary/flog models no longer matches the
    VMRayAnalysis constructor, which takes only the archive path.

    Args:
        zipfile_path: location of the VMRay analysis archive on disk.

    Returns:
        a new instance of ``cls`` wrapping the VMRayAnalysis for the archive.
    """
    return cls(VMRayAnalysis(zipfile_path))
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
import logging
|
||||
from typing import Dict, Tuple, Iterator
|
||||
|
||||
import capa.features.extractors.common
|
||||
from capa.features.file import Export, Section
|
||||
from capa.features.common import String, Feature
|
||||
from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress
|
||||
@@ -70,6 +71,10 @@ def extract_referenced_registry_key_names(analysis: VMRayAnalysis) -> Iterator[T
|
||||
yield String(registry_record.reg_key_name), NO_ADDRESS
|
||||
|
||||
|
||||
def extract_file_strings(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
    """Yield string features for the analyzed sample.

    Hands the sample's raw buffer (``analysis.sample_file_buf``) to
    ``capa.features.extractors.common.extract_file_strings`` and relays
    every (feature, address) item it produces, unchanged.
    """
    sample_bytes = analysis.sample_file_buf
    for item in capa.features.extractors.common.extract_file_strings(sample_bytes):
        yield item
|
||||
|
||||
|
||||
def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
|
||||
for handler in FILE_HANDLERS:
|
||||
for feature, addr in handler(analysis):
|
||||
@@ -85,5 +90,5 @@ FILE_HANDLERS = (
|
||||
extract_referenced_domain_names,
|
||||
extract_referenced_ip_addresses,
|
||||
extract_referenced_registry_key_names,
|
||||
# extract_file_strings,
|
||||
extract_file_strings,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user