Merge branch 'backend-lancelot' into fix-46

This commit is contained in:
William Ballenthin
2020-08-25 12:43:45 -06:00
18 changed files with 1424 additions and 344 deletions

View File

@@ -62,7 +62,8 @@ jobs:
with:
python-version: ${{ matrix.python }}
- name: Install capa
run: pip install -e .[dev]
# TODO: remove `pefile` when we bump lancelot >= 0.3.7
run: pip install -e .[dev] pefile
- name: Run tests
run: pytest tests/

View File

@@ -0,0 +1,92 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import lancelot
import capa.features.extractors
import capa.features.extractors.lancelot.file
import capa.features.extractors.lancelot.insn
import capa.features.extractors.lancelot.function
import capa.features.extractors.lancelot.basicblock
__all__ = ["file", "function", "basicblock", "insn"]
logger = logging.getLogger(__name__)
class BB(object):
    """
    snapshot of a lancelot basic block that can also be converted with `int()`,
    which evaluates to the start address of the block.
    """

    def __init__(self, ws, bb):
        super(BB, self).__init__()
        # the workspace is retained so that instructions can be decoded on demand.
        self.ws = ws
        self.address, self.length = bb.address, bb.length
        self.predecessors, self.successors = bb.predecessors, bb.successors

    def __int__(self):
        return self.address

    @property
    def instructions(self):
        """
        lazily decode the instructions covering [address, address + length).
        decoding stops (after logging a warning) at the first address that cannot be read.
        """
        end = self.address + self.length
        cursor = self.address
        while cursor < end:
            try:
                insn = self.ws.read_insn(cursor)
            except ValueError:
                logger.warning("failed to read instruction at 0x%x", cursor)
                return
            yield insn
            cursor += insn.length
class LancelotFeatureExtractor(capa.features.extractors.FeatureExtractor):
    """
    feature extractor that uses a lancelot workspace, built from the raw bytes of the sample,
    to enumerate functions, basic blocks, and instructions.
    """

    def __init__(self, buf):
        super(LancelotFeatureExtractor, self).__init__()
        # raw file contents; the file-scope handlers work on these bytes directly.
        self.buf = buf
        self.ws = lancelot.from_bytes(buf)
        self.ctx = {}

    def get_base_address(self):
        return self.ws.base_address

    def extract_file_features(self):
        for feature, va in capa.features.extractors.lancelot.file.extract_file_features(self.buf):
            yield feature, va

    def get_functions(self):
        for va in self.ws.get_functions():
            # this is just the address of the function
            yield va

    def extract_function_features(self, f):
        for feature, va in capa.features.extractors.lancelot.function.extract_function_features(self.ws, f):
            yield feature, va

    def get_basic_blocks(self, f):
        """
        yield a BB wrapper for each basic block of the function at address `f`.
        when the CFG cannot be built, a warning is logged and nothing is yielded.
        """
        try:
            cfg = self.ws.build_cfg(f)
        except Exception:
            # best effort: skip functions that cannot be analyzed.
            # `Exception` rather than a bare except, so that KeyboardInterrupt
            # and SystemExit are not swallowed here.
            logger.warning("failed to build CFG for 0x%x", f)
            return

        for bb in cfg.basic_blocks.values():
            yield BB(self.ws, bb)

    def extract_basic_block_features(self, f, bb):
        for feature, va in capa.features.extractors.lancelot.basicblock.extract_basic_block_features(self.ws, bb):
            yield feature, va

    def get_instructions(self, f, bb):
        return bb.instructions

    def extract_insn_features(self, f, bb, insn):
        # the instruction handlers receive the extractor itself (not just the workspace).
        for feature, va in capa.features.extractors.lancelot.insn.extract_insn_features(self, f, bb, insn):
            yield feature, va

View File

@@ -0,0 +1,120 @@
import string
import struct
import logging
from lancelot import (
FLOW_VA,
OPERAND_SIZE,
OPERAND_TYPE,
MEMORY_OPERAND_BASE,
OPERAND_TYPE_MEMORY,
OPERAND_TYPE_IMMEDIATE,
IMMEDIATE_OPERAND_VALUE,
)
from capa.features import Characteristic
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
logger = logging.getLogger(__name__)
def extract_bb_tight_loop(ws, bb):
    """ yield the "tight loop" characteristic when the basic block is one of its own successors """
    if any(flow[FLOW_VA] == bb.address for flow in bb.successors):
        yield Characteristic("tight loop"), bb.address
