extractor: add types throughout

This commit is contained in:
Willi Ballenthin
2022-04-08 11:53:42 -06:00
parent 1b79aae836
commit fc1709ba6c
10 changed files with 118 additions and 90 deletions

View File

@@ -2,36 +2,38 @@ import io
import logging
import binascii
import contextlib
from typing import Tuple, Iterator
import pefile
import capa.features
import capa.features.extractors.elf
import capa.features.extractors.pefile
from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, FORMAT_FREEZE, Arch, Format, String
from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, FORMAT_FREEZE, Arch, Format, String, Feature
from capa.features.freeze import is_freeze
from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress
logger = logging.getLogger(__name__)
def extract_file_strings(buf, **kwargs):
def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[Feature, Address]]:
"""
extract ASCII and UTF-16 LE strings from file
"""
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
yield String(s.s), s.offset
yield String(s.s), FileOffsetAddress(s.offset)
for s in capa.features.extractors.strings.extract_unicode_strings(buf):
yield String(s.s), s.offset
yield String(s.s), FileOffsetAddress(s.offset)
def extract_format(buf):
def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
if buf.startswith(b"MZ"):
yield Format(FORMAT_PE), 0x0
yield Format(FORMAT_PE), NO_ADDRESS
elif buf.startswith(b"\x7fELF"):
yield Format(FORMAT_ELF), 0x0
yield Format(FORMAT_ELF), NO_ADDRESS
elif is_freeze(buf):
yield Format(FORMAT_FREEZE), 0x0
yield Format(FORMAT_FREEZE), NO_ADDRESS
else:
# we likely end up here:
# 1. handling a file format (e.g. macho)
@@ -41,7 +43,7 @@ def extract_format(buf):
return
def extract_arch(buf):
def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]:
if buf.startswith(b"MZ"):
yield from capa.features.extractors.pefile.extract_file_arch(pe=pefile.PE(data=buf))
@@ -53,7 +55,7 @@ def extract_arch(buf):
logger.debug("unsupported arch: %s", arch)
return
yield Arch(arch), 0x0
yield Arch(arch), NO_ADDRESS
else:
# we likely end up here:
@@ -70,9 +72,9 @@ def extract_arch(buf):
return
def extract_os(buf):
def extract_os(buf) -> Iterator[Tuple[Feature, Address]]:
if buf.startswith(b"MZ"):
yield OS(OS_WINDOWS), 0x0
yield OS(OS_WINDOWS), NO_ADDRESS
elif buf.startswith(b"\x7fELF"):
with contextlib.closing(io.BytesIO(buf)) as f:
os = capa.features.extractors.elf.detect_elf_os(f)
@@ -81,7 +83,7 @@ def extract_os(buf):
logger.debug("unsupported os: %s", os)
return
yield OS(os), 0x0
yield OS(os), NO_ADDRESS
else:
# we likely end up here:

View File

