cape: model: document the data we'll use in cape

This commit is contained in:
Willi Ballenthin
2023-08-16 08:57:17 +00:00
committed by GitHub
parent 25aabcd7e4
commit 046427cf55

View File

@@ -29,206 +29,22 @@ class Model(BaseModel):
model_config = ConfigDict(extra="forbid")
# use this type to indicate that we won't model this data,
# because it's not relevant to our use in capa.
#
# while it's nice to have full coverage of the data shape,
# it can easily change and break our parsing.
# so we really only want to describe what we'll use.
Skip: TypeAlias = Optional[Any]
# mark fields that we haven't seen yet and need to model.
# pydantic should raise an error when encountering data
# in a field with this type.
# then we can update the model with the discovered shape.
TODO: TypeAlias = None
ListTODO: TypeAlias = List[None]
class DictTODO(Model):
pass
class Statistic(Model):
name: str
time: float
class Statistics(Model):
processing: List[Statistic]
signatures: List[Statistic]
reporting: List[Statistic]
class Yara(Model):
name: str
strings: List[str]
addresses: Dict[str, int]
meta: Dict[str, str]
class ClamAV(Model):
name: str
class Payload(Model):
cape_type_code: Optional[int] = None
cape_type: str
name: str
path: str
guest_paths: str
size: int
crc32: str
md5: str
sha1: str
sha256: str
sha512: str
sha3_384: str
ssdeep: str
type: str
yara: List[Yara]
cape_yara: List[Yara]
clamav: List[ClamAV]
tlsh: str
pid: int
process_path: str
process_name: str
module_path: str
virtual_address: Optional[HexInt] = None
target_pid: Optional[int] = None
target_path: Optional[str] = None
target_process: Optional[str] = None
ep_bytes: Optional[HexBytes] = None
entrypoint: Optional[int] = None
timestamp: Optional[str] = None
class CAPE(Model):
payloads: List[Payload]
configs: ListTODO
class Machine(Model):
id: int
status: str
name: str
label: str
manager: str
started_on: str
shutdown_on: str
class Info(Model):
category: str
custom: str
distributed: Optional[DictTODO] = None
duration: int
ended: str
id: int
machine: Machine
options: DictTODO
package: str
parent_id: Optional[int] = None
parent_sample: DictTODO
route: Optional[bool] = None
shrike_refer: Optional[str] = None
shrike_sid: Optional[int] = None
shrike_msg: Optional[str] = None
shrike_url: Optional[str] = None
source_url: Optional[str] = None
started: str
timeout: bool
tlp: Optional[str] = None
user_id: Optional[int] = None
version: str
class Argument(Model):
name: str
value: Union[HexInt, str]
pretty_value: Optional[str] = None
class Call(Model):
timestamp: str
thread_id: int
caller: HexInt
parentcaller: HexInt
category: str
api: str
status: bool
return_: HexInt = Field(alias="return")
pretty_return: Optional[str] = None
arguments: List[Argument]
repeated: int
id: int
class Process(Model):
process_id: int
process_name: str
parent_id: int
module_path: str
first_seen: str
calls: List[Call]
threads: List[int]
environ: Dict[str, str]
class ProcessTree(Model):
name: str
pid: int
parent_id: int
module_path: str
threads: List[int]
environ: Dict[str, str]
children: List["ProcessTree"]
class Summary(Model):
files: List[str]
read_files: List[str]
write_files: List[str]
delete_files: List[str]
keys: List[str]
read_keys: List[str]
write_keys: List[str]
delete_keys: List[str]
executed_commands: List[str]
resolved_apis: List[str]
mutexes: List[str]
created_services: List[str]
started_services: List[str]
class EventFileData(Model):
file: str
pathtofile: Optional[str] = None
moduleaddress: Optional[HexInt] = None
class EventRegData(Model):
regkey: str
content: Optional[str] = None
class EventMoveData(Model):
from_: Optional[str] = Field(alias="from")
to: Optional[str] = None
class EnhancedEvent(Model):
event: str
object: str
timestamp: str
eid: int
data: Union[EventFileData, EventRegData, EventMoveData]
class Behavior(Model):
processes: List[Process]
anomaly: List[str]
processtree: List[ProcessTree]
summary: Summary
enhanced: List[EnhancedEvent]
encryptedbuffers: ListTODO
class Debug(Model):
log: str
errors: List[str]
DictTODO: TypeAlias = Model
class ImportedSymbol(Model):
@@ -330,73 +146,162 @@ class PE(Model):
guest_signers: Signer
class VirusTotalResult(Model):
vendor: str
sig: Optional[str]
class VirusTotalScan(Model):
result: str
detected: Optional[bool] = None
update: Optional[str] = None
version: Optional[str] = None
engine_name: Optional[str] = None
engine_version: Optional[str] = None
engine_update: Optional[str] = None
method: Optional[str] = None
category: Optional[str] = None
class VirusTotal(Model):
md5: str
sha1: str
sha256: str
tlsh: Optional[str] = None
permalink: str
positives: Optional[int] = None
positive: Optional[int] = None
detection: Optional[str] = None
total: int
resource: str
response_code: Optional[int] = None
names: Optional[List[str]] = None
results: List[VirusTotalResult]
scan_date: Optional[str] = None
scan_id: str
scans: Dict[str, VirusTotalScan]
verbose_msg: Optional[str] = None
class VirusTotalError(Model):
error: bool
msg: str
class File(Model):
type: str
cape_type_code: Optional[int] = None
cape_type: Optional[str] = None
name: Union[List[str], str]
path: str
guest_paths: Union[List[str], str, None]
timestamp: Optional[str] = None
size: int
entrypoint: Optional[int] = None
ep_bytes: Optional[HexBytes] = None
#
# hashes
#
crc32: str
md5: str
sha1: str
sha256: str
sha512: str
sha3_384: str
rh_hash: Optional[str] = None
ssdeep: str
tlsh: str
yara: List[Yara]
cape_yara: List[Yara]
clamav: List[ClamAV]
data: Optional[str] = None
rh_hash: Optional[str] = None
#
# other metadata, static analysis
#
size: int
pe: Optional[PE] = None
ep_bytes: Optional[HexBytes] = None
entrypoint: Optional[int] = None
data: Optional[str] = None
strings: Optional[List[str]] = None
virustotal: Optional[Union[VirusTotal, VirusTotalError]] = None
#
# detections (skip)
#
yara: Skip = None
cape_yara: Skip = None
clamav: Skip = None
virustotal: Skip = None
# File subclass: inherits every field declared on File and adds the
# process-related fields below.
class ProcessFile(File):
#
# like a File, but also has dynamic analysis results
#
# required fields: process id, process path/name, and module path.
pid: int
process_path: str
process_name: str
module_path: str
# optional fields: each defaults to None when absent from the input.
# HexInt is declared elsewhere in this file
# (presumably parses hex-formatted values into int; TODO confirm).
virtual_address: Optional[HexInt] = None
target_pid: Optional[int] = None
target_path: Optional[str] = None
target_process: Optional[str] = None
# one entry of Call.arguments: a named value with an optional pretty form.
class Argument(Model):
name: str
# either a HexInt or a plain string.
value: Union[HexInt, str]
# optional; defaults to None when the input has no pretty_value key.
pretty_value: Optional[str] = None
# one recorded call: which API, in which thread, with which arguments,
# and what it returned.
class Call(Model):
timestamp: str
thread_id: int
category: str
api: str
arguments: List[Argument]
status: bool
# `return` is a reserved word in Python, so the attribute is named
# `return_` and the alias maps it to the "return" key of the input.
return_: HexInt = Field(alias="return")
# optional; defaults to None.
pretty_return: Optional[str] = None
repeated: int
# virtual addresses
caller: HexInt
parentcaller: HexInt
# index into calls array
id: int
# one process entry (see Behavior.processes): identity fields plus the
# calls recorded for it. all fields are required.
class Process(Model):
process_id: int
process_name: str
parent_id: int
module_path: str
first_seen: str
calls: List[Call]
# list of integers (presumably thread ids; TODO confirm).
threads: List[int]
# string-to-string mapping.
environ: Dict[str, str]
# one node of the process tree (see Behavior.processtree).
# all fields are required.
class ProcessTree(Model):
name: str
pid: int
parent_id: int
module_path: str
threads: List[int]
environ: Dict[str, str]
# forward reference to this same class, which makes the model recursive:
# each node carries a list of child nodes.
children: List["ProcessTree"]
# one of the alternatives accepted for EnhancedEvent.data:
# requires `file`; the other two fields are optional.
class EventFileData(Model):
file: str
pathtofile: Optional[str] = None
moduleaddress: Optional[HexInt] = None
# one of the alternatives accepted for EnhancedEvent.data:
# requires `regkey`; `content` is optional.
class EventRegData(Model):
regkey: str
content: Optional[str] = None
# one of the alternatives accepted for EnhancedEvent.data:
# a from/to pair.
class EventMoveData(Model):
# `from` is a reserved word in Python, so the attribute is named `from_`
# and the alias maps it to the "from" key of the input.
# NOTE(review): no default is given here, so under pydantic v2 the "from"
# key appears to be required (though it may be null), unlike `to` below,
# which defaults to None. confirm this asymmetry is intended.
from_: Optional[str] = Field(alias="from")
to: Optional[str] = None
# one entry of Behavior.enhanced: event metadata plus a typed payload.
class EnhancedEvent(Model):
event: str
object: str
timestamp: str
eid: int
# payload must validate as one of these three models.
data: Union[EventFileData, EventRegData, EventMoveData]
# flat summary (see Behavior.summary): every field is a required list of strings.
class Summary(Model):
# file-related lists
files: List[str]
read_files: List[str]
write_files: List[str]
delete_files: List[str]
# key-related lists (presumably registry keys; TODO confirm)
keys: List[str]
read_keys: List[str]
write_keys: List[str]
delete_keys: List[str]
# remaining lists
executed_commands: List[str]
resolved_apis: List[str]
mutexes: List[str]
created_services: List[str]
started_services: List[str]
# the `behavior` section of the report: summary, per-process call data,
# process tree, and related lists. all fields are required.
class Behavior(Model):
summary: Summary
# list of processes, of threads, of calls
processes: List[Process]
# tree of processes
processtree: List[ProcessTree]
anomaly: List[str]
enhanced: List[EnhancedEvent]
# typed with the ListTODO placeholder: the element shape has not been
# modeled yet.
encryptedbuffers: ListTODO
class Host(Model):
@@ -411,7 +316,7 @@ class Domain(Model):
ip: str
class TcpConnection(Model):
class TcpEvent(Model):
src: str
sport: int
dst: str
@@ -420,7 +325,7 @@ class TcpConnection(Model):
time: float
class UdpConnection(Model):
class UdpEvent(Model):
src: str
sport: int
dst: str
@@ -429,21 +334,28 @@ class UdpConnection(Model):
time: float
class DnsResolution(Model):
class DnsEvent(Model):
request: str
type: str
answers: ListTODO
# one entry of Network.icmp. all fields are required.
class IcmpEvent(Model):
src: str
dst: str
# note: an int here, whereas DnsEvent.type is a str.
type: int
data: str
class Network(Model):
pcap_sha256: Optional[str] = None
hosts: Optional[List[Host]] = None
domains: Optional[List[Domain]] = None
tcp: Optional[List[TcpConnection]] = None
udp: Optional[List[UdpConnection]] = None
icmp: Optional[ListTODO] = None
tcp: Optional[List[TcpEvent]] = None
udp: Optional[List[UdpEvent]] = None
icmp: Optional[List[IcmpEvent]] = None
http: Optional[ListTODO] = None
dns: Optional[List[DnsResolution]] = None
dns: Optional[List[DnsEvent]] = None
smtp: Optional[ListTODO] = None
irc: Optional[ListTODO] = None
domainlookups: Optional[DictTODO] = None
@@ -454,22 +366,7 @@ class Network(Model):
dead_hosts: Optional[List[Tuple[str, int]]] = None
class FlareCapa(Model):
ATTCK: Dict[str, List[str]]
CAPABILITY: Dict[str, List[str]]
MBC: Dict[str, List[str]]
md5: str
sha1: str
sha256: str
path: str
class Static(Model):
pe: PE
flare_capa: Optional[FlareCapa] = None
class DnsEvent(Model):
class SuricataDnsEvent(Model):
id: int
type: str
rrname: str
@@ -491,7 +388,7 @@ class SuricataNetworkEntry(Model):
dest_ip: str
dest_port: int
dns: Optional[DnsEvent]
dns: Optional[SuricataDnsEvent]
class Suricata(Model):
@@ -503,13 +400,15 @@ class Suricata(Model):
perf: ListTODO
ssh: ListTODO
tls: ListTODO
alert_log_full_path: Optional[str] = None
dns_log_full_path: Optional[str] = None
eve_log_full_path: Optional[str] = None
file_log_full_path: Optional[str] = None
http_log_full_path: Optional[str] = None
ssh_log_full_path: Optional[str] = None
tls_log_full_path: Optional[str] = None
# paths to log files, not relevant to capa
alert_log_full_path: Skip = None
dns_log_full_path: Skip = None
eve_log_full_path: Skip = None
file_log_full_path: Skip = None
http_log_full_path: Skip = None
ssh_log_full_path: Skip = None
tls_log_full_path: Skip = None
class Target(Model):
@@ -517,36 +416,75 @@ class Target(Model):
file: File
class TTP(Model):
ttp: str
signature: str
# the `static` section of the report.
class Static(Model):
# required PE model.
pe: PE
# typed as Skip: the key is accepted if present, but its contents are
# deliberately not modeled.
flare_capa: Skip = None
# the `CAPE` section of the report.
class CAPE(Model):
# each payload is parsed as a ProcessFile.
payloads: List[ProcessFile]
# typed with the ListTODO placeholder: the element shape has not been
# modeled yet.
configs: ListTODO
class CapeReport(Model):
# the input file, I think
target: Target
#
# static analysis results
#
static: Optional[Static] = None
strings: Optional[List[str]] = None
#
# dynamic analysis results
#
# post-processed results: process tree, anomalies, etc
behavior: Behavior
# post-processed results: payloads and extracted configs
CAPE: CAPE
curtain: Optional[TODO] = None
debug: Debug
deduplicated_shots: Optional[List[int]] = None
detections: Optional[str] = None
detections2pid: Optional[Dict[int, List[str]]] = None
network: Network
suricata: Suricata
dropped: List[File]
info: Info
procdump: List[ProcessFile]
procmemory: ListTODO
#
# unknown shapes
#
# seems to have to do with processing PowerShell logs.
# disabled by default, and I don't see the source on GitHub.
curtain: Optional[TODO] = None
sysmon: Optional[ListTODO] = None
#
# information we won't use in capa
#
# screenshot hash values
deduplicated_shots: Skip = None
# info about the processing job, like machine and distributed metadata.
info: Skip = None
# k-v pairs describing the time it took to run each stage.
statistics: Skip = None
# k-v pairs of ATT&CK ID to signature name or similar.
ttps: Skip = None
# debug log messages
debug: Skip = None
# various signature matches
# we could potentially extend capa to use this info one day,
# though it would be quite sandbox-specific,
# and more detection-oriented than capability detection.
signatures: List[Signature]
malfamily_tag: Optional[str] = None
malscore: float
network: Network
procdump: List[Payload]
procmemory: ListTODO
signatures: List[Signature]
static: Optional[Static] = None
statistics: Optional[Statistics] = None
strings: Optional[List[str]] = None
suricata: Suricata
sysmon: Optional[ListTODO] = None
target: Target
# List[TTP{ttp, signature}] or Dict[ttp, signature]
ttps: Union[List[TTP], Dict[str, str]]
virustotal: Optional[VirusTotal] = None
detections: Optional[str] = None
detections2pid: Optional[Dict[int, List[str]]] = None
# AV detections for the sample.
virustotal: Skip = None
@classmethod
def from_buf(cls, buf: bytes) -> "CapeReport":
@@ -564,9 +502,9 @@ if __name__ == "__main__":
import json
doc = json.loads(buf)
from pprint import pprint
#pprint(doc["target"]["file"]["pe"]["imports"])
# from pprint import pprint
# pprint(doc["network"]["icmp"][225])
report = CapeReport.from_buf(buf)
assert report is not None