mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 23:59:48 -08:00
Merge branch 'master' into ida-test-runner
This commit is contained in:
10
CHANGELOG.md
10
CHANGELOG.md
@@ -1,13 +1,15 @@
|
||||
# Change Log
|
||||
|
||||
## master (unreleased)
|
||||
- extract function and API names from ELF symtab entries @yelhamer https://github.com/mandiant/capa-rules/issues/736
|
||||
|
||||
### New Features
|
||||
- Utility script to detect feature overlap between new and existing CAPA rules [#1451](https://github.com/mandiant/capa/issues/1451) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
|
||||
|
||||
### Breaking Changes
|
||||
- Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat
|
||||
|
||||
### New Rules (7)
|
||||
### New Rules (9)
|
||||
|
||||
- load-code/shellcode/execute-shellcode-via-windows-callback-function ervin.ocampo@mandiant.com jakub.jozwiak@mandiant.com
|
||||
- nursery/execute-shellcode-via-indirect-call ronnie.salomonsen@mandiant.com
|
||||
@@ -16,9 +18,13 @@
|
||||
- communication/mailslot/read-from-mailslot nick.simonian@mandiant.com
|
||||
- nursery/hash-data-using-sha512managed-in-dotnet jonathanlepore@google.com
|
||||
- nursery/compiled-with-exescript jonathanlepore@google.com
|
||||
- nursery/check-for-sandbox-via-mac-address-ouis-in-dotnet jonathanlepore@google.com
|
||||
- host-interaction/hardware/enumerate-devices-by-category @mr-tz
|
||||
-
|
||||
|
||||
### Bug Fixes
|
||||
- extractor: add a Binary Ninja test that asserts its version #1487 @xusheng6
|
||||
- extractor: update Binary Ninja stack string detection after the new constant outlining feature #1473 @xusheng6
|
||||
- extractor: update vivisect Arch extraction #1334 @mr-tz
|
||||
- extractor: avoid Binary Ninja exception when analyzing certain files #1441 @xusheng6
|
||||
- symtab: fix struct.unpack() format for 64-bit ELF files @yelhamer
|
||||
@@ -83,12 +89,14 @@ Thanks for all the support, especially to @xusheng6, @captainGeech42, @ggold7046
|
||||
- nursery/contain-a-thread-local-storage-tls-section-in-dotnet michael.hunhoff@mandiant.com
|
||||
|
||||
### Bug Fixes
|
||||
- extractor: interface of cache modified to prevent extracting file and global features multiple times @stevemk14ebr
|
||||
- extractor: removed '.dynsym' as the library name for ELF imports #1318 @stevemk14ebr
|
||||
- extractor: fix vivisect loop detection corner case #1310 @mr-tz
|
||||
- match: extend OS characteristic to match OS_ANY to all supported OSes #1324 @mike-hunhoff
|
||||
- extractor: fix IDA and vivisect string and bytes features overlap and tests #1327 #1336 @xusheng6
|
||||
|
||||
### capa explorer IDA Pro plugin
|
||||
- rule generator plugin now loads faster when jumping between functions @stevemk14ebr
|
||||
- fix exception when plugin loaded in IDA hosted under idat #1341 @mike-hunhoff
|
||||
- improve embedded PE detection performance and reduce FP potential #1344 @mike-hunhoff
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
[PyPI - Python Version](https://pypi.org/project/flare-capa)
|
||||
[Last release](https://github.com/mandiant/capa/releases)
|
||||
[Number of rules](https://github.com/mandiant/capa-rules)
|
||||
[Number of rules](https://github.com/mandiant/capa-rules)
|
||||
[CI status](https://github.com/mandiant/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
|
||||
[Downloads](https://github.com/mandiant/capa/releases)
|
||||
[License](LICENSE.txt)
|
||||
|
||||
@@ -11,10 +11,13 @@ import string
|
||||
import struct
|
||||
from typing import Tuple, Iterator
|
||||
|
||||
from binaryninja import Function
|
||||
from binaryninja import Function, Settings
|
||||
from binaryninja import BasicBlock as BinjaBasicBlock
|
||||
from binaryninja import (
|
||||
BinaryView,
|
||||
DataBuffer,
|
||||
SymbolType,
|
||||
RegisterValueType,
|
||||
VariableSourceType,
|
||||
MediumLevelILSetVar,
|
||||
MediumLevelILOperation,
|
||||
@@ -28,6 +31,66 @@ from capa.features.basicblock import BasicBlock
|
||||
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
|
||||
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle
|
||||
|
||||
# Module-level switch consulted by the stack string detection in this file:
# when True, candidate stack strings are measured via get_stack_string_len()
# (calls to outlined builtin copy routines) instead of per-instruction
# immediate moves to the stack.
use_const_outline: bool = False
settings: Settings = Settings()
# The setting may not exist (contains() guards the lookup), so only enable the
# switch when the key is present and set to true.
# NOTE(review): presumably older Binary Ninja builds lack this key - confirm.
if settings.contains("analysis.outlining.builtins") and settings.get_bool("analysis.outlining.builtins"):
    use_const_outline = True
|
||||
|
||||
|
||||
def get_printable_len_ascii(s: bytes) -> int:
    """
    Count the printable ASCII bytes in `s` that precede the first NUL byte.

    Bytes that are not printable (>= 127 or not in string.printable) are
    skipped rather than ending the scan, so the result is the number of
    printable bytes seen, not the length of a contiguous run.

    args:
      s: raw bytes to inspect.

    returns: number of printable ASCII bytes before the first 0x00 byte
      (or in the whole buffer when it has no 0x00 byte).
    """
    count = 0
    for c in s:
        if c == 0:
            # NUL terminates the string; anything after it is ignored.
            break
        if c < 127 and chr(c) in string.printable:
            count += 1
    return count
|
||||
|
||||
|
||||
def get_printable_len_wide(s: bytes) -> int:
    """
    Count the printable characters of `s` interpreted as a UTF-16LE string
    whose characters are all in the ASCII range.

    args:
      s: raw bytes to inspect.

    returns: 0 unless every odd-indexed byte (the high byte of each 16-bit
      unit) is 0x00; otherwise the printable-ASCII count of the even-indexed
      bytes, as computed by get_printable_len_ascii().
    """
    high_bytes = s[1::2]
    if any(c != 0x00 for c in high_bytes):
        return 0

    # only the low byte of each unit carries data here.
    return get_printable_len_ascii(s[::2])
|
||||
|
||||
|
||||
def get_stack_string_len(f: Function, il: MediumLevelILInstruction) -> int:
    """
    Return the printable length of a constant string copied onto the stack by
    a call to an outlined builtin copy routine, or 0 if `il` is not such a call.

    The instruction qualifies only when all of the following hold:
      - it is an MLIL call whose destination is a constant (pointer),
      - the destination resolves to a library function symbol named
        __builtin_strncpy, __builtin_strcpy, or __builtin_wcscpy,
      - it has at least two parameters,
      - the first parameter takes the address of a stack variable, and
      - the second parameter is a constant data aggregate.

    args:
      f: function containing the instruction; used to reach the BinaryView
        and to fetch the constant data.
      il: MLIL instruction to inspect.

    returns: the larger of the ASCII and wide printable lengths of the
      copied constant data, or 0 when the instruction does not qualify.
    """
    bv: BinaryView = f.view

    if il.operation != MediumLevelILOperation.MLIL_CALL:
        return 0

    target = il.dest
    if target.operation not in (MediumLevelILOperation.MLIL_CONST, MediumLevelILOperation.MLIL_CONST_PTR):
        # indirect calls cannot be matched against a known builtin symbol.
        return 0

    addr = target.value.value
    sym = bv.get_symbol_at(addr)
    if not sym or sym.type != SymbolType.LibraryFunctionSymbol:
        return 0

    if sym.name not in ("__builtin_strncpy", "__builtin_strcpy", "__builtin_wcscpy"):
        return 0

    if len(il.params) < 2:
        return 0

    # first parameter: destination buffer, must be the address of a stack variable.
    dest = il.params[0]
    if dest.operation != MediumLevelILOperation.MLIL_ADDRESS_OF:
        return 0

    var = dest.src
    if var.source_type != VariableSourceType.StackVariableSourceType:
        return 0

    # second parameter: source, must be constant data known to the analysis.
    src = il.params[1]
    if src.value.type != RegisterValueType.ConstantDataAggregateValue:
        return 0

    s = f.get_constant_data(RegisterValueType.ConstantDataAggregateValue, src.value.value)
    data = bytes(s)  # convert once; both measurements read the same bytes
    return max(get_printable_len_ascii(data), get_printable_len_wide(data))
|
||||
|
||||
|
||||
def get_printable_len(il: MediumLevelILSetVar) -> int:
|
||||
"""Return string length if all operand bytes are ascii or utf16-le printable"""
|
||||
@@ -82,8 +145,11 @@ def bb_contains_stackstring(f: Function, bb: MediumLevelILBasicBlock) -> bool:
|
||||
"""
|
||||
count = 0
|
||||
for il in bb:
|
||||
if is_mov_imm_to_stack(il):
|
||||
count += get_printable_len(il)
|
||||
if use_const_outline:
|
||||
count += get_stack_string_len(f, il)
|
||||
else:
|
||||
if is_mov_imm_to_stack(il):
|
||||
count += get_printable_len(il)
|
||||
|
||||
if count > MIN_STACKSTRING_LEN:
|
||||
return True
|
||||
|
||||
@@ -91,6 +91,20 @@ class Shdr:
|
||||
entsize: int
|
||||
buf: bytes
|
||||
|
||||
@classmethod
def from_viv(cls, section, buf: bytes) -> "Shdr":
    """
    Alternate constructor: build a Shdr from a parsed section header object.

    args:
      section: object exposing the ELF section header attributes sh_name,
        sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link and
        sh_entsize (presumably a vivisect section - confirm against caller).
      buf: the section's raw contents.

    returns: a new instance populated positionally, in the order listed
      above, followed by `buf`.
    """
    return cls(
        section.sh_name,
        section.sh_type,
        section.sh_flags,
        section.sh_addr,
        section.sh_offset,
        section.sh_size,
        section.sh_link,
        section.sh_entsize,
        buf,
    )
|
||||
|
||||
|
||||
class ELF:
|
||||
def __init__(self, f: BinaryIO):
|
||||
@@ -695,6 +709,29 @@ class SymTab:
|
||||
for symbol in self.symbols:
|
||||
yield symbol
|
||||
|
||||
@classmethod
def from_Elf(cls, ElfBinary) -> Optional["SymTab"]:
    """
    Alternate constructor: build a SymTab from the first symbol table
    section (SHT_SYMTAB) of a parsed ELF object.

    args:
      ElfBinary: parsed ELF object exposing getEndian(), bits, sections and
        readAtOffset(offset, size) (presumably vivisect's ELF parser -
        confirm against caller).

    returns: the parsed symbol table, or None when the file has no
      SHT_SYMTAB section (or when construction raises NameError).

    raises:
      CorruptElfFile: when the symbol table cannot be parsed.
    """
    endian = "<" if ElfBinary.getEndian() == 0 else ">"
    bitness = ElfBinary.bits

    SHT_SYMTAB = 0x2
    for section in ElfBinary.sections:
        # the section kind is stored in sh_type. sh_info holds unrelated,
        # type-specific data, so bit-testing it both misses real symbol
        # tables and selects unrelated sections.
        if section.sh_type != SHT_SYMTAB:
            continue

        # for a symbol table, sh_link is the index of its string table.
        strtab_section = ElfBinary.sections[section.sh_link]
        sh_symtab = Shdr.from_viv(section, ElfBinary.readAtOffset(section.sh_offset, section.sh_size))
        sh_strtab = Shdr.from_viv(
            strtab_section, ElfBinary.readAtOffset(strtab_section.sh_offset, strtab_section.sh_size)
        )

        try:
            return cls(endian, bitness, sh_symtab, sh_strtab)
        except NameError:
            return None
        except Exception as e:
            # any other error raised while parsing implies a faulty symbol
            # table. catch Exception rather than everything so that
            # KeyboardInterrupt/SystemExit still propagate untouched.
            raise CorruptElfFile("malformed symbol's table") from e

    return None
|
||||
|
||||
|
||||
def guess_os_from_osabi(elf: ELF) -> Optional[OS]:
|
||||
return elf.ei_osabi
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import logging
|
||||
from typing import List, Tuple, Iterator
|
||||
from typing import Any, Dict, List, Tuple, Iterator
|
||||
|
||||
import viv_utils
|
||||
import viv_utils.flirt
|
||||
@@ -49,8 +49,11 @@ class VivisectFeatureExtractor(FeatureExtractor):
|
||||
yield from capa.features.extractors.viv.file.extract_features(self.vw, self.buf)
|
||||
|
||||
def get_functions(self) -> Iterator[FunctionHandle]:
|
||||
cache: Dict[str, Any] = {}
|
||||
for va in sorted(self.vw.getFunctions()):
|
||||
yield FunctionHandle(address=AbsoluteVirtualAddress(va), inner=viv_utils.Function(self.vw, va))
|
||||
yield FunctionHandle(
|
||||
address=AbsoluteVirtualAddress(va), inner=viv_utils.Function(self.vw, va), ctx={"cache": cache}
|
||||
)
|
||||
|
||||
def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
yield from capa.features.extractors.viv.function.extract_features(fh)
|
||||
|
||||
@@ -11,9 +11,11 @@ import envi
|
||||
import viv_utils
|
||||
import vivisect.const
|
||||
|
||||
from capa.features.file import FunctionName
|
||||
from capa.features.common import Feature, Characteristic
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress
|
||||
from capa.features.extractors import loops
|
||||
from capa.features.extractors.elf import SymTab
|
||||
from capa.features.extractors.base_extractor import FunctionHandle
|
||||
|
||||
|
||||
@@ -30,6 +32,28 @@ def interface_extract_function_XXX(fh: FunctionHandle) -> Iterator[Tuple[Feature
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def extract_function_symtab_names(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
    """
    Yield a FunctionName feature for each ELF symbol table entry that names
    the given function.

    Nothing is yielded for non-ELF workspaces, or when the symbol table is
    missing or cannot be parsed.

    args:
      fh: handle of the function; fh.inner is the viv_utils function and
        fh.ctx["cache"] is a dict used to memoize the parsed symbol table.

    yields: (FunctionName(symbol name), fh.address) for each symbol whose
      value equals the function address and whose info has the STT_FUNC bit set.
    """
    if fh.inner.vw.metadata["Format"] != "elf":
        return

    # the parsed symbol table is memoized in the handle's cache dict
    # to avoid re-parsing it for every function.
    cache = fh.ctx["cache"]
    if "symtab" not in cache:
        try:
            cache["symtab"] = SymTab.from_Elf(fh.inner.vw.parsedbin)
        except Exception:
            # best effort: a malformed symbol table only means we have no names.
            # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
            cache["symtab"] = None

    symtab = cache["symtab"]
    if not symtab:
        return

    STT_FUNC = 0x2
    for symbol in symtab.get_symbols():
        if symbol.value != fh.address:
            continue

        # NOTE(review): this is a bit test on st_info, not a comparison of the
        # symbol type (low nibble) against STT_FUNC, so other types with this
        # bit set (e.g. STT_SECTION = 3) also pass - confirm this is intended.
        if symbol.info & STT_FUNC == 0:
            continue

        # resolve the name only for matching symbols.
        yield FunctionName(symtab.get_name(symbol)), fh.address
|
||||
|
||||
|
||||
def extract_function_calls_to(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
f: viv_utils.Function = fhandle.inner
|
||||
for src, _, _, _ in f.vw.getXrefsTo(f.va, rtype=vivisect.const.REF_CODE):
|
||||
@@ -79,4 +103,8 @@ def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
yield feature, addr
|
||||
|
||||
|
||||
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)
|
||||
FUNCTION_HANDLERS = (
|
||||
extract_function_symtab_names,
|
||||
extract_function_calls_to,
|
||||
extract_function_loop,
|
||||
)
|
||||
|
||||
@@ -19,9 +19,11 @@ import envi.archs.amd64.disasm
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
import capa.features.extractors.viv.helpers
|
||||
from capa.features.file import FunctionName
|
||||
from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset
|
||||
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress
|
||||
from capa.features.extractors.elf import Shdr, SymTab
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
|
||||
from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_indirect_call
|
||||
|
||||
@@ -109,6 +111,26 @@ def extract_insn_api_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterato
|
||||
if not target:
|
||||
return
|
||||
|
||||
if f.vw.metadata["Format"] == "elf":
|
||||
if "symtab" not in fh.ctx["cache"]:
|
||||
# the symbol table gets stored as a function's attribute in order to avoid running
|
||||
# this code everytime the call is made, thus preventing the computational overhead.
|
||||
try:
|
||||
fh.ctx["cache"]["symtab"] = SymTab.from_Elf(f.vw.parsedbin)
|
||||
except:
|
||||
fh.ctx["cache"]["symtab"] = None
|
||||
|
||||
symtab = fh.ctx["cache"]["symtab"]
|
||||
if symtab:
|
||||
for symbol in symtab.get_symbols():
|
||||
sym_name = symtab.get_name(symbol)
|
||||
sym_value = symbol.value
|
||||
sym_info = symbol.info
|
||||
|
||||
STT_FUNC = 0x2
|
||||
if sym_value == target and sym_info & STT_FUNC != 0:
|
||||
yield API(sym_name), ih.address
|
||||
|
||||
if viv_utils.flirt.is_library_function(f.vw, target):
|
||||
name = viv_utils.get_function_name(f.vw, target)
|
||||
yield API(name), ih.address
|
||||
|
||||
@@ -22,7 +22,8 @@ import capa
|
||||
import capa.version
|
||||
import capa.render.utils as rutils
|
||||
import capa.features.common
|
||||
import capa.render.result_document
|
||||
import capa.features.freeze
|
||||
import capa.render.result_document as rdoc
|
||||
from capa.features.address import AbsoluteVirtualAddress
|
||||
|
||||
logger = logging.getLogger("capa")
|
||||
@@ -140,37 +141,35 @@ def collect_metadata(rules):
|
||||
else:
|
||||
os = "unknown os"
|
||||
|
||||
return {
|
||||
"timestamp": datetime.datetime.now().isoformat(),
|
||||
"argv": [],
|
||||
"sample": {
|
||||
"md5": md5,
|
||||
"sha1": "", # not easily accessible
|
||||
"sha256": sha256,
|
||||
"path": idaapi.get_input_file_path(),
|
||||
},
|
||||
"analysis": {
|
||||
"format": idaapi.get_file_type_name(),
|
||||
"arch": arch,
|
||||
"os": os,
|
||||
"extractor": "ida",
|
||||
"rules": rules,
|
||||
"base_address": idaapi.get_imagebase(),
|
||||
"layout": {
|
||||
return rdoc.Metadata(
|
||||
timestamp=datetime.datetime.now(),
|
||||
version=capa.version.__version__,
|
||||
argv=(),
|
||||
sample=rdoc.Sample(
|
||||
md5=md5,
|
||||
sha1="", # not easily accessible
|
||||
sha256=sha256,
|
||||
path=idaapi.get_input_file_path(),
|
||||
),
|
||||
analysis=rdoc.Analysis(
|
||||
format=idaapi.get_file_type_name(),
|
||||
arch=arch,
|
||||
os=os,
|
||||
extractor="ida",
|
||||
rules=rules,
|
||||
base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()),
|
||||
layout=rdoc.Layout(
|
||||
functions=tuple()
|
||||
# this is updated after capabilities have been collected.
|
||||
# will look like:
|
||||
#
|
||||
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
||||
},
|
||||
),
|
||||
# ignore these for now - not used by IDA plugin.
|
||||
"feature_counts": {
|
||||
"file": {},
|
||||
"functions": {},
|
||||
},
|
||||
"library_functions": {},
|
||||
},
|
||||
"version": capa.version.__version__,
|
||||
}
|
||||
feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()),
|
||||
library_functions=tuple(),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class IDAIO:
|
||||
@@ -217,12 +216,12 @@ def idb_contains_cached_results() -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def load_and_verify_cached_results() -> Optional[capa.render.result_document.ResultDocument]:
|
||||
def load_and_verify_cached_results() -> Optional[rdoc.ResultDocument]:
|
||||
"""verifies that cached results have valid (mapped) addresses for the current database"""
|
||||
logger.debug("loading cached capa results from netnode '%s'", CAPA_NETNODE)
|
||||
|
||||
n = netnode.Netnode(CAPA_NETNODE)
|
||||
doc = capa.render.result_document.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS]))
|
||||
doc = rdoc.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS]))
|
||||
|
||||
for rule in rutils.capability_rules(doc):
|
||||
for location_, _ in rule.matches:
|
||||
|
||||
@@ -48,7 +48,8 @@ class CapaRuleGenFeatureCacheNode:
|
||||
|
||||
|
||||
class CapaRuleGenFeatureCache:
|
||||
def __init__(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor):
|
||||
def __init__(self, extractor: CapaExplorerFeatureExtractor):
|
||||
self.extractor = extractor
|
||||
self.global_features: FeatureSet = collections.defaultdict(set)
|
||||
|
||||
self.file_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(None, None)
|
||||
@@ -56,12 +57,11 @@ class CapaRuleGenFeatureCache:
|
||||
self.bb_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
|
||||
self.insn_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {}
|
||||
|
||||
self._find_global_features(extractor)
|
||||
self._find_file_features(extractor)
|
||||
self._find_function_and_below_features(fh_list, extractor)
|
||||
self._find_global_features()
|
||||
self._find_file_features()
|
||||
|
||||
def _find_global_features(self, extractor: CapaExplorerFeatureExtractor):
|
||||
for feature, addr in extractor.extract_global_features():
|
||||
def _find_global_features(self):
|
||||
for feature, addr in self.extractor.extract_global_features():
|
||||
# not all global features may have virtual addresses.
|
||||
# if not, then at least ensure the feature shows up in the index.
|
||||
# the set of addresses will still be empty.
|
||||
@@ -71,46 +71,45 @@ class CapaRuleGenFeatureCache:
|
||||
if feature not in self.global_features:
|
||||
self.global_features[feature] = set()
|
||||
|
||||
def _find_file_features(self, extractor: CapaExplorerFeatureExtractor):
|
||||
def _find_file_features(self):
|
||||
# not all file features may have virtual addresses.
|
||||
# if not, then at least ensure the feature shows up in the index.
|
||||
# the set of addresses will still be empty.
|
||||
for feature, addr in extractor.extract_file_features():
|
||||
for feature, addr in self.extractor.extract_file_features():
|
||||
if addr is not None:
|
||||
self.file_node.features[feature].add(addr)
|
||||
else:
|
||||
if feature not in self.file_node.features:
|
||||
self.file_node.features[feature] = set()
|
||||
|
||||
def _find_function_and_below_features(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor):
|
||||
for fh in fh_list:
|
||||
f_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(fh, self.file_node)
|
||||
def _find_function_and_below_features(self, fh: FunctionHandle):
|
||||
f_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(fh, self.file_node)
|
||||
|
||||
# extract basic block and below features
|
||||
for bbh in extractor.get_basic_blocks(fh):
|
||||
bb_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(bbh, f_node)
|
||||
# extract basic block and below features
|
||||
for bbh in self.extractor.get_basic_blocks(fh):
|
||||
bb_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(bbh, f_node)
|
||||
|
||||
# extract instruction features
|
||||
for ih in extractor.get_instructions(fh, bbh):
|
||||
inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node)
|
||||
# extract instruction features
|
||||
for ih in self.extractor.get_instructions(fh, bbh):
|
||||
inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node)
|
||||
|
||||
for feature, addr in extractor.extract_insn_features(fh, bbh, ih):
|
||||
inode.features[feature].add(addr)
|
||||
for feature, addr in self.extractor.extract_insn_features(fh, bbh, ih):
|
||||
inode.features[feature].add(addr)
|
||||
|
||||
self.insn_nodes[inode.address] = inode
|
||||
self.insn_nodes[inode.address] = inode
|
||||
|
||||
# extract basic block features
|
||||
for feature, addr in extractor.extract_basic_block_features(fh, bbh):
|
||||
bb_node.features[feature].add(addr)
|
||||
# extract basic block features
|
||||
for feature, addr in self.extractor.extract_basic_block_features(fh, bbh):
|
||||
bb_node.features[feature].add(addr)
|
||||
|
||||
# store basic block features in cache and function parent
|
||||
self.bb_nodes[bb_node.address] = bb_node
|
||||
# store basic block features in cache and function parent
|
||||
self.bb_nodes[bb_node.address] = bb_node
|
||||
|
||||
# extract function features
|
||||
for feature, addr in extractor.extract_function_features(fh):
|
||||
f_node.features[feature].add(addr)
|
||||
# extract function features
|
||||
for feature, addr in self.extractor.extract_function_features(fh):
|
||||
f_node.features[feature].add(addr)
|
||||
|
||||
self.func_nodes[f_node.address] = f_node
|
||||
self.func_nodes[f_node.address] = f_node
|
||||
|
||||
def _find_instruction_capabilities(
|
||||
self, ruleset: RuleSet, insn: CapaRuleGenFeatureCacheNode
|
||||
@@ -155,7 +154,7 @@ class CapaRuleGenFeatureCache:
|
||||
def find_code_capabilities(
|
||||
self, ruleset: RuleSet, fh: FunctionHandle
|
||||
) -> Tuple[FeatureSet, MatchResults, MatchResults, MatchResults]:
|
||||
f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None)
|
||||
f_node: Optional[CapaRuleGenFeatureCacheNode] = self._get_cached_func_node(fh)
|
||||
if f_node is None:
|
||||
return {}, {}, {}, {}
|
||||
|
||||
@@ -195,8 +194,16 @@ class CapaRuleGenFeatureCache:
|
||||
_, matches = ruleset.match(Scope.FILE, features, NO_ADDRESS)
|
||||
return features, matches
|
||||
|
||||
def get_all_function_features(self, fh: FunctionHandle) -> FeatureSet:
|
||||
def _get_cached_func_node(self, fh: FunctionHandle) -> Optional[CapaRuleGenFeatureCacheNode]:
    """
    Return the cached feature node for a function, extracting its features
    on first use.

    args:
      fh: handle of the function to look up (keyed by fh.address).

    returns: the function's cache node, or None if the function is still
      absent from the cache after extraction was attempted.
    """
    f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None)
    if f_node is not None:
        return f_node

    # function is not in our cache, do extraction now
    self._find_function_and_below_features(fh)
    return self.func_nodes.get(fh.address, None)
|
||||
|
||||
def get_all_function_features(self, fh: FunctionHandle) -> FeatureSet:
|
||||
f_node: Optional[CapaRuleGenFeatureCacheNode] = self._get_cached_func_node(fh)
|
||||
if f_node is None:
|
||||
return {}
|
||||
|
||||
|
||||
@@ -192,8 +192,10 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
# caches used to speed up capa explorer analysis - these must be init to None
|
||||
self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None
|
||||
self.program_analysis_ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.feature_extractor: Optional[CapaExplorerFeatureExtractor] = None
|
||||
self.rulegen_feature_extractor: Optional[CapaExplorerFeatureExtractor] = None
|
||||
self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None
|
||||
self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.rulegen_current_function: Optional[FunctionHandle] = None
|
||||
|
||||
# models
|
||||
@@ -727,13 +729,11 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
update_wait_box(f"{text} ({self.process_count} of {self.process_total})")
|
||||
self.process_count += 1
|
||||
|
||||
update_wait_box("initializing feature extractor")
|
||||
|
||||
try:
|
||||
extractor = CapaExplorerFeatureExtractor()
|
||||
extractor.indicator.progress.connect(slot_progress_feature_extraction)
|
||||
self.feature_extractor = CapaExplorerFeatureExtractor()
|
||||
self.feature_extractor.indicator.progress.connect(slot_progress_feature_extraction)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s).", e, exc_info=True)
|
||||
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -743,7 +743,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
update_wait_box("calculating analysis")
|
||||
|
||||
try:
|
||||
self.process_total += len(tuple(extractor.get_functions()))
|
||||
self.process_total += len(tuple(self.feature_extractor.get_functions()))
|
||||
except Exception as e:
|
||||
logger.error("Failed to calculate analysis (error: %s).", e, exc_info=True)
|
||||
return False
|
||||
@@ -770,9 +770,13 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
try:
|
||||
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
|
||||
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities)
|
||||
capabilities, counts = capa.main.find_capabilities(
|
||||
ruleset, self.feature_extractor, disable_progress=True
|
||||
)
|
||||
|
||||
meta.analysis.feature_counts = counts["feature_counts"]
|
||||
meta.analysis.library_functions = counts["library_functions"]
|
||||
meta.analysis.layout = capa.main.compute_layout(ruleset, self.feature_extractor, capabilities)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
@@ -975,26 +979,21 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
# so we'll work with a local copy of the ruleset.
|
||||
ruleset = copy.deepcopy(self.rulegen_ruleset_cache)
|
||||
|
||||
# clear feature cache
|
||||
if self.rulegen_feature_cache is not None:
|
||||
self.rulegen_feature_cache = None
|
||||
|
||||
# clear cached function
|
||||
if self.rulegen_current_function is not None:
|
||||
self.rulegen_current_function = None
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("Initializing feature extractor")
|
||||
|
||||
try:
|
||||
# must use extractor to get function, as capa analysis requires casted object
|
||||
extractor = CapaExplorerFeatureExtractor()
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
# these are init once objects, create on tab change
|
||||
if self.rulegen_feature_cache is None or self.rulegen_feature_extractor is None:
|
||||
try:
|
||||
update_wait_box("performing one-time file analysis")
|
||||
self.rulegen_feature_extractor = CapaExplorerFeatureExtractor()
|
||||
self.rulegen_feature_cache = CapaRuleGenFeatureCache(self.rulegen_feature_extractor)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
else:
|
||||
logger.info("Reusing prior rulegen cache")
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
@@ -1006,7 +1005,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
try:
|
||||
f = idaapi.get_func(idaapi.get_screen_ea())
|
||||
if f is not None:
|
||||
self.rulegen_current_function = extractor.get_function(f.start_ea)
|
||||
self.rulegen_current_function = self.rulegen_feature_extractor.get_function(f.start_ea)
|
||||
except Exception as e:
|
||||
logger.error("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e, exc_info=True)
|
||||
return False
|
||||
@@ -1015,21 +1014,6 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
# extract features
|
||||
try:
|
||||
fh_list: List[FunctionHandle] = []
|
||||
if self.rulegen_current_function is not None:
|
||||
fh_list.append(self.rulegen_current_function)
|
||||
|
||||
self.rulegen_feature_cache = CapaRuleGenFeatureCache(fh_list, extractor)
|
||||
except Exception as e:
|
||||
logger.error("Failed to extract features (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("generating function rule matches")
|
||||
|
||||
all_function_features: FeatureSet = collections.defaultdict(set)
|
||||
@@ -1261,7 +1245,6 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
elif index == 1:
|
||||
self.set_view_status_label(self.view_status_label_rulegen_cache)
|
||||
self.view_status_label_analysis_cache = status_prev
|
||||
|
||||
self.view_reset_button.setText("Clear")
|
||||
|
||||
def slot_rulegen_editor_update(self):
|
||||
|
||||
113
capa/main.py
113
capa/main.py
@@ -38,9 +38,11 @@ import capa.rules.cache
|
||||
import capa.render.default
|
||||
import capa.render.verbose
|
||||
import capa.features.common
|
||||
import capa.features.freeze
|
||||
import capa.features.freeze as frz
|
||||
import capa.render.vverbose
|
||||
import capa.features.extractors
|
||||
import capa.render.result_document
|
||||
import capa.render.result_document as rdoc
|
||||
import capa.features.extractors.common
|
||||
import capa.features.extractors.pefile
|
||||
import capa.features.extractors.dnfile_
|
||||
@@ -245,13 +247,8 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
all_bb_matches = collections.defaultdict(list) # type: MatchResults
|
||||
all_insn_matches = collections.defaultdict(list) # type: MatchResults
|
||||
|
||||
meta = {
|
||||
"feature_counts": {
|
||||
"file": 0,
|
||||
"functions": {},
|
||||
},
|
||||
"library_functions": {},
|
||||
} # type: Dict[str, Any]
|
||||
feature_counts = rdoc.FeatureCounts(file=0, functions=tuple())
|
||||
library_functions: Tuple[rdoc.LibraryFunction, ...] = tuple()
|
||||
|
||||
with redirecting_print_to_tqdm(disable_progress):
|
||||
with tqdm.contrib.logging.logging_redirect_tqdm():
|
||||
@@ -270,8 +267,10 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
if extractor.is_library_function(f.address):
|
||||
function_name = extractor.get_function_name(f.address)
|
||||
logger.debug("skipping library function 0x%x (%s)", f.address, function_name)
|
||||
meta["library_functions"][f.address] = function_name
|
||||
n_libs = len(meta["library_functions"])
|
||||
library_functions += (
|
||||
rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name),
|
||||
)
|
||||
n_libs = len(library_functions)
|
||||
percentage = round(100 * (n_libs / n_funcs))
|
||||
if isinstance(pb, tqdm.tqdm):
|
||||
pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)")
|
||||
@@ -280,7 +279,9 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(
|
||||
ruleset, extractor, f
|
||||
)
|
||||
meta["feature_counts"]["functions"][f.address] = feature_count
|
||||
feature_counts.functions += (
|
||||
rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count),
|
||||
)
|
||||
logger.debug("analyzed function 0x%x and extracted %d features", f.address, feature_count)
|
||||
|
||||
for rule_name, res in function_matches.items():
|
||||
@@ -301,7 +302,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
|
||||
|
||||
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
|
||||
meta["feature_counts"]["file"] = feature_count
|
||||
feature_counts.file = feature_count
|
||||
|
||||
matches = {
|
||||
rule_name: results
|
||||
@@ -316,6 +317,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
)
|
||||
}
|
||||
|
||||
meta = {
|
||||
"feature_counts": feature_counts,
|
||||
"library_functions": library_functions,
|
||||
}
|
||||
|
||||
return matches, meta
|
||||
|
||||
|
||||
@@ -739,7 +745,7 @@ def collect_metadata(
|
||||
os_: str,
|
||||
rules_path: List[str],
|
||||
extractor: capa.features.extractors.base_extractor.FeatureExtractor,
|
||||
):
|
||||
) -> rdoc.Metadata:
|
||||
md5 = hashlib.md5()
|
||||
sha1 = hashlib.sha1()
|
||||
sha256 = hashlib.sha256()
|
||||
@@ -758,34 +764,37 @@ def collect_metadata(
|
||||
arch = get_arch(sample_path)
|
||||
os_ = get_os(sample_path) if os_ == OS_AUTO else os_
|
||||
|
||||
return {
|
||||
"timestamp": datetime.datetime.now().isoformat(),
|
||||
"version": capa.version.__version__,
|
||||
"argv": argv,
|
||||
"sample": {
|
||||
"md5": md5.hexdigest(),
|
||||
"sha1": sha1.hexdigest(),
|
||||
"sha256": sha256.hexdigest(),
|
||||
"path": os.path.normpath(sample_path),
|
||||
},
|
||||
"analysis": {
|
||||
"format": format_,
|
||||
"arch": arch,
|
||||
"os": os_,
|
||||
"extractor": extractor.__class__.__name__,
|
||||
"rules": rules_path,
|
||||
"base_address": extractor.get_base_address(),
|
||||
"layout": {
|
||||
return rdoc.Metadata(
|
||||
timestamp=datetime.datetime.now(),
|
||||
version=capa.version.__version__,
|
||||
argv=tuple(argv) if argv else None,
|
||||
sample=rdoc.Sample(
|
||||
md5=md5.hexdigest(),
|
||||
sha1=sha1.hexdigest(),
|
||||
sha256=sha256.hexdigest(),
|
||||
path=os.path.normpath(sample_path),
|
||||
),
|
||||
analysis=rdoc.Analysis(
|
||||
format=format_,
|
||||
arch=arch,
|
||||
os=os_,
|
||||
extractor=extractor.__class__.__name__,
|
||||
rules=tuple(rules_path),
|
||||
base_address=frz.Address.from_capa(extractor.get_base_address()),
|
||||
layout=rdoc.Layout(
|
||||
functions=tuple(),
|
||||
# this is updated after capabilities have been collected.
|
||||
# will look like:
|
||||
#
|
||||
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
||||
},
|
||||
},
|
||||
}
|
||||
),
|
||||
feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()),
|
||||
library_functions=tuple(),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def compute_layout(rules, extractor, capabilities):
|
||||
def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
|
||||
"""
|
||||
compute a metadata structure that links basic blocks
|
||||
to the functions in which they're found.
|
||||
@@ -810,17 +819,19 @@ def compute_layout(rules, extractor, capabilities):
|
||||
assert addr in functions_by_bb
|
||||
matched_bbs.add(addr)
|
||||
|
||||
layout = {
|
||||
"functions": {
|
||||
f: {
|
||||
"matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs]
|
||||
# this object is open to extension in the future,
|
||||
layout = rdoc.Layout(
|
||||
functions=tuple(
|
||||
rdoc.FunctionLayout(
|
||||
address=frz.Address.from_capa(f),
|
||||
matched_basic_blocks=tuple(
|
||||
rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs
|
||||
) # this object is open to extension in the future,
|
||||
# such as with the function name, etc.
|
||||
}
|
||||
)
|
||||
for f, bbs in bbs_by_function.items()
|
||||
if len([bb for bb in bbs if bb in matched_bbs]) > 0
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
return layout
|
||||
|
||||
@@ -1197,8 +1208,7 @@ def main(argv=None):
|
||||
logger.debug("file limitation short circuit, won't analyze fully.")
|
||||
return E_FILE_LIMITATION
|
||||
|
||||
# TODO: #1411 use a real type, not a dict here.
|
||||
meta: Dict[str, Any]
|
||||
meta: rdoc.Metadata
|
||||
capabilities: MatchResults
|
||||
counts: Dict[str, Any]
|
||||
|
||||
@@ -1214,7 +1224,7 @@ def main(argv=None):
|
||||
if format_ == FORMAT_FREEZE:
|
||||
# freeze format deserializes directly into an extractor
|
||||
with open(args.sample, "rb") as f:
|
||||
extractor = capa.features.freeze.load(f.read())
|
||||
extractor = frz.load(f.read())
|
||||
else:
|
||||
# all other formats we must create an extractor,
|
||||
# such as viv, binary ninja, etc. workspaces
|
||||
@@ -1255,15 +1265,16 @@ def main(argv=None):
|
||||
meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
|
||||
|
||||
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
|
||||
|
||||
meta.analysis.feature_counts = counts["feature_counts"]
|
||||
meta.analysis.library_functions = counts["library_functions"]
|
||||
meta.analysis.layout = compute_layout(rules, extractor, capabilities)
|
||||
|
||||
if has_file_limitation(rules, capabilities):
|
||||
# bail if capa encountered file limitation e.g. a packed binary
|
||||
# do show the output in verbose mode, though.
|
||||
if not (args.verbose or args.vverbose or args.json):
|
||||
return E_FILE_LIMITATION
|
||||
|
||||
if args.json:
|
||||
print(capa.render.json.render(meta, rules, capabilities))
|
||||
elif args.vverbose:
|
||||
@@ -1308,7 +1319,9 @@ def ida_main():
|
||||
meta = capa.ida.helpers.collect_metadata([rules_path])
|
||||
|
||||
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
|
||||
meta["analysis"].update(counts)
|
||||
|
||||
meta.analysis.feature_counts = counts["feature_counts"]
|
||||
meta.analysis.library_functions = counts["library_functions"]
|
||||
|
||||
if has_file_limitation(rules, capabilities, is_standalone=False):
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
|
||||
|
||||
@@ -28,42 +28,47 @@ class FrozenModel(BaseModel):
|
||||
extra = "forbid"
|
||||
|
||||
|
||||
class Sample(FrozenModel):
|
||||
class Model(BaseModel):
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
|
||||
|
||||
class Sample(Model):
|
||||
md5: str
|
||||
sha1: str
|
||||
sha256: str
|
||||
path: str
|
||||
|
||||
|
||||
class BasicBlockLayout(FrozenModel):
|
||||
class BasicBlockLayout(Model):
|
||||
address: frz.Address
|
||||
|
||||
|
||||
class FunctionLayout(FrozenModel):
|
||||
class FunctionLayout(Model):
|
||||
address: frz.Address
|
||||
matched_basic_blocks: Tuple[BasicBlockLayout, ...]
|
||||
|
||||
|
||||
class Layout(FrozenModel):
|
||||
class Layout(Model):
|
||||
functions: Tuple[FunctionLayout, ...]
|
||||
|
||||
|
||||
class LibraryFunction(FrozenModel):
|
||||
class LibraryFunction(Model):
|
||||
address: frz.Address
|
||||
name: str
|
||||
|
||||
|
||||
class FunctionFeatureCount(FrozenModel):
|
||||
class FunctionFeatureCount(Model):
|
||||
address: frz.Address
|
||||
count: int
|
||||
|
||||
|
||||
class FeatureCounts(FrozenModel):
|
||||
class FeatureCounts(Model):
|
||||
file: int
|
||||
functions: Tuple[FunctionFeatureCount, ...]
|
||||
|
||||
|
||||
class Analysis(FrozenModel):
|
||||
class Analysis(Model):
|
||||
format: str
|
||||
arch: str
|
||||
os: str
|
||||
@@ -75,92 +80,13 @@ class Analysis(FrozenModel):
|
||||
library_functions: Tuple[LibraryFunction, ...]
|
||||
|
||||
|
||||
class Metadata(FrozenModel):
|
||||
class Metadata(Model):
|
||||
timestamp: datetime.datetime
|
||||
version: str
|
||||
argv: Optional[Tuple[str, ...]]
|
||||
sample: Sample
|
||||
analysis: Analysis
|
||||
|
||||
@classmethod
|
||||
def from_capa(cls, meta: Any) -> "Metadata":
|
||||
return cls(
|
||||
timestamp=meta["timestamp"],
|
||||
version=meta["version"],
|
||||
argv=meta["argv"] if "argv" in meta else None,
|
||||
sample=Sample(
|
||||
md5=meta["sample"]["md5"],
|
||||
sha1=meta["sample"]["sha1"],
|
||||
sha256=meta["sample"]["sha256"],
|
||||
path=meta["sample"]["path"],
|
||||
),
|
||||
analysis=Analysis(
|
||||
format=meta["analysis"]["format"],
|
||||
arch=meta["analysis"]["arch"],
|
||||
os=meta["analysis"]["os"],
|
||||
extractor=meta["analysis"]["extractor"],
|
||||
rules=meta["analysis"]["rules"],
|
||||
base_address=frz.Address.from_capa(meta["analysis"]["base_address"]),
|
||||
layout=Layout(
|
||||
functions=tuple(
|
||||
FunctionLayout(
|
||||
address=frz.Address.from_capa(address),
|
||||
matched_basic_blocks=tuple(
|
||||
BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in f["matched_basic_blocks"]
|
||||
),
|
||||
)
|
||||
for address, f in meta["analysis"]["layout"]["functions"].items()
|
||||
)
|
||||
),
|
||||
feature_counts=FeatureCounts(
|
||||
file=meta["analysis"]["feature_counts"]["file"],
|
||||
functions=tuple(
|
||||
FunctionFeatureCount(address=frz.Address.from_capa(address), count=count)
|
||||
for address, count in meta["analysis"]["feature_counts"]["functions"].items()
|
||||
),
|
||||
),
|
||||
library_functions=tuple(
|
||||
LibraryFunction(address=frz.Address.from_capa(address), name=name)
|
||||
for address, name in meta["analysis"]["library_functions"].items()
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
def to_capa(self) -> Dict[str, Any]:
|
||||
capa_meta = {
|
||||
"timestamp": self.timestamp.isoformat(),
|
||||
"version": self.version,
|
||||
"sample": {
|
||||
"md5": self.sample.md5,
|
||||
"sha1": self.sample.sha1,
|
||||
"sha256": self.sample.sha256,
|
||||
"path": self.sample.path,
|
||||
},
|
||||
"analysis": {
|
||||
"format": self.analysis.format,
|
||||
"arch": self.analysis.arch,
|
||||
"os": self.analysis.os,
|
||||
"extractor": self.analysis.extractor,
|
||||
"rules": self.analysis.rules,
|
||||
"base_address": self.analysis.base_address.to_capa(),
|
||||
"layout": {
|
||||
"functions": {
|
||||
f.address.to_capa(): {
|
||||
"matched_basic_blocks": [bb.address.to_capa() for bb in f.matched_basic_blocks]
|
||||
}
|
||||
for f in self.analysis.layout.functions
|
||||
}
|
||||
},
|
||||
"feature_counts": {
|
||||
"file": self.analysis.feature_counts.file,
|
||||
"functions": {fc.address.to_capa(): fc.count for fc in self.analysis.feature_counts.functions},
|
||||
},
|
||||
"library_functions": {lf.address.to_capa(): lf.name for lf in self.analysis.library_functions},
|
||||
},
|
||||
}
|
||||
|
||||
return capa_meta
|
||||
|
||||
|
||||
class CompoundStatementType:
|
||||
AND = "and"
|
||||
@@ -642,7 +568,7 @@ class ResultDocument(FrozenModel):
|
||||
rules: Dict[str, RuleMatches]
|
||||
|
||||
@classmethod
|
||||
def from_capa(cls, meta, rules: RuleSet, capabilities: MatchResults) -> "ResultDocument":
|
||||
def from_capa(cls, meta: Metadata, rules: RuleSet, capabilities: MatchResults) -> "ResultDocument":
|
||||
rule_matches: Dict[str, RuleMatches] = {}
|
||||
for rule_name, matches in capabilities.items():
|
||||
rule = rules[rule_name]
|
||||
@@ -659,10 +585,9 @@ class ResultDocument(FrozenModel):
|
||||
),
|
||||
)
|
||||
|
||||
return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
|
||||
return ResultDocument(meta=meta, rules=rule_matches)
|
||||
|
||||
def to_capa(self) -> Tuple[Dict, Dict]:
|
||||
meta = self.meta.to_capa()
|
||||
def to_capa(self) -> Tuple[Metadata, Dict]:
|
||||
capabilities: Dict[
|
||||
str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]
|
||||
] = collections.defaultdict(list)
|
||||
@@ -678,4 +603,4 @@ class ResultDocument(FrozenModel):
|
||||
|
||||
capabilities[rule_name].append((addr.to_capa(), result))
|
||||
|
||||
return meta, capabilities
|
||||
return self.meta, capabilities
|
||||
|
||||
2
rules
2
rules
Submodule rules updated: 188e65528e...368a27e739
@@ -131,8 +131,10 @@ def get_capa_results(args):
|
||||
|
||||
meta = capa.main.collect_metadata([], path, format, os_, [], extractor)
|
||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
meta.analysis.feature_counts = counts["feature_counts"]
|
||||
meta.analysis.library_functions = counts["library_functions"]
|
||||
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
|
||||
|
||||
|
||||
@@ -172,10 +172,13 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
|
||||
|
||||
# collect metadata (used only to make rendering more complete)
|
||||
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
meta.analysis.feature_counts = counts["feature_counts"]
|
||||
meta.analysis.library_functions = counts["library_functions"]
|
||||
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
capa_output: Any = False
|
||||
|
||||
if output_format == "dictionary":
|
||||
# ...as python dictionary, simplified as textable but in dictionary
|
||||
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
|
||||
|
||||
@@ -178,8 +178,10 @@ def main(argv=None):
|
||||
|
||||
meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor)
|
||||
capabilities, counts = capa.main.find_capabilities(rules, extractor)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
meta.analysis.feature_counts = counts["feature_counts"]
|
||||
meta.analysis.library_functions = counts["library_functions"]
|
||||
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
if capa.main.has_file_limitation(rules, capabilities):
|
||||
# bail if capa encountered file limitation e.g. a packed binary
|
||||
|
||||
10
setup.py
10
setup.py
@@ -21,13 +21,13 @@ requirements = [
|
||||
"viv-utils[flirt]==0.7.9",
|
||||
"halo==0.0.31",
|
||||
"networkx==2.5.1", # newer versions no longer support py3.7.
|
||||
"ruamel.yaml==0.17.28",
|
||||
"ruamel.yaml==0.17.32",
|
||||
"vivisect==1.1.1",
|
||||
"pefile==2023.2.7",
|
||||
"pyelftools==0.29",
|
||||
"dnfile==0.13.0",
|
||||
"dncil==1.0.2",
|
||||
"pydantic==1.10.7",
|
||||
"pydantic==1.10.9",
|
||||
"protobuf==4.23.2",
|
||||
]
|
||||
|
||||
@@ -72,7 +72,7 @@ setuptools.setup(
|
||||
"pytest==7.3.1",
|
||||
"pytest-sugar==0.9.4",
|
||||
"pytest-instafail==0.5.0",
|
||||
"pytest-cov==4.0.0",
|
||||
"pytest-cov==4.1.0",
|
||||
"pycodestyle==2.10.0",
|
||||
"ruff==0.0.270",
|
||||
"black==23.3.0",
|
||||
@@ -80,7 +80,7 @@ setuptools.setup(
|
||||
"mypy==1.3.0",
|
||||
"psutil==5.9.2",
|
||||
"stix2==3.0.1",
|
||||
"requests==2.28.0",
|
||||
"requests==2.31.0",
|
||||
"mypy-protobuf==3.4.0",
|
||||
# type stubs for mypy
|
||||
"types-backports==0.1.3",
|
||||
@@ -89,7 +89,7 @@ setuptools.setup(
|
||||
"types-tabulate==0.9.0.1",
|
||||
"types-termcolor==1.1.4",
|
||||
"types-psutil==5.8.23",
|
||||
"types_requests==2.28.1",
|
||||
"types_requests==2.31.0.1",
|
||||
"types-protobuf==4.23.0.1",
|
||||
],
|
||||
"build": [
|
||||
|
||||
Submodule tests/data updated: 00db2da0eb...a37873c8a5
@@ -761,6 +761,47 @@ FEATURE_PRESENCE_TESTS = sorted(
|
||||
key=lambda t: (t[0], t[1]),
|
||||
)
|
||||
|
||||
# this list should be merged into the one above (FEATURE_PRESENCE_TESTS)
|
||||
# once the debug symbol functionality has been added to all backends
|
||||
FEATURE_SYMTAB_FUNC_TESTS = [
|
||||
(
|
||||
"2bf18d",
|
||||
"function=0x4027b3,bb=0x402861,insn=0x40286d",
|
||||
capa.features.insn.API("__GI_connect"),
|
||||
True,
|
||||
),
|
||||
(
|
||||
"2bf18d",
|
||||
"function=0x4027b3,bb=0x402861,insn=0x40286d",
|
||||
capa.features.insn.API("connect"),
|
||||
True,
|
||||
),
|
||||
(
|
||||
"2bf18d",
|
||||
"function=0x4027b3,bb=0x402861,insn=0x40286d",
|
||||
capa.features.insn.API("__libc_connect"),
|
||||
True,
|
||||
),
|
||||
(
|
||||
"2bf18d",
|
||||
"function=0x4088a4",
|
||||
capa.features.file.FunctionName("__GI_connect"),
|
||||
True,
|
||||
),
|
||||
(
|
||||
"2bf18d",
|
||||
"function=0x4088a4",
|
||||
capa.features.file.FunctionName("connect"),
|
||||
True,
|
||||
),
|
||||
(
|
||||
"2bf18d",
|
||||
"function=0x4088a4",
|
||||
capa.features.file.FunctionName("__libc_connect"),
|
||||
True,
|
||||
),
|
||||
]
|
||||
|
||||
FEATURE_PRESENCE_TESTS_DOTNET = sorted(
|
||||
[
|
||||
("b9f5b", "file", Arch(ARCH_I386), True),
|
||||
|
||||
@@ -55,3 +55,9 @@ def test_standalone_binja_backend():
|
||||
CD = os.path.dirname(__file__)
|
||||
test_path = os.path.join(CD, "..", "tests", "data", "Practical Malware Analysis Lab 01-01.exe_")
|
||||
assert capa.main.main([test_path, "-b", capa.main.BACKEND_BINJA]) == 0
|
||||
|
||||
|
||||
@pytest.mark.skipif(binja_present is False, reason="Skip binja tests if the binaryninja Python API is not installed")
|
||||
def test_binja_version():
|
||||
version = binaryninja.core_version_info()
|
||||
assert version.major == 3 and version.minor == 4
|
||||
|
||||
@@ -282,5 +282,5 @@ def test_rdoc_to_capa():
|
||||
rd = rdoc.ResultDocument.parse_file(path)
|
||||
|
||||
meta, capabilites = rd.to_capa()
|
||||
assert isinstance(meta, dict)
|
||||
assert isinstance(meta, rdoc.Metadata)
|
||||
assert isinstance(capabilites, dict)
|
||||
|
||||
@@ -11,7 +11,7 @@ from fixtures import *
|
||||
|
||||
@fixtures.parametrize(
|
||||
"sample,scope,feature,expected",
|
||||
fixtures.FEATURE_PRESENCE_TESTS,
|
||||
fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_SYMTAB_FUNC_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_viv_features(sample, scope, feature, expected):
|
||||
|
||||
Reference in New Issue
Block a user