def is_mov_imm_to_stack(insn):
    """
    return True when the instruction is a two-operand `mov*` whose source is a
    non-negative immediate and whose destination is memory based on ebp/rbp/esp/rsp.
    """
    if not insn.mnemonic.startswith("mov"):
        return False

    try:
        dst, src = insn.operands
    except ValueError:
        # not two operands
        return False

    return (
        src[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE
        and src[IMMEDIATE_OPERAND_VALUE] >= 0
        and dst[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
        and dst[MEMORY_OPERAND_BASE] in ("ebp", "rbp", "esp", "rsp")
    )
def is_printable_ascii(chars):
    """
    return True when every byte in `chars` is a printable ASCII character.

    args:
      chars (bytes): the bytes to inspect; iterating it must yield integer byte values.
    """
    return all(c < 127 and chr(c) in string.printable for c in chars)


def is_printable_utf16le(chars):
    """
    return True when `chars` looks like printable UTF-16LE text:
    every second byte (the high byte of each code unit) is zero
    and the remaining bytes are printable ASCII.

    args:
      chars (bytes): the bytes to inspect.
    """
    # slicing/iterating bytes yields ints on Python 3,
    # so the high bytes must be compared against 0, not against b"\x00".
    if all(c == 0 for c in chars[1::2]):
        return is_printable_ascii(chars[::2])
    return False
def get_printable_len(operand):
    """
    Return string length if all operand bytes are ascii or utf16-le printable

    args:
      operand: lancelot immediate operand; its size is given in bits (8, 16, 32, or 64).

    returns:
      int: the number of ASCII characters, or of UTF-16LE code units, encoded by the immediate;
        0 when the bytes are not printable.

    raises:
      ValueError: when the operand size is not one of the supported widths.
    """
    formats = {8: "<B", 16: "<H", 32: "<I", 64: "<Q"}

    operand_size = operand[OPERAND_SIZE]
    if operand_size not in formats:
        raise ValueError("unexpected operand size: " + str(operand_size))

    # serialize the immediate as little-endian bytes, as it would be laid out in memory.
    chars = struct.pack(formats[operand_size], operand[IMMEDIATE_OPERAND_VALUE])

    # floor division: lengths are counts and must stay integers on Python 3.
    if is_printable_ascii(chars):
        return operand_size // 8
    if is_printable_utf16le(chars):
        return operand_size // 16
    return 0
def _bb_has_stackstring(ws, bb):
    """
    decide whether the basic block appears to build a string on the stack.

    heuristic: sum the printable lengths of the immediates that are moved to the stack
    and report True as soon as the sum exceeds MIN_STACKSTRING_LEN.
    """
    printable = 0
    for insn in bb.instructions:
        if not is_mov_imm_to_stack(insn):
            continue

        # the source operand carries the constant bytes
        printable += get_printable_len(insn.operands[1])
        if printable > MIN_STACKSTRING_LEN:
            return True

    return False
def extract_stackstring(ws, bb):
    """ yield the "stack string" characteristic when the basic block matches the stackstring heuristic """
    if not _bb_has_stackstring(ws, bb):
        return
    yield Characteristic("stack string"), bb.address
def extract_basic_block_features(ws, bb):
    """
    yield the (feature, va) pairs for the given basic block:
    first the BasicBlock feature itself, then everything produced by BASIC_BLOCK_HANDLERS.
    """
    yield BasicBlock(), bb.address

    for handler in BASIC_BLOCK_HANDLERS:
        for found, location in handler(ws, bb):
            yield found, location
# handlers invoked, in order, by extract_basic_block_features.
# each is called as handler(ws, bb) and yields (feature, va) pairs.
BASIC_BLOCK_HANDLERS = (
extract_bb_tight_loop,
extract_stackstring,
)

View File

@@ -0,0 +1,81 @@
import pefile
import capa.features.extractors.strings
from capa.features import String, Characteristic
from capa.features.file import Export, Import, Section
def extract_file_embedded_pe(buf, pe):
    """
    yield an "embedded pe" characteristic for every offset, other than the start of the file,
    at which pefile can parse a PE file.

    args:
      buf (bytes): the raw file contents.
      pe (pefile.PE): unused; accepted so that all file handlers share one signature.
    """
    # start searching after the first two bytes, so the sample's own header is not reported.
    offset = buf.find(b"MZ", 2)
    while offset != -1:
        try:
            pefile.PE(data=buf[offset:])
        except Exception:
            # this candidate is not a parsable PE; keep scanning.
            # deliberately not a bare except: Ctrl-C and SystemExit must still propagate
            # out of this potentially long-running scan.
            pass
        else:
            yield Characteristic("embedded pe"), offset

        # resume just past the current "MZ" marker.
        offset = buf.find(b"MZ", offset + 2)
def extract_file_export_names(buf, pe):
    """
    yield an Export feature for each named export, located at image base + export address.
    yields nothing when the PE has no parsed export directory.

    args:
      buf (bytes): unused; accepted so that all file handlers share one signature.
      pe (pefile.PE): the parsed PE file.
    """
    if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
        return

    base_address = pe.OPTIONAL_HEADER.ImageBase
    for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
        if exp.name is None:
            # NOTE(review): pefile is expected to report ordinal-only exports with name=None;
            # there is no name to emit for those, so skip them rather than crash on .decode().
            continue
        yield Export(exp.name.decode("ascii")), base_address + exp.address
def extract_file_import_names(buf, pe):
    """
    yield Import features for each imported symbol, located at the address pefile reports for the import.

    imports by ordinal are emitted as `lib.#ordinal`;
    imports by name are emitted both as `lib.name` and as the bare `name`.
    yields nothing when the PE has no parsed import directory.

    args:
      buf (bytes): unused; accepted so that all file handlers share one signature.
      pe (pefile.PE): the parsed PE file.
    """
    # mirror the export handler: the directory attribute is absent when there are no imports.
    if not hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
        return

    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        # "KERNEL32.dll" -> "kernel32"
        libname = entry.dll.decode("ascii").lower().partition(".")[0]
        for imp in entry.imports:
            if imp.ordinal:
                yield Import("%s.#%s" % (libname, imp.ordinal)), imp.address
            else:
                impname = imp.name.decode("ascii")
                yield Import("%s.%s" % (libname, impname)), imp.address
                yield Import("%s" % (impname)), imp.address
def extract_file_section_names(buf, pe):
    """
    yield a Section feature per PE section, named by the bytes preceding the first NUL
    in the section header name, and located at image base + section virtual address.
    """
    image_base = pe.OPTIONAL_HEADER.ImageBase
    for section in pe.sections:
        raw_name, _, _ = section.Name.partition(b"\x00")
        yield Section(raw_name.decode("ascii")), image_base + section.VirtualAddress
def extract_file_strings(buf, pe):
    """
    yield a String feature, with its offset, for each string found in the raw file contents:
    first the ASCII strings, then the unicode strings.
    """
    strings = capa.features.extractors.strings
    for extract in (strings.extract_ascii_strings, strings.extract_unicode_strings):
        for found in extract(buf):
            yield String(found.s), found.offset
def extract_file_features(buf):
    """
    parse the buffer as a PE file and yield the (feature, va) pairs
    produced by each of the FILE_HANDLERS, in order.
    """
    parsed = pefile.PE(data=buf)

    for handler in FILE_HANDLERS:
        for found, location in handler(buf, parsed):
            yield found, location
# handlers invoked, in order, by extract_file_features.
# each is called as handler(buf, pe) and yields (feature, va) pairs.
FILE_HANDLERS = (
extract_file_embedded_pe,
extract_file_export_names,
extract_file_import_names,
extract_file_section_names,
extract_file_strings,
)

View File

@@ -0,0 +1,64 @@
import logging
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
from lancelot import (
FLOW_VA,
FLOW_TYPE,
FLOW_TYPE_CONDITIONAL_JUMP,
FLOW_TYPE_CONDITIONAL_MOVE,
FLOW_TYPE_UNCONDITIONAL_JUMP,
)
from capa.features import Characteristic
from capa.features.extractors import loops
logger = logging.getLogger(__name__)
# call the decorator factory explicitly: the bare `@lru_cache` form is only
# accepted by functools on Python 3.8+ and fails on older interpreters.
@lru_cache()
def get_call_graph(ws):
    """build the call graph for the workspace, memoized per workspace object."""
    return ws.build_call_graph()
def extract_function_calls_to(ws, f):
    """yield a "calls to" characteristic, located at the caller, for each caller the call graph records for `f`."""
    callers = get_call_graph(ws).calls_to.get(f, [])
    for caller in callers:
        yield Characteristic("calls to"), caller
def extract_function_loop(ws, f):
    """
    yield the "loop" characteristic when the jump edges between the basic blocks of `f` contain a loop.
    only unconditional jump, conditional jump, and conditional move flows are considered edges.
    """
    jump_flows = (
        FLOW_TYPE_UNCONDITIONAL_JUMP,
        FLOW_TYPE_CONDITIONAL_JUMP,
        FLOW_TYPE_CONDITIONAL_MOVE,
    )

    edges = [
        (bb.address, flow[FLOW_VA])
        for bb in ws.build_cfg(f).basic_blocks.values()
        for flow in bb.successors
        if flow[FLOW_TYPE] in jump_flows
    ]

    if edges and loops.has_loop(edges):
        yield Characteristic("loop"), f
# handlers invoked, in order, by extract_function_features.
# each is called as handler(ws, f) and yields (feature, va) pairs.
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)
# names of handlers that have already been reported as not implemented,
# so that the warning is logged only once per handler.
_not_implemented = set([])
def extract_function_features(ws, f):
    """
    yield the (feature, va) pairs produced by each of the FUNCTION_HANDLERS for the function at `f`.
    a handler that raises NotImplementedError is skipped, with a warning logged once per handler name.
    """
    for handler in FUNCTION_HANDLERS:
        try:
            for found, location in handler(ws, f):
                yield found, location
        except NotImplementedError:
            handler_name = handler.__name__
            if handler_name in _not_implemented:
                continue
            logger.warning("not implemented: %s", handler_name)
            _not_implemented.add(handler_name)

View File

@@ -0,0 +1,33 @@
from lancelot import (
OPERAND_TYPE,
MEMORY_OPERAND_BASE,
MEMORY_OPERAND_DISP,
OPERAND_TYPE_MEMORY,
OPERAND_TYPE_IMMEDIATE,
IMMEDIATE_OPERAND_VALUE,
IMMEDIATE_OPERAND_IS_RELATIVE,
)
def get_operand_target(insn, op):
    """
    compute the address referenced by the given operand of the given instruction.

    args:
      insn (lancelot.Instruction): the instruction that owns the operand.
      op: the lancelot operand to resolve.

    returns:
      int: the referenced address or immediate value.

    raises:
      ValueError: when the operand does not reference a constant target,
        such as a register operand or a memory operand relative to a register other than rip.
    """
    op_type = op[OPERAND_TYPE]

    if op_type == OPERAND_TYPE_MEMORY:
        base = op[MEMORY_OPERAND_BASE]
        disp = op[MEMORY_OPERAND_DISP]

        # NOTE(review): the displacement is presumably None when absent,
        # in which case there is no constant target to compute; fall through to the error below.
        if disp is not None:
            # call direct, x64
            # rip relative
            # kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0
            if base == "rip":
                return disp + insn.address + insn.length

            # call direct, x32
            # mimikatz:0x403BD3 call ds:CryptAcquireContextW
            if base is None:
                return disp

    elif op_type == OPERAND_TYPE_IMMEDIATE:
        value = op[IMMEDIATE_OPERAND_VALUE]

        # call via thunk
        # mimikatz:0x455A41 call LsaQueryInformationPolicy
        if op[IMMEDIATE_OPERAND_IS_RELATIVE]:
            # relative immediates are offsets from the end of the instruction.
            return value + insn.address + insn.length

        return value

    raise ValueError("memory operand has no target")

View File

@@ -0,0 +1,149 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import collections
from lancelot import (
FLOW_VA,
OPERAND_TYPE,
PERMISSION_READ,
MEMORY_OPERAND_BASE,
MEMORY_OPERAND_DISP,
OPERAND_TYPE_MEMORY,
MEMORY_OPERAND_INDEX,
OPERAND_TYPE_REGISTER,
MEMORY_OPERAND_SEGMENT,
OPERAND_TYPE_IMMEDIATE,
IMMEDIATE_OPERAND_VALUE,
REGISTER_OPERAND_REGISTER,
IMMEDIATE_OPERAND_IS_RELATIVE,
)
from capa.features.extractors.lancelot.helpers import get_operand_target
# mnemonics that find_definition treats as overwriting their first operand when it is a register.
DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor")
class NotFoundError(Exception):
    """raised when the definition of an indirect call's target register cannot be found."""
def read_instructions(ws, bb):
    """
    yield the instructions decoded from the address range [bb.address, bb.address + bb.length).
    decoding stops silently at the first address the workspace fails to read.
    """
    end = bb.address + bb.length
    cursor = bb.address
    while True:
        if cursor >= end:
            return
        try:
            insn = ws.read_insn(cursor)
        except ValueError:
            return
        yield insn
        cursor += insn.length
def build_instruction_predecessors(ws, cfg):
    """
    map each instruction address in the CFG to the set of instruction addresses that may execute just before it.

    within a basic block, the predecessor is the previous instruction.
    for the first instruction of a block, the predecessors are the last instructions
    of each of the block's predecessor blocks.

    args:
      ws: the lancelot workspace used to decode instructions.
      cfg: the lancelot control flow graph; `basic_blocks` is indexed by block address.

    returns:
      collections.defaultdict(set): instruction address -> set of predecessor instruction addresses.
    """
    preds = collections.defaultdict(set)

    # decode each basic block exactly once, rather than once per incoming edge.
    insns_by_block = {key: list(read_instructions(ws, bb)) for key, bb in cfg.basic_blocks.items()}

    for key, bb in cfg.basic_blocks.items():
        insns = insns_by_block[key]
        if not insns:
            continue

        first_insn = insns[0]
        for pred in bb.predecessors:
            pred_insns = insns_by_block.get(pred[FLOW_VA])
            if not pred_insns:
                # the predecessor block is not part of this CFG, or none of it could be decoded
                # (read_instructions gives up on decode errors), so there is no edge to record.
                continue
            preds[first_insn.address].add(pred_insns[-1].address)

        for prev, insn in zip(insns, insns[1:]):
            preds[insn.address].add(prev.address)

    return preds
def find_definition(ws, f, insn):
    """
    walk backwards from the given `call reg` instruction, breadth-first over instruction predecessors,
    until an instruction that overwrites the register is found.

    args:
      ws (lancelot.PE)
      f (int): the function start address
      insn (lancelot.Instruction): call instruction to resolve

    returns:
      (va: int, value?: int|None): the address of the assignment and the value, if a constant.

    raises:
      NotFoundError: when the definition cannot be found.
    """
    assert insn.mnemonic == "call"
    call_operand = insn.operands[0]
    assert call_operand[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
    reg = call_operand[REGISTER_OPERAND_REGISTER]

    cfg = ws.build_cfg(f)
    preds = build_instruction_predecessors(ws, cfg)

    visited = set()
    pending = collections.deque(preds[insn.address])
    while pending:
        va = pending.popleft()

        # each location is inspected at most once
        if va in visited:
            continue
        visited.add(va)

        candidate = ws.read_insn(va)
        operands = candidate.operands

        overwrites_reg = (
            len(operands) != 0
            and operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
            and operands[0][REGISTER_OPERAND_REGISTER] == reg
            and candidate.mnemonic in DESTRUCTIVE_MNEMONICS
        )
        if not overwrites_reg:
            # the register survives this instruction: keep walking backwards.
            pending.extend(preds[va])
            continue

        # from here on, the instruction is destructive to our target register.
        # only the constant in something like `mov $reg, IAT` is currently recovered;
        # any other pattern results in an unknown value, represented by None.
        # this is a good place to extend in the future, if we need more robust support.
        if candidate.mnemonic != "mov":
            return (va, None)

        try:
            return (va, get_operand_target(candidate, operands[1]))
        except ValueError:
            return (va, None)

    raise NotFoundError()
def is_indirect_call(insn):
    """return True when the instruction is a `call` whose first operand is a register."""
    if insn.mnemonic != "call":
        return False
    return insn.operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
def resolve_indirect_call(ws, f, insn):
    """
    attempt to resolve the target address of the given indirect (`call reg`) instruction
    by locating the instruction that defines the register.

    args:
      ws (lancelot.PE): the analysis workspace
      f (int): the address of the function to analyze
      insn (lancelot.Instruction): the instruction at which to start analysis

    returns:
      (va: int, value?: int|None): the address of the assignment and the value, if a constant.

    raises:
      NotFoundError: when the definition cannot be found.
    """
    assert is_indirect_call(insn)
    definition = find_definition(ws, f, insn)
    return definition

View File

@@ -0,0 +1,487 @@
import logging
import itertools
import pefile
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
from lancelot import (
OPERAND_TYPE,
PERMISSION_READ,
MEMORY_OPERAND_BASE,
MEMORY_OPERAND_DISP,
OPERAND_TYPE_MEMORY,
MEMORY_OPERAND_INDEX,
OPERAND_TYPE_REGISTER,
MEMORY_OPERAND_SEGMENT,
OPERAND_TYPE_IMMEDIATE,
IMMEDIATE_OPERAND_VALUE,
REGISTER_OPERAND_REGISTER,
IMMEDIATE_OPERAND_IS_RELATIVE,
)
import capa.features.extractors.helpers
from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic
from capa.features.insn import Number, Offset, Mnemonic
from capa.features.extractors.lancelot.helpers import get_operand_target
from capa.features.extractors.lancelot.function import get_call_graph
from capa.features.extractors.lancelot.indirect_calls import NotFoundError, resolve_indirect_call
logger = logging.getLogger(__name__)
# security cookie checks may perform non-zeroing XORs.
# these are expected within this many bytes of the start of a function's first basic block,
# or of the end of a returning basic block; ignoring them helps to reduce FP features.
SECURITY_COOKIE_BYTES_DELTA = 0x40
def get_arch(ws):
    """
    translate the workspace architecture ("x32" or "x64") into the capa ARCH_* constant.

    raises:
      ValueError: when the workspace reports any other architecture.
    """
    arch = ws.arch
    if arch == "x32":
        return ARCH_X32
    if arch == "x64":
        return ARCH_X64
    raise ValueError("unexpected architecture")
# call the decorator factory explicitly: the bare `@lru_cache` form is only
# accepted by functools on Python 3.8+ and fails on older interpreters.
@lru_cache()
def get_pefile(xtor):
    """parse the extractor's raw buffer with pefile, memoized per extractor object."""
    return pefile.PE(data=xtor.buf)
# call the decorator factory explicitly: the bare `@lru_cache` form is only
# accepted by functools on Python 3.8+ and fails on older interpreters.
@lru_cache()
def get_imports(xtor):
    """
    collect the imports of the sample, memoized per extractor object.

    returns:
      dict[int, str]: import address (as reported by pefile) -> `lib.name`, or `lib.#ordinal`.
        empty when the PE has no parsed import directory.
    """
    pe = get_pefile(xtor)

    imports = {}
    # the directory attribute is absent when there are no imports, so default to no entries.
    for entry in getattr(pe, "DIRECTORY_ENTRY_IMPORT", ()):
        # "KERNEL32.dll" -> "kernel32"
        libname = entry.dll.decode("ascii").lower().partition(".")[0]
        for imp in entry.imports:
            if imp.ordinal:
                imports[imp.address] = "%s.#%s" % (libname, imp.ordinal)
            else:
                impname = imp.name.decode("ascii")
                imports[imp.address] = "%s.%s" % (libname, impname)
    return imports
# call the decorator factory explicitly: the bare `@lru_cache` form is only
# accepted by functools on Python 3.8+ and fails on older interpreters.
@lru_cache()
def get_thunks(xtor):
    """
    find the functions whose first instruction is a `jmp` to an import, memoized per extractor object.

    returns:
      dict[int, str]: function address -> name of the import that the function jumps to.
    """
    # loop-invariant, so fetch it once up front.
    imports = get_imports(xtor)

    thunks = {}
    for va in xtor.ws.get_functions():
        try:
            insn = xtor.ws.read_insn(va)
        except ValueError:
            continue

        if insn.mnemonic != "jmp":
            continue

        try:
            target = get_operand_target(insn, insn.operands[0])
        except ValueError:
            continue

        if target in imports:
            thunks[va] = imports[target]

    return thunks
def extract_insn_api_features(xtor, f, bb, insn):
    """
    yield API features for a `call` instruction whose target resolves to an import,
    either directly or through a thunk that jumps to an import.
    """
    if insn.mnemonic != "call":
        return

    op0 = insn.operands[0]
    if op0[OPERAND_TYPE] != OPERAND_TYPE_REGISTER:
        try:
            target = get_operand_target(insn, op0)
        except ValueError:
            return
    else:
        # `call reg`: try to recover the constant that was assigned to the register.
        try:
            (_, target) = resolve_indirect_call(xtor.ws, f, insn)
        except NotFoundError:
            return
        if target is None:
            return

    # imports take precedence; thunks are only consulted when the target is not an import.
    api_name = get_imports(xtor).get(target)
    if api_name is None:
        api_name = get_thunks(xtor).get(target)
    if api_name is None:
        return

    for found, location in capa.features.extractors.helpers.generate_api_features(api_name, insn.address):
        yield found, location
def extract_insn_mnemonic_features(xtor, f, bb, insn):
    """yield a Mnemonic feature for the instruction, located at the instruction's address."""
    mnemonic_feature = Mnemonic(insn.mnemonic)
    yield mnemonic_feature, insn.address
def extract_insn_number_features(xtor, f, bb, insn):
    """
    yield Number features for the immediate operands of the instruction.

    each constant is emitted twice: once plain and once tagged with the workspace architecture.
    immediates that are readable addresses in the workspace are skipped,
    as are stack pointer adjustments (`add esp, ...` and `add rsp, ...`).
    """
    operands = insn.operands

    if (
        insn.mnemonic == "add"
        and operands
        and operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
        # the 64-bit stack pointer is treated like esp, consistent with
        # the other stack register checks in the lancelot extractor.
        and operands[0][REGISTER_OPERAND_REGISTER] in ("esp", "rsp")
    ):
        # skip things like:
        #
        # .text:00401140 call sub_407E2B
        # .text:00401145 add esp, 0Ch
        return

    for operand in operands:
        if operand[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
            continue

        v = operand[IMMEDIATE_OPERAND_VALUE]

        if xtor.ws.probe(v) & PERMISSION_READ:
            # v is a valid address
            # therefore, assume its not also a constant.
            continue

        yield Number(v), insn.address
        yield Number(v, arch=get_arch(xtor.ws)), insn.address
def extract_insn_offset_features(xtor, f, bb, insn):
    """
    yield Offset features for the displacements of the memory operands of the instruction.

    each offset is emitted twice: once plain and once tagged with the workspace architecture.
    operands based on a stack or frame register (esp, rsp, ebp, rbp) are skipped.
    """
    for operand in insn.operands:
        if operand[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
            continue

        # rsp is included alongside esp/ebp/rbp so that 64-bit stack accesses
        # are filtered the same way as their 32-bit counterparts.
        if operand[MEMORY_OPERAND_BASE] in ("esp", "rsp", "ebp", "rbp"):
            continue

        # lancelot provides `None` when the displacement is not present.
        v = operand[MEMORY_OPERAND_DISP] or 0

        yield Offset(v), insn.address
        yield Offset(v, arch=get_arch(xtor.ws)), insn.address
def derefs(xtor, p):
    """
    follow the given pointer, then the pointer stored there, and so on,
    yielding each readable address visited along the way.

    useful when you may have a pointer to string, or pointer to pointer to string, etc.
    this is a "do what i mean" type of helper function.
    """
    hops = 0
    current = p
    while True:
        if not xtor.ws.probe(current) & PERMISSION_READ:
            return
        yield current

        target = xtor.ws.read_pointer(current)

        # sanity: pointer points to self
        if target == current:
            return

        # sanity: avoid chains of pointers that are unreasonably deep
        hops += 1
        if hops > 10:
            return

        current = target
def read_bytes(xtor, va):
    """
    read up to MAX_BYTES_FEATURE_SIZE bytes from the given address,
    truncated at the end of the PE section that contains it.

    raises:
      ValueError: if the given address is not valid.
    """
    pe = get_pefile(xtor)
    for section in pe.sections:
        section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
        section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize
        if not (section_start <= va < section_end):
            continue

        size = min(va + MAX_BYTES_FEATURE_SIZE, section_end) - va
        return xtor.ws.read_bytes(va, size)

    raise ValueError("invalid address")
# these are mnemonics that may flow (jump) elsewhere.
# instructions with these mnemonics are skipped by the bytes feature extractor,
# and are the only ones inspected for cross section flow.
FLOW_MNEMONICS = set(
[
"call",
"jb",
"jbe",
"jcxz",
"jecxz",
"jknzd",
"jkzd",
"jl",
"jle",
"jmp",
"jnb",
"jnbe",
"jnl",
"jnle",
"jno",
"jnp",
"jns",
"jnz",
"jo",
"jp",
"jrcxz",
"js",
"jz",
]
)
def extract_insn_bytes_features(xtor, f, bb, insn):
    """
    yield Bytes features for the data referenced by the operands of the instruction.

    flow instructions are ignored. each operand target is dereferenced (see `derefs`),
    and the bytes read at each pointer are emitted unless they cannot be read or are all zeros.
    """
    if insn.mnemonic in FLOW_MNEMONICS:
        return

    for operand in insn.operands:
        try:
            referenced = get_operand_target(insn, operand)
        except ValueError:
            continue

        for pointer in derefs(xtor, referenced):
            try:
                data = read_bytes(xtor, pointer)
            except ValueError:
                continue

            if not capa.features.extractors.helpers.all_zeros(data):
                yield Bytes(data), insn.address
def first(s):
    """yield only the first element of the sequence, or nothing when the sequence is empty"""
    iterator = iter(s)
    try:
        head = next(iterator)
    except StopIteration:
        return
    yield head
def extract_insn_string_features(xtor, f, bb, insn):
    """
    yield String features for the data referenced by the instruction.

    for each Bytes feature of the instruction, the first ASCII string and the first unicode string
    found in the bytes are considered, and emitted only when they begin at offset zero.
    """
    strings = capa.features.extractors.strings
    for bytez, va in extract_insn_bytes_features(xtor, f, bb, insn):
        buf = bytez.value

        candidates = itertools.chain(
            first(strings.extract_ascii_strings(buf)),
            first(strings.extract_unicode_strings(buf)),
        )
        for candidate in candidates:
            if candidate.offset != 0:
                continue
            yield String(candidate.s), va
def is_security_cookie(xtor, f, bb, insn):
    """
    decide whether the given instruction looks like part of a security cookie check,
    based on its second operand and on its position within the function.
    """
    op1 = insn.operands[1]
    if op1[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
        # a register source must be a stack or frame register
        if op1[REGISTER_OPERAND_REGISTER] not in ("esp", "ebp", "rbp", "rsp"):
            return False

    # expect security cookie init in first basic block within first bytes (instructions)
    in_first_block = f == bb.address
    if in_first_block and insn.address < (bb.address + SECURITY_COOKIE_BYTES_DELTA):
        return True

    # ... or within last bytes (instructions) before a return
    last_insn = list(xtor.get_instructions(f, bb))[-1]
    if last_insn.mnemonic not in ("ret", "retn"):
        return False
    return insn.address > (bb.address + bb.length - SECURITY_COOKIE_BYTES_DELTA)
def extract_insn_nzxor_characteristic_features(xtor, f, bb, insn):
    """
    yield the "nzxor" characteristic for an `xor` whose two operands differ.
    ignore expected non-zeroing XORs, e.g. security cookies.
    """
    if insn.mnemonic != "xor":
        return

    operands = insn.operands
    zeroing = operands[0] == operands[1]
    if zeroing:
        return

    if not is_security_cookie(xtor, f, bb, insn):
        yield Characteristic("nzxor"), insn.address
def extract_insn_peb_access_characteristic_features(xtor, f, bb, insn):
    """
    yield the "peb access" characteristic for each memory operand that references
    fs:[0x30] (on x86) or gs:[0x60] (on x64).
    """
    peb_references = (("gs", 0x60), ("fs", 0x30))

    for operand in insn.operands:
        if operand[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
            continue

        reference = (operand[MEMORY_OPERAND_SEGMENT], operand[MEMORY_OPERAND_DISP])
        if reference in peb_references:
            yield Characteristic("peb access"), insn.address
def extract_insn_segment_access_features(xtor, f, bb, insn):
    """ yield "gs access" / "fs access" for each memory operand that uses the gs / fs segment """
    for operand in insn.operands:
        if operand[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
            continue

        segment = operand[MEMORY_OPERAND_SEGMENT]
        if segment == "gs":
            yield Characteristic("gs access"), insn.address
        elif segment == "fs":
            yield Characteristic("fs access"), insn.address
def get_section(xtor, va):
    """
    return the index of the PE section whose virtual address range contains `va`.

    raises:
      ValueError: when no section contains the address.
    """
    pe = get_pefile(xtor)

    for index, section in enumerate(pe.sections):
        lower = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
        upper = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize
        if lower <= va < upper:
            return index

    raise ValueError("invalid address")
def extract_insn_cross_section_cflow(xtor, f, bb, insn):
    """
    yield the "cross section flow" characteristic for a flow instruction (CALL, JMP, ...)
    whose target lies in a different PE section than the instruction itself.
    targets that are imports, or that cannot be resolved or mapped to a section, are ignored.
    """
    if insn.mnemonic not in FLOW_MNEMONICS:
        return

    try:
        target = get_operand_target(insn, insn.operands[0])
    except ValueError:
        return

    if target in get_imports(xtor):
        return

    try:
        crosses = get_section(xtor, insn.address) != get_section(xtor, target)
    except ValueError:
        return

    if crosses:
        yield Characteristic("cross section flow"), insn.address
def extract_function_calls_from(xtor, f, bb, insn):
    """
    yield "calls from" characteristics, located at the callee, for the calls made by the instruction,
    plus "recursive call", located at the instruction, when the callee is the containing function.
    """
    call_graph = get_call_graph(xtor.ws)

    for callee in call_graph.calls_from.get(insn.address, []):
        yield Characteristic("calls from"), callee
        if callee == f:
            yield Characteristic("recursive call"), insn.address

    # lancelot doesn't count API calls when constructing the call graph
    # so we still have to scan for calls to an import
    if insn.mnemonic != "call":
        return

    try:
        target = get_operand_target(insn, insn.operands[0])
    except ValueError:
        return

    if target in get_imports(xtor):
        yield Characteristic("calls from"), target
# this is a feature that's most relevant at the function or basic block scope,
# however, its most efficient to extract at the instruction scope.
def extract_function_indirect_call_characteristic_features(xtor, f, bb, insn):
    """
    yield the "indirect call" characteristic for calls through a register,
    or through memory addressed with a base or index register
    (e.g., call eax or call dword ptr [edx+4]).
    does not include calls like => call ds:dword_ABD4974
    """
    if insn.mnemonic != "call":
        return

    op0 = insn.operands[0]
    kind = op0[OPERAND_TYPE]

    if kind == OPERAND_TYPE_REGISTER:
        indirect = True
    elif kind == OPERAND_TYPE_MEMORY:
        indirect = op0[MEMORY_OPERAND_BASE] is not None or op0[MEMORY_OPERAND_INDEX] is not None
    else:
        indirect = False

    if indirect:
        yield Characteristic("indirect call"), insn.address
# names of instruction handlers that have already been reported as not implemented,
# so that the warning is logged only once per handler.
_not_implemented = set([])
def extract_insn_features(xtor, f, bb, insn):
    """
    yield the (feature, va) pairs produced by each of the INSTRUCTION_HANDLERS for the instruction.
    a handler that raises NotImplementedError is skipped, with a warning logged once per handler name.
    """
    for handler in INSTRUCTION_HANDLERS:
        try:
            for found, location in handler(xtor, f, bb, insn):
                yield found, location
        except NotImplementedError:
            handler_name = handler.__name__
            if handler_name in _not_implemented:
                continue
            logger.warning("not implemented: %s", handler_name)
            _not_implemented.add(handler_name)
# handlers invoked, in order, by extract_insn_features.
# each is called as handler(xtor, f, bb, insn) and yields (feature, va) pairs.
INSTRUCTION_HANDLERS = (
extract_insn_api_features,
extract_insn_number_features,
extract_insn_string_features,
extract_insn_bytes_features,
extract_insn_offset_features,
extract_insn_nzxor_characteristic_features,
extract_insn_mnemonic_features,
extract_insn_peb_access_characteristic_features,
extract_insn_cross_section_cflow,
extract_insn_segment_access_features,
extract_function_calls_from,
extract_function_indirect_call_characteristic_features,
)

View File

@@ -8,11 +8,7 @@
import types
import file
import insn
import function
import viv_utils
import basicblock
import capa.features.extractors
import capa.features.extractors.viv.file

View File

@@ -24,7 +24,10 @@ class Number(Feature):
super(Number, self).__init__(value, arch=arch, description=description)
def get_value_str(self):
    """
    render the number as uppercase hex.
    negative values are rendered with a leading minus sign, like `-0x10`, rather than `0x-10`.
    """
    # the stale unconditional return that preceded this check made it unreachable.
    if self.value < 0:
        return "-0x%X" % (-self.value)
    return "0x%X" % self.value
class Offset(Feature):

View File

@@ -32,7 +32,7 @@ import capa.features.extractors
from capa.helpers import oint, get_file_taste
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SUPPORTED_FILE_MAGIC = set(["MZ"])
SUPPORTED_FILE_MAGIC = set([b"MZ"])
logger = logging.getLogger("capa")
@@ -290,7 +290,24 @@ class UnsupportedRuntimeError(RuntimeError):
def get_extractor_py3(path, format, disable_progress=False):
    """
    build a lancelot-backed feature extractor for the file at the given path.

    args:
      path (str): path to the sample.
      format (str): requested file format; only "pe" and "auto" are accepted.
      disable_progress (bool): unused by this backend.

    raises:
      UnsupportedRuntimeError: when lancelot (or the lancelot extractor) cannot be imported.
      UnsupportedFormatError: when the format is not accepted or the file type is not supported.
    """
    # a stale unconditional `raise UnsupportedRuntimeError()` used to sit here,
    # which made everything below unreachable.
    try:
        # imported lazily so that capa still loads when lancelot is not installed.
        import lancelot
        import capa.features.extractors.lancelot
    except ImportError:
        logger.warning("lancelot not installed")
        raise UnsupportedRuntimeError()

    if format not in ("pe", "auto"):
        raise UnsupportedFormatError(format)

    if not is_supported_file_type(path):
        raise UnsupportedFormatError()

    with open(path, "rb") as f:
        buf = f.read()

    return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)
def get_extractor(path, format, disable_progress=False):

View File

@@ -1,112 +1,112 @@
"""
Binary Ninja plugin that imports a capa report,
produced via `capa --json /path/to/sample`,
into the current database.
It will mark up functions with their capa matches, like:
; capa: print debug messages (host-interaction/log/debug/write-event)
; capa: delete service (host-interaction/service/delete)
; Attributes: bp-based frame
public UninstallService
UninstallService proc near
...
To use, invoke from the Binary Ninja Tools menu, or from the
command-palette.
Adapted for Binary Ninja by @psifertex
This script will verify that the report matches the workspace.
Check the log window for any errors, and/or the summary of changes.
Derived from: https://github.com/fireeye/capa/blob/master/scripts/import-to-ida.py
"""
import os
import json
from binaryninja import *
def append_func_cmt(bv, va, cmt):
    """
    append the comment, on a new line, to the comment of the function at `va`,
    unless the function comment already contains it.

    raises:
      ValueError: when there is no function at `va`.
    """
    func = bv.get_function_at(va)
    if not func:
        raise ValueError("not a function")

    if cmt not in func.comment:
        func.comment = "\n".join((func.comment, cmt))
def load_analysis(bv):
    """
    load a capa JSON report and annotate the matched functions in the given binary view.

    the report is looked up next to the analyzed file, as `<name>.js` and then `<name>.json`;
    otherwise the user is prompted for a path.

    returns:
      0 when no readable report is selected,
      -1 when the document does not look like a capa report,
      -2 when the report's MD5 does not match the loaded sample,
      None on success.
    """
    filename = bv.file.filename
    shortname = os.path.splitext(os.path.basename(filename))[0]
    dirname = os.path.dirname(filename)
    log_info(f"dirname: {dirname}\nshortname: {shortname}\n")

    for extension in (".js", ".json"):
        candidate = os.path.join(dirname, shortname + extension)
        if os.access(candidate, os.R_OK):
            path = candidate
            break
    else:
        path = interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")

    if not path or not os.access(path, os.R_OK):
        log_error("Invalid filename.")
        return 0

    log_info("Using capa file %s" % path)
    with open(path, "rb") as f:
        doc = json.loads(f.read().decode("utf-8"))

    if "meta" not in doc or "rules" not in doc:
        log_error("doesn't appear to be a capa report")
        return -1

    # compare the hash recorded in the report against the hash of the loaded file.
    a = doc["meta"]["sample"]["md5"].lower()
    md5 = Transform["MD5"]
    rawhex = Transform["RawHex"]
    b = rawhex.encode(md5.encode(bv.parent_view.read(bv.parent_view.start, bv.parent_view.end))).decode("utf-8")
    if not a == b:
        log_error("sample mismatch")
        return -2

    rows = []
    for rule in doc["rules"].values():
        meta = rule["meta"]

        # only function-scope matches of non-library, non-subscope rules are imported.
        if meta.get("lib") or meta.get("capa/subscope"):
            continue
        if meta["scope"] != "function":
            continue

        name = meta["name"]
        ns = meta.get("namespace", "")
        for va in rule["matches"].keys():
            rows.append((ns, name, int(va)))

    # order by (namespace, name) so that like things show up together
    rows.sort()

    for ns, name, va in rows:
        cmt = "%s (%s)" % (name, ns) if ns else "%s" % (name,)
        log_info("0x%x: %s" % (va, cmt))

        try:
            # message will look something like:
            #
            # capa: delete service (host-interaction/service/delete)
            append_func_cmt(bv, va, "capa: " + cmt)
        except ValueError:
            continue

    log_info("ok")
# register load_analysis as a Binary Ninja plugin command named "Load capa file".
PluginCommand.register("Load capa file", "Loads an analysis file from capa", load_analysis)
"""
Binary Ninja plugin that imports a capa report,
produced via `capa --json /path/to/sample`,
into the current database.
It will mark up functions with their capa matches, like:
; capa: print debug messages (host-interaction/log/debug/write-event)
; capa: delete service (host-interaction/service/delete)
; Attributes: bp-based frame
public UninstallService
UninstallService proc near
...
To use, invoke from the Binary Ninja Tools menu, or from the
command-palette.
Adapted for Binary Ninja by @psifertex
This script will verify that the report matches the workspace.
Check the log window for any errors, and/or the summary of changes.
Derived from: https://github.com/fireeye/capa/blob/master/scripts/import-to-ida.py
"""
import os
import json
from binaryninja import *
def append_func_cmt(bv, va, cmt):
    """
    append the comment, on a new line, to the comment of the function at `va`,
    unless the function comment already contains it.

    raises:
      ValueError: when there is no function at `va`.
    """
    func = bv.get_function_at(va)
    if not func:
        raise ValueError("not a function")

    if cmt not in func.comment:
        func.comment = "\n".join((func.comment, cmt))
def load_analysis(bv):
    """
    find a capa JSON report for the file open in `bv` and
    add a comment for each function-scope rule match to the matched function.

    the report is looked for next to the open file, named like the file but with
    a `.js` or `.json` extension; if neither is readable, the user is prompted for a path.

    returns:
      0 if no readable report was selected,
      -1 if the document has no `meta` or `rules` key,
      -2 if the report's MD5 differs from the MD5 of the parent view's contents,
      otherwise None after the comments have been applied.
    """
    filename = bv.file.filename
    shortname = os.path.splitext(os.path.basename(filename))[0]
    dirname = os.path.dirname(filename)
    log_info(f"dirname: {dirname}\nshortname: {shortname}\n")

    # prefer a report sitting next to the sample; fall back to asking the user.
    for extension in (".js", ".json"):
        candidate = os.path.join(dirname, shortname + extension)
        if os.access(candidate, os.R_OK):
            path = candidate
            break
    else:
        path = interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")

    if not (path and os.access(path, os.R_OK)):
        log_error("Invalid filename.")
        return 0

    log_info("Using capa file %s" % path)
    with open(path, "rb") as report:
        doc = json.loads(report.read().decode("utf-8"))

    if not ("meta" in doc and "rules" in doc):
        log_error("doesn't appear to be a capa report")
        return -1

    # make sure the report was generated from the file that is open.
    a = doc["meta"]["sample"]["md5"].lower()
    md5 = Transform["MD5"]
    rawhex = Transform["RawHex"]
    parent = bv.parent_view
    digest = md5.encode(parent.read(parent.start, parent.end))
    b = rawhex.encode(digest).decode("utf-8")
    if a != b:
        log_error("sample mismatch")
        return -2

    rows = []
    for rule in doc["rules"].values():
        meta = rule["meta"]
        # only function-scope rules that are neither library nor subscope rules are shown.
        if meta.get("lib") or meta.get("capa/subscope") or meta["scope"] != "function":
            continue

        name = meta["name"]
        ns = meta.get("namespace", "")
        rows.extend((ns, name, int(va)) for va in rule["matches"].keys())

    # order by (namespace, name) so that like things show up together
    rows.sort()

    for ns, name, va in rows:
        cmt = "%s (%s)" % (name, ns) if ns else "%s" % (name,)
        log_info("0x%x: %s" % (va, cmt))

        try:
            # message will look something like:
            #
            #     capa: delete service (host-interaction/service/delete)
            append_func_cmt(bv, va, "capa: " + cmt)
        except ValueError:
            # the address is not a function in this view: skip it.
            pass

    log_info("ok")
# register `load_analysis` as a Binary Ninja plugin command named "Load capa file".
PluginCommand.register("Load capa file", "Loads an analysis file from capa", load_analysis)

View File

@@ -1,117 +1,117 @@
"""
IDA Pro script that imports a capa report,
produced via `capa --json /path/to/sample`,
into the current database.
It will mark up functions with their capa matches, like:
; capa: print debug messages (host-interaction/log/debug/write-event)
; capa: delete service (host-interaction/service/delete)
; Attributes: bp-based frame
public UninstallService
UninstallService proc near
...
To use, invoke from the IDA Pro scripting dialog,
such as via Alt-F9,
and then select the existing capa report from the file system.
This script will verify that the report matches the workspace.
Check the output window for any errors, and/or the summary of changes.
Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import json
import logging
import idc
import idautils
import ida_idaapi
import ida_kernwin
logger = logging.getLogger("capa")
def append_func_cmt(va, cmt, repeatable=False):
    """
    add the given comment to the given function,
    if it doesn't already exist.

    args:
      va (int): address passed to `ida_funcs.get_func` to locate the function.
      cmt (str): the comment text to append.
      repeatable (bool): passed through to `ida_funcs.get_func_cmt`/`set_func_cmt`.

    raises:
      ValueError: if `ida_funcs.get_func(va)` does not return a function.
    """
    # `ida_funcs` is not among this script's file-level imports,
    # so bring it into scope here rather than relying on the host namespace.
    import ida_funcs

    func = ida_funcs.get_func(va)
    if not func:
        raise ValueError("not a function")

    existing = ida_funcs.get_func_cmt(func, repeatable) or ""
    if cmt in existing:
        # already present: don't add it a second time.
        return

    # only insert the separating newline when there is something to separate from,
    # otherwise the very first comment would start with an empty line.
    new = existing + "\n" + cmt if existing else cmt
    ida_funcs.set_func_cmt(func, new, repeatable)
def main():
    """
    ask for a capa JSON report and add a comment for each
    function-scope rule match to the matched function.

    returns:
      0 if no report path was selected,
      -1 if the document has no `meta` or `rules` key,
      -2 if the report's MD5 does not match the MD5 reported by IDA,
      otherwise None after the comments have been applied.
    """
    # only needed for the raw-digest fallback below.
    import binascii

    path = ida_kernwin.ask_file(False, "*", "capa report")
    if not path:
        return 0

    with open(path, "rb") as f:
        doc = json.loads(f.read().decode("utf-8"))

    if "meta" not in doc or "rules" not in doc:
        logger.error("doesn't appear to be a capa report")
        return -1

    # in IDA 7.4, the MD5 hash may be truncated, for example:
    # wanted: 84882c9d43e23d63b82004fae74ebb61
    # found: b'84882C9D43E23D63B82004FAE74EBB6\x00'
    #
    # see: https://github.com/idapython/bin/issues/11
    a = doc["meta"]["sample"]["md5"].lower()
    raw = idautils.GetInputFileMD5()
    try:
        b = raw.decode("ascii").lower().rstrip("\x00")
    except UnicodeDecodeError:
        # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
        # rather than the hex digest; drop the last byte, as it may be truncated.
        b = binascii.hexlify(raw[:15]).decode("ascii").lower()

    # an empty digest is a prefix of every string, so it must not count as a match.
    if not b or not a.startswith(b):
        logger.error("sample mismatch")
        return -2

    rows = []
    for rule in doc["rules"].values():
        meta = rule["meta"]
        # only function-scope rules that are neither library nor subscope rules are shown.
        if meta.get("lib"):
            continue
        if meta.get("capa/subscope"):
            continue
        if meta["scope"] != "function":
            continue

        name = meta["name"]
        ns = meta.get("namespace", "")
        for va in rule["matches"].keys():
            rows.append((ns, name, int(va)))

    # order by (namespace, name) so that like things show up together
    rows = sorted(rows)
    for ns, name, va in rows:
        if ns:
            cmt = "%s (%s)" % (name, ns)
        else:
            cmt = "%s" % (name,)

        logger.info("0x%x: %s", va, cmt)
        try:
            # message will look something like:
            #
            #     capa: delete service (host-interaction/service/delete)
            append_func_cmt(va, "capa: " + cmt, repeatable=False)
        except ValueError:
            # the address is not inside a function in this database: skip it.
            continue

    logger.info("ok")
# this file is run as a script: executing it performs the import right away.
main()
"""
IDA Pro script that imports a capa report,
produced via `capa --json /path/to/sample`,
into the current database.
It will mark up functions with their capa matches, like:
; capa: print debug messages (host-interaction/log/debug/write-event)
; capa: delete service (host-interaction/service/delete)
; Attributes: bp-based frame
public UninstallService
UninstallService proc near
...
To use, invoke from the IDA Pro scripting dialog,
such as via Alt-F9,
and then select the existing capa report from the file system.
This script will verify that the report matches the workspace.
Check the output window for any errors, and/or the summary of changes.
Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import json
import logging
import idc
import idautils
import ida_idaapi
import ida_kernwin
logger = logging.getLogger("capa")
def append_func_cmt(va, cmt, repeatable=False):
    """
    add the given comment to the given function,
    if it doesn't already exist.

    args:
      va (int): address passed to `ida_funcs.get_func` to locate the function.
      cmt (str): the comment text to append.
      repeatable (bool): passed through to `ida_funcs.get_func_cmt`/`set_func_cmt`.

    raises:
      ValueError: if `ida_funcs.get_func(va)` does not return a function.
    """
    # `ida_funcs` is not among this script's file-level imports,
    # so bring it into scope here rather than relying on the host namespace.
    import ida_funcs

    func = ida_funcs.get_func(va)
    if not func:
        raise ValueError("not a function")

    existing = ida_funcs.get_func_cmt(func, repeatable) or ""
    if cmt in existing:
        # already present: don't add it a second time.
        return

    # only insert the separating newline when there is something to separate from,
    # otherwise the very first comment would start with an empty line.
    new = existing + "\n" + cmt if existing else cmt
    ida_funcs.set_func_cmt(func, new, repeatable)
def main():
    """
    ask for a capa JSON report and add a comment for each
    function-scope rule match to the matched function.

    returns:
      0 if no report path was selected,
      -1 if the document has no `meta` or `rules` key,
      -2 if the report's MD5 does not match the MD5 reported by IDA,
      otherwise None after the comments have been applied.
    """
    # only needed for the raw-digest fallback below.
    import binascii

    path = ida_kernwin.ask_file(False, "*", "capa report")
    if not path:
        return 0

    with open(path, "rb") as f:
        doc = json.loads(f.read().decode("utf-8"))

    if "meta" not in doc or "rules" not in doc:
        logger.error("doesn't appear to be a capa report")
        return -1

    # in IDA 7.4, the MD5 hash may be truncated, for example:
    # wanted: 84882c9d43e23d63b82004fae74ebb61
    # found: b'84882C9D43E23D63B82004FAE74EBB6\x00'
    #
    # see: https://github.com/idapython/bin/issues/11
    a = doc["meta"]["sample"]["md5"].lower()
    raw = idautils.GetInputFileMD5()
    try:
        b = raw.decode("ascii").lower().rstrip("\x00")
    except UnicodeDecodeError:
        # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
        # rather than the hex digest; drop the last byte, as it may be truncated.
        b = binascii.hexlify(raw[:15]).decode("ascii").lower()

    # an empty digest is a prefix of every string, so it must not count as a match.
    if not b or not a.startswith(b):
        logger.error("sample mismatch")
        return -2

    rows = []
    for rule in doc["rules"].values():
        meta = rule["meta"]
        # only function-scope rules that are neither library nor subscope rules are shown.
        if meta.get("lib"):
            continue
        if meta.get("capa/subscope"):
            continue
        if meta["scope"] != "function":
            continue

        name = meta["name"]
        ns = meta.get("namespace", "")
        for va in rule["matches"].keys():
            rows.append((ns, name, int(va)))

    # order by (namespace, name) so that like things show up together
    rows = sorted(rows)
    for ns, name, va in rows:
        if ns:
            cmt = "%s (%s)" % (name, ns)
        else:
            cmt = "%s" % (name,)

        logger.info("0x%x: %s", va, cmt)
        try:
            # message will look something like:
            #
            #     capa: delete service (host-interaction/service/delete)
            append_func_cmt(va, "capa: " + cmt, repeatable=False)
        except ValueError:
            # the address is not inside a function in this database: skip it.
            continue

    logger.info("ok")
# this file is run as a script: executing it performs the import right away.
main()

View File

@@ -17,6 +17,7 @@ requirements = ["six", "tqdm", "pyyaml", "tabulate", "colorama", "termcolor", "r
if sys.version_info >= (3, 0):
# py3
requirements.append("networkx")
requirements.append("pylancelot~=0.3.6")
else:
# py2
requirements.append("enum34")
@@ -54,7 +55,7 @@ setuptools.setup(
"pycodestyle",
"black ; python_version>'3.0'",
"isort",
]
],
},
zip_safe=False,
keywords="capa",

View File

@@ -80,6 +80,16 @@ def get_viv_extractor(path):
return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
@lru_cache()
def get_lancelot_extractor(path):
    """
    build a lancelot-backed feature extractor from the contents of the file at `path`.

    results are memoized per `path` by `lru_cache`.
    the decorator is called (`lru_cache()`), like the other cached helpers in this file,
    because the bare `@lru_cache` form is only accepted on Python 3.8 and newer.
    """
    # imported at call time rather than at module import,
    # presumably so that this module still loads where lancelot is not installed.
    import capa.features.extractors.lancelot

    with open(path, "rb") as f:
        buf = f.read()

    return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)
@lru_cache()
def extract_file_features(extractor):
features = collections.defaultdict(set)
@@ -429,7 +439,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
def get_extractor(path):
if sys.version_info >= (3, 0):
raise RuntimeError("no supported py3 backends yet")
extractor = get_lancelot_extractor(path)
else:
extractor = get_viv_extractor(path)

View File

@@ -1,104 +1,104 @@
# run this script from within IDA with ./tests/data/mimikatz.exe open
import sys
import logging
import os.path
import binascii
import traceback
import pytest
try:
sys.path.append(os.path.dirname(__file__))
from fixtures import *
finally:
sys.path.pop()
logger = logging.getLogger("test_ida_features")
def check_input_file(wanted):
    """
    ensure that the input file of the database open in IDA has the given MD5.

    args:
      wanted (str): hex MD5 of the expected sample;
        it is compared as-is against the lowercased digest reported by IDA.

    raises:
      RuntimeError: if the digest reported by IDA is empty or is not a prefix of `wanted`.
    """
    import idautils

    # fetch the digest once, so both decoding attempts see the same value.
    raw = idautils.GetInputFileMD5()

    # some versions (7.4) of IDA return a truncated version of the MD5.
    # https://github.com/idapython/bin/issues/11
    try:
        found = raw[:31].decode("ascii").lower()
    except UnicodeDecodeError:
        # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
        # rather than the hex digest
        found = binascii.hexlify(raw[:15]).decode("ascii").lower()

    # an empty digest is a prefix of every string, so it must not count as a match.
    if not found or not wanted.startswith(found):
        raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted))
def get_ida_extractor(_path):
    """
    return an `IdaFeatureExtractor`, after checking that the expected sample is open in IDA.

    `_path` is not used by this function.
    """
    expected_md5 = "5f66b82558ca92e54e77f216ef4c066c"
    check_input_file(expected_md5)

    # this has to be imported inline so pytest doesn't bail outside of IDA
    import capa.features.extractors.ida as ida_backend

    return ida_backend.IdaFeatureExtractor()
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_features():
    """
    run each case in FEATURE_PRESENCE_TESTS against the sample open in IDA,
    printing one SKIP/FAIL/OK line per case.

    cases for a sample other than the open one are skipped;
    failures print a traceback and do not stop the remaining cases.
    """
    for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS:
        test_id = make_test_id((sample, scope, feature, expected))

        # only cases for the sample that is currently open can be run.
        try:
            check_input_file(get_sample_md5_by_name(sample))
        except RuntimeError:
            print("SKIP %s" % test_id)
            continue

        resolved_scope = resolve_scope(scope)
        resolved_sample = resolve_sample(sample)

        try:
            do_test_feature_presence(get_ida_extractor, resolved_sample, resolved_scope, feature, expected)
        except Exception:
            print("FAIL %s" % test_id)
            traceback.print_exc()
            continue

        print("OK %s" % test_id)
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_feature_counts():
    """
    run each case in FEATURE_COUNT_TESTS against the sample open in IDA,
    printing one SKIP/FAIL/OK line per case.

    cases for a sample other than the open one are skipped;
    failures print a traceback and do not stop the remaining cases.
    """
    for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
        test_id = make_test_id((sample, scope, feature, expected))

        # only cases for the sample that is currently open can be run.
        try:
            check_input_file(get_sample_md5_by_name(sample))
        except RuntimeError:
            print("SKIP %s" % test_id)
            continue

        resolved_scope = resolve_scope(scope)
        resolved_sample = resolve_sample(sample)

        try:
            do_test_feature_count(get_ida_extractor, resolved_sample, resolved_scope, feature, expected)
        except Exception:
            print("FAIL %s" % test_id)
            traceback.print_exc()
            continue

        print("OK %s" % test_id)
if __name__ == "__main__":
    print("-" * 80)

    # call every attribute of this module whose name begins with `test_`
    for name in dir(sys.modules[__name__]):
        if name.startswith("test_"):
            test = getattr(sys.modules[__name__], name)
            logger.debug("invoking test: %s", name)
            sys.stderr.flush()
            test()

    print("DONE")
# run this script from within IDA with ./tests/data/mimikatz.exe open
import sys
import logging
import os.path
import binascii
import traceback
import pytest
try:
sys.path.append(os.path.dirname(__file__))
from fixtures import *
finally:
sys.path.pop()
logger = logging.getLogger("test_ida_features")
def check_input_file(wanted):
    """
    ensure that the input file of the database open in IDA has the given MD5.

    args:
      wanted (str): hex MD5 of the expected sample;
        it is compared as-is against the lowercased digest reported by IDA.

    raises:
      RuntimeError: if the digest reported by IDA is empty or is not a prefix of `wanted`.
    """
    import idautils

    # fetch the digest once, so both decoding attempts see the same value.
    raw = idautils.GetInputFileMD5()

    # some versions (7.4) of IDA return a truncated version of the MD5.
    # https://github.com/idapython/bin/issues/11
    try:
        found = raw[:31].decode("ascii").lower()
    except UnicodeDecodeError:
        # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
        # rather than the hex digest
        found = binascii.hexlify(raw[:15]).decode("ascii").lower()

    # an empty digest is a prefix of every string, so it must not count as a match.
    if not found or not wanted.startswith(found):
        raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted))