@@ -5,34 +5,35 @@ import dnfile
import pefile
from capa.features.common import OS, OS_ANY, ARCH_ANY, ARCH_I386, ARCH_AMD64, FORMAT_DOTNET, Arch, Format, Feature
from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import FeatureExtractor
logger = logging.getLogger(__name__)
def extract_file_format(**kwargs):
yield Format(FORMAT_DOTNET), 0x0
yield Format(FORMAT_DOTNET), NO_ADDRESS
def extract_file_os(**kwargs):
yield OS(OS_ANY), 0x0
yield OS(OS_ANY), NO_ADDRESS
def extract_file_arch(pe, **kwargs):
# to distinguish in more detail, see https://stackoverflow.com/a/23614024/10548020
# .NET 4.5 added option: any CPU, 32-bit preferred
if pe.net.Flags.CLR_32BITREQUIRED and pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE:
yield Arch(ARCH_I386), 0x0
yield Arch(ARCH_I386), NO_ADDRESS
elif not pe.net.Flags.CLR_32BITREQUIRED and pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE_PLUS:
yield Arch(ARCH_AMD64), 0x0
yield Arch(ARCH_AMD64), NO_ADDRESS
else:
yield Arch(ARCH_ANY), 0x0
yield Arch(ARCH_ANY), NO_ADDRESS
def extract_file_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]:
def extract_file_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, Address]]:
for file_handler in FILE_HANDLERS:
for feature, va in file_handler(pe=pe): # type: ignore
yield feature, va
for feature, address in file_handler(pe=pe): # type: ignore
yield feature, address
FILE_HANDLERS = (
@@ -45,10 +46,10 @@ FILE_HANDLERS = (
)
def extract_global_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]:
def extract_global_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, Address]]:
for handler in GLOBAL_HANDLERS:
for feature, va in handler(pe=pe): # type: ignore
yield feature, va
for feature, addr in handler(pe=pe): # type: ignore
yield feature, addr
GLOBAL_HANDLERS = (
@@ -63,8 +64,8 @@ class DnfileFeatureExtractor(FeatureExtractor):
self.path: str = path
self.pe: dnfile.dnPE = dnfile.dnPE(path)
def get_base_address(self) -> int:
return 0x0
def get_base_address(self) -> AbsoluteVirtualAddress:
return AbsoluteVirtualAddress(0x0)
def get_entry_point(self) -> int:
# self.pe.net.Flags.CLR_NATIVE_ENTRYPOINT

View File

@@ -1,3 +1,5 @@
from typing import List, Tuple
from smda.common.SmdaReport import SmdaReport
import capa.features.extractors.common
@@ -6,6 +8,8 @@ import capa.features.extractors.smda.insn
import capa.features.extractors.smda.global_
import capa.features.extractors.smda.function
import capa.features.extractors.smda.basicblock
from capa.features.common import Feature
from capa.features.address import Address
from capa.features.extractors.base_extractor import FeatureExtractor
@@ -18,7 +22,7 @@ class SmdaFeatureExtractor(FeatureExtractor):
self.buf = f.read()
# pre-compute these because we'll yield them at *every* scope.
self.global_features = []
self.global_features: List[Tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.common.extract_os(self.buf))
self.global_features.extend(capa.features.extractors.smda.global_.extract_arch(self.smda_report))

View File

@@ -8,27 +8,30 @@
import string
import struct
from typing import Tuple, Iterator
import envi
import envi.archs.i386.disasm
from capa.features.common import Characteristic
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle
def interface_extract_basic_block_XXX(f, bb):
def interface_extract_basic_block_XXX(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse features from the given basic block.
args:
f (viv_utils.Function): the function to process.
bb (viv_utils.BasicBlock): the basic block to process.
f: the function to process.
bb: the basic block to process.
yields:
(Feature, int): the feature and the address at which its found.
(Feature, Address): the feature and the address at which it's found.
"""
yield NotImplementedError("feature"), NotImplementedError("virtual address")
...
def _bb_has_tight_loop(f, bb):
@@ -44,7 +47,7 @@ def _bb_has_tight_loop(f, bb):
return False
def extract_bb_tight_loop(f, bb):
def extract_bb_tight_loop(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""check basic block for tight loop indicators"""
if _bb_has_tight_loop(f, bb):
yield Characteristic("tight loop"), bb.va
@@ -67,7 +70,7 @@ def _bb_has_stackstring(f, bb):
return False
def extract_stackstring(f, bb):
def extract_stackstring(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""check basic block for stackstring indicators"""
if _bb_has_stackstring(f, bb):
yield Characteristic("stack string"), bb.va
@@ -143,7 +146,7 @@ def is_printable_utf16le(chars: bytes) -> bool:
return False
def extract_features(f, bb):
def extract_features(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""
extract features from the given basic block.
@@ -156,8 +159,8 @@ def extract_features(f, bb):
"""
yield BasicBlock(), bb.va
for bb_handler in BASIC_BLOCK_HANDLERS:
for feature, va in bb_handler(f, bb):
yield feature, va
for feature, addr in bb_handler(f, bb):
yield feature, addr
BASIC_BLOCK_HANDLERS = (

View File

@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import List, Tuple
import viv_utils
import viv_utils.flirt
@@ -16,7 +17,9 @@ import capa.features.extractors.viv.insn
import capa.features.extractors.viv.global_
import capa.features.extractors.viv.function
import capa.features.extractors.viv.basicblock
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
logger = logging.getLogger(__name__)
@@ -43,13 +46,13 @@ class VivisectFeatureExtractor(FeatureExtractor):
self.buf = f.read()
# pre-compute these because we'll yield them at *every* scope.
self.global_features = []
self.global_features: List[Tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.common.extract_os(self.buf))
self.global_features.extend(capa.features.extractors.viv.global_.extract_arch(self.vw))
def get_base_address(self):
# assume there is only one file loaded into the vw
return list(self.vw.filemeta.values())[0]["imagebase"]
return AbsoluteVirtualAddress(list(self.vw.filemeta.values())[0]["imagebase"])
def extract_global_features(self):
yield from self.global_features

View File

@@ -5,6 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Tuple, Iterator
import PE.carve as pe_carve # vivisect PE
import viv_utils
@@ -15,20 +16,21 @@ import capa.features.extractors.common
import capa.features.extractors.helpers
import capa.features.extractors.strings
from capa.features.file import Export, Import, Section, FunctionName
from capa.features.common import String, Characteristic
from capa.features.common import String, Feature, Characteristic
from capa.features.address import Address, FileOffsetAddress, AbsoluteVirtualAddress
def extract_file_embedded_pe(buf, **kwargs):
def extract_file_embedded_pe(buf, **kwargs) -> Iterator[Tuple[Feature, Address]]:
for offset, _ in pe_carve.carve(buf, 1):
yield Characteristic("embedded pe"), offset
yield Characteristic("embedded pe"), FileOffsetAddress(offset)
def extract_file_export_names(vw, **kwargs):
def extract_file_export_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]]:
for va, _, name, _ in vw.getExports():
yield Export(name), va
yield Export(name), AbsoluteVirtualAddress(va)
def extract_file_import_names(vw, **kwargs):
def extract_file_import_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]]:
"""
extract imported function names
1. imports by ordinal:
@@ -44,8 +46,9 @@ def extract_file_import_names(vw, **kwargs):
# replace ord prefix with #
impname = "#%s" % impname[len("ord") :]
addr = AbsoluteVirtualAddress(va)
for name in capa.features.extractors.helpers.generate_symbols(modname, impname):
yield Import(name), va
yield Import(name), addr
def is_viv_ord_impname(impname: str) -> bool:
@@ -62,36 +65,37 @@ def is_viv_ord_impname(impname: str) -> bool:
return True
def extract_file_section_names(vw, **kwargs):
def extract_file_section_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]]:
for va, _, segname, _ in vw.getSegments():
yield Section(segname), va
yield Section(segname), AbsoluteVirtualAddress(va)
def extract_file_strings(buf, **kwargs):
def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.common.extract_file_strings(buf)
def extract_file_function_names(vw, **kwargs):
def extract_file_function_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]]:
"""
extract the names of statically-linked library functions.
"""
for va in sorted(vw.getFunctions()):
addr = AbsoluteVirtualAddress(va)
if viv_utils.flirt.is_library_function(vw, va):
name = viv_utils.get_function_name(vw, va)
yield FunctionName(name), va
yield FunctionName(name), addr
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
# extract features for both the mangled and un-mangled representations.
# e.g. `_fwrite` -> `fwrite`
# see: https://stackoverflow.com/a/2628384/87207
yield FunctionName(name[1:]), va
yield FunctionName(name[1:]), addr
def extract_file_format(buf, **kwargs):
def extract_file_format(buf, **kwargs) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.common.extract_format(buf)
def extract_features(vw, buf: bytes):
def extract_features(vw, buf: bytes) -> Iterator[Tuple[Feature, Address]]:
"""
extract file features from given workspace
@@ -100,12 +104,12 @@ def extract_features(vw, buf: bytes):
buf: the raw input file bytes
yields:
Tuple[Feature, VA]: a feature and its location.
Tuple[Feature, Address]: a feature and its location.
"""
for file_handler in FILE_HANDLERS:
for feature, va in file_handler(vw=vw, buf=buf): # type: ignore
yield feature, va
for feature, addr in file_handler(vw=vw, buf=buf): # type: ignore
yield feature, addr
FILE_HANDLERS = (

View File

@@ -5,33 +5,37 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Tuple, Iterator
import envi
import viv_utils
import vivisect.const
from capa.features.common import Characteristic
from capa.features.common import Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors import loops
from capa.features.extractors.base_extractor import FunctionHandle
def interface_extract_function_XXX(f):
def interface_extract_function_XXX(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse features from the given function.
args:
f (viv_utils.Function): the function to process.
f: the function to process.
yields:
(Feature, int): the feature and the address at which its found.
(Feature, Address): the feature and the address at which it's found.
"""
yield NotImplementedError("feature"), NotImplementedError("virtual address")
...
def extract_function_calls_to(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
    """
    yield a "calls to" characteristic for each code cross-reference to the given function.

    args:
      fhandle: the function to process; must expose `.vw` (the workspace) and `.va`.

    yields:
      (Feature, Address): the characteristic and the first element of each xref tuple.
    """
    # the body must reference the parameter by its declared name;
    # reading a stale name here would raise NameError on the first iteration.
    for src, _, _, _ in fhandle.vw.getXrefsTo(fhandle.va, rtype=vivisect.const.REF_CODE):
        # NOTE(review): `src` is yielded exactly as returned by getXrefsTo and is not
        # wrapped in an Address type - confirm against the annotated return type.
        yield Characteristic("calls to"), src
def extract_function_loop(f):
def extract_function_loop(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse if a function has a loop
"""
@@ -53,7 +57,7 @@ def extract_function_loop(f):
yield Characteristic("loop"), f.va
def extract_features(f):
def extract_features(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
"""
extract features from the given function.
@@ -64,8 +68,8 @@ def extract_features(f):
Tuple[Feature, Address]: the features and their location found in this function.
"""
for func_handler in FUNCTION_HANDLERS:
for feature, va in func_handler(f):
yield feature, va
for feature, addr in func_handler(f):
yield feature, addr
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)

View File

@@ -1,19 +1,21 @@
import logging
from typing import Tuple, Iterator
import envi.archs.i386
import envi.archs.amd64
from capa.features.common import ARCH_I386, ARCH_AMD64, Arch
from capa.features.common import ARCH_I386, ARCH_AMD64, Arch, Feature
from capa.features.address import NO_ADDRESS, Address
logger = logging.getLogger(__name__)
def extract_arch(vw):
def extract_arch(vw) -> Iterator[Tuple[Feature, Address]]:
if isinstance(vw.arch, envi.archs.amd64.Amd64Module):
yield Arch(ARCH_AMD64), 0x0
yield Arch(ARCH_AMD64), NO_ADDRESS
elif isinstance(vw.arch, envi.archs.i386.i386Module):
yield Arch(ARCH_I386), 0x0
yield Arch(ARCH_I386), NO_ADDRESS
else:
# we likely end up here:

View File

@@ -5,6 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import List, Tuple, Callable, Iterator
import envi
import envi.exc
import viv_utils
@@ -18,7 +19,9 @@ import envi.archs.amd64.disasm
import capa.features.extractors.helpers
import capa.features.extractors.viv.helpers
from capa.features.insn import API, Number, Offset, Mnemonic, OperandNumber, OperandOffset
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Characteristic
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_indirect_call
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
@@ -26,19 +29,21 @@ from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_i
SECURITY_COOKIE_BYTES_DELTA = 0x40
def interface_extract_instruction_XXX(f, bb, insn):
def interface_extract_instruction_XXX(
f: FunctionHandle, bb: BBHandle, insn: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse features from the given instruction.
args:
f (viv_utils.Function): the function to process.
bb (viv_utils.BasicBlock): the basic block to process.
insn (vivisect...Instruction): the instruction to process.
f: the function to process.
bb: the basic block to process.
insn: the instruction to process.
yields:
(Feature, int): the feature and the address at which its found.
(Feature, Address): the feature and the address at which it's found.
"""
yield NotImplementedError("feature"), NotImplementedError("virtual address")
...
def get_imports(vw):
@@ -610,8 +615,8 @@ def extract_op_string_features(f, bb, insn, i, oper):
def extract_operand_features(f, bb, insn):
for i, oper in enumerate(insn.opers):
for op_handler in OPERAND_HANDLERS:
for feature, va in op_handler(f, bb, insn, i, oper):
yield feature, va
for feature, addr in op_handler(f, bb, insn, i, oper):
yield feature, addr
OPERAND_HANDLERS = (
@@ -621,7 +626,7 @@ OPERAND_HANDLERS = (
)
def extract_features(f, bb, insn):
def extract_features(f, bb, insn) -> Iterator[Tuple[Feature, Address]]:
"""
extract features from the given insn.
@@ -631,11 +636,11 @@ def extract_features(f, bb, insn):
insn (vivisect...Instruction): the instruction to process.
yields:
Tuple[Feature, int]: the features and their location found in this insn.
Tuple[Feature, Address]: the features and their location found in this insn.
"""
for insn_handler in INSTRUCTION_HANDLERS:
for feature, va in insn_handler(f, bb, insn):
yield feature, va
for feature, addr in insn_handler(f, bb, insn):
yield feature, addr
INSTRUCTION_HANDLERS = (

View File

@@ -114,18 +114,18 @@ def find_instruction_capabilities(
# all features found for the instruction.
features = collections.defaultdict(set) # type: FeatureSet
for feature, va in itertools.chain(
for feature, addr in itertools.chain(
extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
):
features[feature].add(va)
features[feature].add(addr)
# matches found at this instruction.
_, matches = ruleset.match(Scope.INSTRUCTION, features, int(insn))
for rule_name, res in matches.items():
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])
for addr, _ in res:
capa.engine.index_rule_matches(features, rule, [addr])
return features, matches