vmray: use xmltodict instead of pydantic_xml to improve performance

This commit is contained in:
Mike Hunhoff
2024-06-20 10:08:27 -06:00
parent 5be68d0751
commit ec21f3b3fc
5 changed files with 32 additions and 88 deletions

View File

@@ -8,11 +8,11 @@
from typing import Dict
from capa.exceptions import UnsupportedFormatError
from capa.features.extractors.vmray.models import File, Analysis, SummaryV2, StaticData
from capa.features.extractors.vmray.models import File, Flog, SummaryV2, StaticData
class VMRayAnalysis:
def __init__(self, sv2: SummaryV2, flog: Analysis):
def __init__(self, sv2: SummaryV2, flog: Flog):
self.sv2 = sv2 # logs/summary_v2.json
self.flog = flog # logs/flog.xml
self.exports: Dict[int, str] = {}

View File

@@ -5,7 +5,6 @@ from capa.helpers import assert_never
from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.extractors.vmray.models import Analysis
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)

View File

@@ -11,13 +11,15 @@ from typing import Tuple, Iterator
from pathlib import Path
from zipfile import ZipFile
import xmltodict
import capa.helpers
import capa.features.extractors.vmray.file
import capa.features.extractors.vmray.global_
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.vmray import VMRayAnalysis
from capa.features.extractors.vmray.models import Process, Analysis, SummaryV2
from capa.features.extractors.vmray.models import Flog, Process, SummaryV2
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
@@ -95,6 +97,7 @@ class VMRayExtractor(DynamicFeatureExtractor):
sv2 = SummaryV2.model_validate(sv2_json)
flog_xml = zipfile.read("logs/flog.xml", pwd=b"infected")
flog = Analysis.from_xml(flog_xml)
flog_json = xmltodict.parse(flog_xml, attr_prefix="")
flog = Flog.model_validate(flog_json)
return cls(VMRayAnalysis(sv2, flog))

View File

@@ -8,96 +8,38 @@
from typing import Dict, List, Optional
from pydantic import BaseModel
from pydantic_xml import BaseXmlModel, attr, element
from pydantic import Field, BaseModel
### models for flog.xml
class Param(BaseXmlModel, tag="param"):
name: str = attr()
type: str = attr()
value: Optional[str] = attr(default=None)
### models for flog.xml files
# Attributes of one <fncall> entry from flog.xml, validated from the dict that
# xmltodict produces (attributes are parsed without a prefix, so they map
# directly onto these field names).
class FunctionCall(BaseModel):
# every attribute is declared as str
ts: str
fncall_id: str
process_id: str
thread_id: str
name: str
addr: str
# "from" is a reserved word in Python, so the attribute is exposed as from_addr
# and populated through the "from" alias
from_addr: str = Field(alias="from")
# or see https://pydantic-xml.readthedocs.io/en/latest/pages/quickstart.html#wrapper
class In(BaseXmlModel, tag="in"):
params: List[Param] = element(name="in")
# Attributes of one <fnret> entry from flog.xml, validated from the dict that
# xmltodict produces.
class FunctionReturn(BaseModel):
# every attribute is declared as str
ts: str
# presumably matches the fncall_id of the corresponding <fncall> - TODO confirm
fncall_id: str
addr: str
# "from" is a reserved word in Python, so the attribute is exposed as from_addr
# and populated through the "from" alias
from_addr: str = Field(alias="from")
class Out(BaseXmlModel, tag="out"):
params: List[Param] = element(name="out")
# Contents of the <analysis> root element of flog.xml: its own attributes plus
# the function call / function return entries nested beneath it.
class Analysis(BaseModel):
log_version: str
analyzer_version: str
analysis_date: str
# child entries are looked up under their XML tag names ("fncall" / "fnret");
# each list defaults to empty when the key is absent from the parsed dict
# NOTE(review): xmltodict returns a single dict, not a list, when a tag occurs
# exactly once - confirm that logs with only one <fncall>/<fnret> still validate
# (xmltodict's force_list option is the usual remedy)
function_calls: List[FunctionCall] = Field(alias="fncall", default=[])
function_returns: List[FunctionReturn] = Field(alias="fnret", default=[])
class FunctionCall(BaseXmlModel, tag="fncall"):
ts: int = attr()
fncall_id: int = attr()
process_id: int = attr()
thread_id: int = attr()
name: str = attr() # API call name?
address: str = attr(name="addr")
from_: str = attr(name="from")
in_: Optional[In] = element(tag="in", default=None)
out_: Optional[Out] = element(tag="out", default=None)
# note that not all fncalls always have an associated fnret, e.g. exit or WaitForSingleObject
class FunctionReturn(BaseXmlModel, tag="fnret"):
ts: int = attr()
fncall_id: int = attr()
address: str = attr(name="addr") # string that contains a hex value
from_: str = attr(name="from") # string that contains a hex value
# TODO check multiple are there
class MonitorProcess(BaseXmlModel, tag="monitor_process"):
ts: int = attr()
process_id: int = attr()
image_name: str = attr()
# TODO check multiple are there
class MonitorThread(BaseXmlModel, tag="monitor_thread"):
ts: int = attr()
thread_id: int = attr()
process_id: int = attr()
os_tid: str = attr() # TODO hex
class NewRegion(BaseXmlModel, tag="new_region"):
ts: int = attr()
region_id: int = attr()
process_id: int = attr()
start_va: str = attr()
end_va: str = attr()
entry_point: str = attr()
class RemoveRegion(BaseXmlModel, tag="remove_region"):
ts: int = attr()
region_id: int = attr()
# unordered is very slow, but elements may occur in any order
class Analysis(BaseXmlModel, tag="analysis", search_mode="unordered"):
log_version: str = attr()
analyzer_version: str = attr()
analysis_date: str = attr()
# super slow
# data: List[Union[MonitorProcess, MonitorThread, NewRegion, RemoveRegion, FunctionCall, FunctionReturn]]
# may want to preprocess file and remove/reorder entries for more efficient parsing
processes: List[MonitorProcess] = element(tag="monitor_process")
threads: List[MonitorThread] = element(tag="monitor_thread")
# not important and slow down parsing
# new_regions: List[NewRegion] = element(tag="new_region")
# remove_regions: List[RemoveRegion] = element(tag="remove_region")
# very slow alternative; calls: List[Union[FunctionCall, FunctionReturn]]
fncalls: List[FunctionCall] = element(tag="fncall")
fnrets: List[FunctionReturn] = element(tag="fnret")
# Top-level model for the dict that xmltodict produces from logs/flog.xml; the
# parsed document is keyed by its root element, "analysis".
class Flog(BaseModel):
analysis: Analysis
### models for summary_v2.json files

View File

@@ -28,7 +28,7 @@ pyasn1-modules==0.2.8
pycparser==2.22
pydantic==2.7.3
pydantic-core==2.18.4
pydantic-xml==2.11.0
xmltodict==0.13.0
pyelftools==0.31
pygments==2.18.0
python-flirt==0.8.10