def get_ida_extractor(_path):
    """
    return an `IdaFeatureExtractor`, after checking that the expected sample is open in IDA.

    `_path` is not used by this function.
    """
    expected_md5 = "5f66b82558ca92e54e77f216ef4c066c"
    check_input_file(expected_md5)

    # this has to be imported inline so pytest doesn't bail outside of IDA
    import capa.features.extractors.ida as ida_backend

    return ida_backend.IdaFeatureExtractor()
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_features():
    """
    run each case in FEATURE_PRESENCE_TESTS against the sample open in IDA,
    printing one SKIP/FAIL/OK line per case.

    cases for a sample other than the open one are skipped;
    failures print a traceback and do not stop the remaining cases.
    """
    for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS:
        test_id = make_test_id((sample, scope, feature, expected))

        # only cases for the sample that is currently open can be run.
        try:
            check_input_file(get_sample_md5_by_name(sample))
        except RuntimeError:
            print("SKIP %s" % test_id)
            continue

        resolved_scope = resolve_scope(scope)
        resolved_sample = resolve_sample(sample)

        try:
            do_test_feature_presence(get_ida_extractor, resolved_sample, resolved_scope, feature, expected)
        except Exception:
            print("FAIL %s" % test_id)
            traceback.print_exc()
            continue

        print("OK %s" % test_id)
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_feature_counts():
    """
    run each case in FEATURE_COUNT_TESTS against the sample open in IDA,
    printing one SKIP/FAIL/OK line per case.

    cases for a sample other than the open one are skipped;
    failures print a traceback and do not stop the remaining cases.
    """
    for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
        test_id = make_test_id((sample, scope, feature, expected))

        # only cases for the sample that is currently open can be run.
        try:
            check_input_file(get_sample_md5_by_name(sample))
        except RuntimeError:
            print("SKIP %s" % test_id)
            continue

        resolved_scope = resolve_scope(scope)
        resolved_sample = resolve_sample(sample)

        try:
            do_test_feature_count(get_ida_extractor, resolved_sample, resolved_scope, feature, expected)
        except Exception:
            print("FAIL %s" % test_id)
            traceback.print_exc()
            continue

        print("OK %s" % test_id)
if __name__ == "__main__":
    print("-" * 80)

    # call every attribute of this module whose name begins with `test_`
    for name in dir(sys.modules[__name__]):
        if name.startswith("test_"):
            test = getattr(sys.modules[__name__], name)
            logger.debug("invoking test: %s", name)
            sys.stderr.flush()
            test()

    print("DONE")

View File

@@ -0,0 +1,26 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from fixtures import *
@parametrize("sample,scope,feature,expected", FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"])
def test_lancelot_features(sample, scope, feature, expected):
    """
    check each feature-presence case using the lancelot extractor.

    the check is wrapped in `xfail` when running on Python 2.
    """
    running_py2 = sys.version_info < (3, 0)
    with xfail(running_py2, reason="lancelot only works on py3"):
        do_test_feature_presence(get_lancelot_extractor, sample, scope, feature, expected)
@parametrize("sample,scope,feature,expected", FEATURE_COUNT_TESTS, indirect=["sample", "scope"])
def test_lancelot_feature_counts(sample, scope, feature, expected):
    """
    check each feature-count case using the lancelot extractor.

    the check is wrapped in `xfail` when running on Python 2.
    """
    running_py2 = sys.version_info < (3, 0)
    with xfail(running_py2, reason="lancelot only works on py3"):
        do_test_feature_count(get_lancelot_extractor, sample, scope, feature, expected)

View File

@@ -47,7 +47,7 @@ def test_main_single_rule(z9324d_extractor, tmpdir):
assert capa.main.main([path, "-v", "-r", rule_file.strpath,]) == 0
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="lancelot doesn't support shellcode workspaces")
def test_main_shellcode(z499c2_extractor):
path = z499c2_extractor.path
assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0