Merge pull request #930 from mandiant/feature-insn-scope

feature: instruction scope
Willi Ballenthin, 2022-04-04 16:42:56 -06:00, committed by GitHub
21 changed files with 711 additions and 173 deletions

.DS_Store (new binary file, not shown)


@@ -37,6 +37,8 @@ jobs:
run: isort --profile black --length-sort --line-width 120 -c .
- name: Lint with black
run: black -l 120 --check .
- name: Lint with pycodestyle
run: pycodestyle --show-source capa/ scripts/ tests/
- name: Check types with mypy
run: mypy --config-file .github/mypy/mypy.ini capa/ scripts/ tests/

.gitignore

@@ -118,3 +118,5 @@ rule-linter-output.log
scripts/perf/*.txt
scripts/perf/*.svg
scripts/perf/*.zip
.direnv
.envrc


@@ -4,8 +4,14 @@
### New Features
- add new scope "instruction" for matching mnemonics and operands #767 @williballenthin
- add new feature "operand[{0, 1, 2}].number" for matching instruction operand immediate values #767 @williballenthin
- add new feature "operand[{0, 1, 2}].offset" for matching instruction operand offsets #767 @williballenthin
### Breaking Changes
- the instruction scope and operand features are new and are not backwards compatible with older versions of capa
### New Rules (4)
- data-manipulation/encryption/aes/manually-build-aes-constants huynh.t.nhan@gmail.com
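
For illustration (not part of this changelog), a minimal sketch of a rule that exercises the new instruction scope and operand features listed above, loaded the same way the new tests in this PR load rules; the rule name and concrete values are made up:

    import textwrap

    import capa.rules

    # hypothetical rule: compare a structure field at offset 0x10 (operand 0)
    # against the immediate 0xFF (operand 1), e.g. `cmp [esi+10h], 0FFh`
    rule = capa.rules.Rule.from_yaml(
        textwrap.dedent(
            """
            rule:
              meta:
                name: compare structure field against constant
                scope: instruction
              features:
                - and:
                  - mnemonic: cmp
                  - operand[0].offset: 0x10
                  - operand[1].number: 0xFF
            """
        )
    )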


@@ -235,8 +235,8 @@ class Subscope(Statement):
the engine should preprocess rules to extract subscope statements into their own rules.
"""
def __init__(self, scope, child):
super(Subscope, self).__init__()
def __init__(self, scope, child, description=None):
super(Subscope, self).__init__(description=description)
self.scope = scope
self.child = child


@@ -7,6 +7,7 @@
# See the License for the specific language governing permissions and limitations under the License.
import re
import abc
import codecs
import logging
import collections
@@ -96,7 +97,7 @@ class Result:
return self.success
class Feature:
class Feature(abc.ABC):
def __init__(self, value: Union[str, int, bytes], bitness=None, description=None):
"""
Args:
@@ -168,6 +169,8 @@ class Feature:
kwargs = args[-1]
args = args[:-1]
return cls(*args, **kwargs)
else:
return cls(*args)
class MatchedRule(Feature):
@@ -178,7 +181,6 @@ class MatchedRule(Feature):
class Characteristic(Feature):
def __init__(self, value: str, description=None):
super(Characteristic, self).__init__(value, description=description)


@@ -12,7 +12,7 @@ import idautils
import capa.features.extractors.helpers
import capa.features.extractors.ida.helpers
from capa.features.insn import API, Number, Offset, Mnemonic
from capa.features.insn import API, Number, Offset, Mnemonic, OperandNumber, OperandOffset
from capa.features.common import (
BITNESS_X32,
BITNESS_X64,
@@ -143,7 +143,11 @@ def extract_insn_number_features(f, bb, insn):
# .text:00401145 add esp, 0Ch
return
for op in capa.features.extractors.ida.helpers.get_insn_ops(insn, target_ops=(idaapi.o_imm, idaapi.o_mem)):
for i, op in enumerate(insn.ops):
if op.type == idaapi.o_void:
break
if op.type not in (idaapi.o_imm, idaapi.o_mem):
continue
# skip things like:
# .text:00401100 shr eax, offset loc_C
if capa.features.extractors.ida.helpers.is_op_offset(insn, op):
@@ -156,6 +160,7 @@ def extract_insn_number_features(f, bb, insn):
yield Number(const), insn.ea
yield Number(const, bitness=get_bitness(f.ctx)), insn.ea
yield OperandNumber(i, const), insn.ea
def extract_insn_bytes_features(f, bb, insn):
@@ -208,9 +213,14 @@ def extract_insn_offset_features(f, bb, insn):
example:
.text:0040112F cmp [esi+4], ebx
"""
for op in capa.features.extractors.ida.helpers.get_insn_ops(insn, target_ops=(idaapi.o_phrase, idaapi.o_displ)):
for i, op in enumerate(insn.ops):
if op.type == idaapi.o_void:
break
if op.type not in (idaapi.o_phrase, idaapi.o_displ):
continue
if capa.features.extractors.ida.helpers.is_op_stack_var(insn.ea, op.n):
continue
p_info = capa.features.extractors.ida.helpers.get_op_phrase_info(op)
op_off = p_info.get("offset", 0)
if idaapi.is_mapped(op_off):
@@ -225,6 +235,7 @@ def extract_insn_offset_features(f, bb, insn):
yield Offset(op_off), insn.ea
yield Offset(op_off, bitness=get_bitness(f.ctx)), insn.ea
yield OperandOffset(i, op_off), insn.ea
def contains_stack_cookie_keywords(s):


@@ -218,7 +218,7 @@ def extract_insn_offset_features(f, bb, insn):
# mov eax, [esi + ecx + 16384]
operands = [o.strip() for o in insn.operands.split(",")]
for operand in operands:
if not "ptr" in operand:
if "ptr" not in operand:
continue
if "esp" in operand or "ebp" in operand or "rbp" in operand:
continue


@@ -17,7 +17,7 @@ import envi.archs.amd64.disasm
import capa.features.extractors.helpers
import capa.features.extractors.viv.helpers
from capa.features.insn import API, Number, Offset, Mnemonic
from capa.features.insn import API, Number, Offset, Mnemonic, OperandNumber, OperandOffset
from capa.features.common import (
BITNESS_X32,
BITNESS_X64,
@@ -171,37 +171,6 @@ def extract_insn_api_features(f, bb, insn):
yield API(name), insn.va
def extract_insn_number_features(f, bb, insn):
"""parse number features from the given instruction."""
# example:
#
# push 3136B0h ; dwControlCode
for oper in insn.opers:
# this is for both x32 and x64
if not isinstance(oper, (envi.archs.i386.disasm.i386ImmOper, envi.archs.i386.disasm.i386ImmMemOper)):
continue
if isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
v = oper.getOperValue(oper)
else:
v = oper.getOperAddr(oper)
if f.vw.probeMemory(v, 1, envi.memory.MM_READ):
# this is a valid address
# assume its not also a constant.
continue
if insn.mnem == "add" and insn.opers[0].isReg() and insn.opers[0].reg == envi.archs.i386.regs.REG_ESP:
# skip things like:
#
# .text:00401140 call sub_407E2B
# .text:00401145 add esp, 0Ch
return
yield Number(v), insn.va
yield Number(v, bitness=get_bitness(f.vw)), insn.va
def derefs(vw, p):
"""
recursively follow the given pointer, yielding the valid memory addresses along the way.
@@ -340,75 +309,6 @@ def read_string(vw, offset: int) -> str:
raise ValueError("not a string", offset)
def extract_insn_string_features(f, bb, insn):
"""parse string features from the given instruction."""
# example:
#
# push offset aAcr ; "ACR > "
for oper in insn.opers:
if isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
v = oper.getOperValue(oper)
elif isinstance(oper, envi.archs.i386.disasm.i386ImmMemOper):
# like 0x10056CB4 in `lea eax, dword [0x10056CB4]`
v = oper.imm
elif isinstance(oper, envi.archs.i386.disasm.i386SibOper):
# like 0x401000 in `mov eax, 0x401000[2 * ebx]`
v = oper.imm
elif isinstance(oper, envi.archs.amd64.disasm.Amd64RipRelOper):
v = oper.getOperAddr(insn)
else:
continue
for v in derefs(f.vw, v):
try:
s = read_string(f.vw, v)
except ValueError:
continue
else:
yield String(s.rstrip("\x00")), insn.va
def extract_insn_offset_features(f, bb, insn):
"""parse structure offset features from the given instruction."""
# example:
#
# .text:0040112F cmp [esi+4], ebx
for oper in insn.opers:
# this is for both x32 and x64
# like [esi + 4]
# reg ^
# disp
if isinstance(oper, envi.archs.i386.disasm.i386RegMemOper):
if oper.reg == envi.archs.i386.regs.REG_ESP:
continue
if oper.reg == envi.archs.i386.regs.REG_EBP:
continue
# TODO: do x64 support for real.
if oper.reg == envi.archs.amd64.regs.REG_RBP:
continue
# viv already decodes offsets as signed
v = oper.disp
yield Offset(v), insn.va
yield Offset(v, bitness=get_bitness(f.vw)), insn.va
# like: [esi + ecx + 16384]
# reg ^ ^
# index ^
# disp
elif isinstance(oper, envi.archs.i386.disasm.i386SibOper):
# viv already decodes offsets as signed
v = oper.disp
yield Offset(v), insn.va
yield Offset(v, bitness=get_bitness(f.vw)), insn.va
def is_security_cookie(f, bb, insn) -> bool:
"""
check if an instruction is related to security cookie checks
@@ -625,6 +525,121 @@ def extract_function_indirect_call_characteristic_features(f, bb, insn):
yield Characteristic("indirect call"), insn.va
def extract_op_number_features(f, bb, insn, i, oper):
"""parse number features from the given operand."""
# example:
#
# push 3136B0h ; dwControlCode
# this is for both x32 and x64
if not isinstance(oper, (envi.archs.i386.disasm.i386ImmOper, envi.archs.i386.disasm.i386ImmMemOper)):
return
if isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
v = oper.getOperValue(oper)
else:
v = oper.getOperAddr(oper)
if f.vw.probeMemory(v, 1, envi.memory.MM_READ):
# this is a valid address
# assume its not also a constant.
return
if insn.mnem == "add" and insn.opers[0].isReg() and insn.opers[0].reg == envi.archs.i386.regs.REG_ESP:
# skip things like:
#
# .text:00401140 call sub_407E2B
# .text:00401145 add esp, 0Ch
return
yield Number(v), insn.va
yield Number(v, bitness=get_bitness(f.vw)), insn.va
yield OperandNumber(i, v), insn.va
def extract_op_offset_features(f, bb, insn, i, oper):
"""parse structure offset features from the given operand."""
# example:
#
# .text:0040112F cmp [esi+4], ebx
# this is for both x32 and x64
# like [esi + 4]
# reg ^
# disp
if isinstance(oper, envi.archs.i386.disasm.i386RegMemOper):
if oper.reg == envi.archs.i386.regs.REG_ESP:
return
if oper.reg == envi.archs.i386.regs.REG_EBP:
return
# TODO: do x64 support for real.
if oper.reg == envi.archs.amd64.regs.REG_RBP:
return
# viv already decodes offsets as signed
v = oper.disp
yield Offset(v), insn.va
yield Offset(v, bitness=get_bitness(f.vw)), insn.va
yield OperandOffset(i, v), insn.va
# like: [esi + ecx + 16384]
# reg ^ ^
# index ^
# disp
elif isinstance(oper, envi.archs.i386.disasm.i386SibOper):
# viv already decodes offsets as signed
v = oper.disp
yield Offset(v), insn.va
yield Offset(v, bitness=get_bitness(f.vw)), insn.va
yield OperandOffset(i, v), insn.va
def extract_op_string_features(f, bb, insn, i, oper):
"""parse string features from the given operand."""
# example:
#
# push offset aAcr ; "ACR > "
if isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
v = oper.getOperValue(oper)
elif isinstance(oper, envi.archs.i386.disasm.i386ImmMemOper):
# like 0x10056CB4 in `lea eax, dword [0x10056CB4]`
v = oper.imm
elif isinstance(oper, envi.archs.i386.disasm.i386SibOper):
# like 0x401000 in `mov eax, 0x401000[2 * ebx]`
v = oper.imm
elif isinstance(oper, envi.archs.amd64.disasm.Amd64RipRelOper):
v = oper.getOperAddr(insn)
else:
return
for v in derefs(f.vw, v):
try:
s = read_string(f.vw, v)
except ValueError:
continue
else:
yield String(s.rstrip("\x00")), insn.va
def extract_operand_features(f, bb, insn):
for i, oper in enumerate(insn.opers):
for op_handler in OPERAND_HANDLERS:
for feature, va in op_handler(f, bb, insn, i, oper):
yield feature, va
OPERAND_HANDLERS = (
extract_op_number_features,
extract_op_offset_features,
extract_op_string_features,
)
def extract_features(f, bb, insn):
"""
extract features from the given insn.
@@ -644,10 +659,7 @@ def extract_features(f, bb, insn):
INSTRUCTION_HANDLERS = (
extract_insn_api_features,
extract_insn_number_features,
extract_insn_string_features,
extract_insn_bytes_features,
extract_insn_offset_features,
extract_insn_nzxor_characteristic_features,
extract_insn_mnemonic_features,
extract_insn_obfs_call_plus_5_characteristic_features,
@@ -656,4 +668,5 @@ INSTRUCTION_HANDLERS = (
extract_insn_segment_access_features,
extract_function_calls_from,
extract_function_indirect_call_characteristic_features,
extract_operand_features,
)


@@ -51,6 +51,7 @@ See the License for the specific language governing permissions and limitations
import json
import zlib
import logging
from typing import Dict, Type
import capa.features.file
import capa.features.insn
@@ -58,6 +59,7 @@ import capa.features.common
import capa.features.basicblock
import capa.features.extractors.base_extractor
from capa.helpers import hex
from capa.features.common import Feature
logger = logging.getLogger(__name__)
@@ -66,7 +68,8 @@ def serialize_feature(feature):
return feature.freeze_serialize()
KNOWN_FEATURES = {F.__name__: F for F in capa.features.common.Feature.__subclasses__()}
KNOWN_FEATURES: Dict[str, Type[Feature]] = {F.__name__: F for F in capa.features.common.Feature.__subclasses__()}
KNOWN_FEATURES.update({F.__name__: F for F in capa.features.insn._Operand.__subclasses__()}) # type: ignore
def deserialize_feature(doc):
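
As a hedged illustration (not part of the diff) of why KNOWN_FEATURES now also indexes the _Operand subclasses: serializing one of the new operand features produces the (class name, [index, value]) pair defined by _Operand.freeze_serialize in capa.features.insn, and that class name must resolve back to a class during deserialization:

    from capa.features.insn import OperandNumber

    feature = OperandNumber(0, 0x10)

    # freeze format per _Operand.freeze_serialize: ("OperandNumber", [0, 0x10])
    assert feature.freeze_serialize() == ("OperandNumber", [0, 0x10])

    # KNOWN_FEATURES maps "OperandNumber" back to the class object so the
    # serialized name can be turned into a feature again.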


@@ -5,6 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import abc
import capa.render.utils
from capa.features.common import Feature
@@ -39,3 +40,51 @@ class Offset(Feature):
class Mnemonic(Feature):
def __init__(self, value: str, description=None):
super(Mnemonic, self).__init__(value, description=description)
MAX_OPERAND_INDEX = 3
class _Operand(Feature, abc.ABC):
# superclass: don't use directly
# subclasses should set self.name and provide the value string formatter
def __init__(self, index: int, value: int, description=None):
super(_Operand, self).__init__(value, description=description)
self.index = index
def __hash__(self):
return hash((self.name, self.value, self.bitness))
def __eq__(self, other):
return super().__eq__(other) and self.index == other.index
def freeze_serialize(self):
return (self.__class__.__name__, [self.index, self.value])
class OperandNumber(_Operand):
# cached names so we don't do extra string formatting every ctor
NAMES = ["operand[%d].number" % i for i in range(MAX_OPERAND_INDEX)]
# operand[i].number: 0x12
def __init__(self, index: int, value: int, description=None):
super(OperandNumber, self).__init__(index, value, description=description)
self.name = self.NAMES[index]
def get_value_str(self) -> str:
assert isinstance(self.value, int)
return capa.render.utils.hex(self.value)
class OperandOffset(_Operand):
# cached names so we don't do extra string formatting every ctor
NAMES = ["operand[%d].offset" % i for i in range(MAX_OPERAND_INDEX)]
# operand[i].offset: 0x12
def __init__(self, index: int, value: int, description=None):
super(OperandOffset, self).__init__(index, value, description=description)
self.name = self.NAMES[index]
def get_value_str(self) -> str:
assert isinstance(self.value, int)
return capa.render.utils.hex(self.value)
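
A short usage sketch (not part of the diff) of the classes above; the equality behaviour mirrors the new tests added in this PR:

    from capa.features.insn import OperandNumber, OperandOffset

    num = OperandNumber(1, 0xFF)
    assert num.name == "operand[1].number"  # taken from the cached NAMES list

    # the operand index participates in equality, so the same value seen at a
    # different operand position is a different feature.
    assert OperandNumber(1, 0xFF) == OperandNumber(1, 0xFF)
    assert OperandNumber(0, 0xFF) != OperandNumber(1, 0xFF)

    off = OperandOffset(0, 0x4)
    assert off.name == "operand[0].offset"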


@@ -45,7 +45,7 @@ import capa.features.extractors.elffile
from capa.rules import Rule, Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import get_file_taste
from capa.features.extractors.base_extractor import FunctionHandle, FeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
@@ -85,45 +85,109 @@ def set_vivisect_log_level(level):
logging.getLogger("envi.codeflow").setLevel(level)
def find_function_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle):
# contains features from:
# - insns
# - function
def find_instruction_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
) -> Tuple[FeatureSet, MatchResults]:
"""
find matches for the given rules for the given instruction.
returns: tuple containing (features for instruction, match results for instruction)
"""
# all features found for the instruction.
features = collections.defaultdict(set) # type: FeatureSet
for feature, va in itertools.chain(
extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
):
features[feature].add(va)
# matches found at this instruction.
_, matches = ruleset.match(Scope.INSTRUCTION, features, int(insn))
for rule_name, res in matches.items():
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])
return features, matches
def find_basic_block_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle
) -> Tuple[FeatureSet, MatchResults, MatchResults]:
"""
find matches for the given rules within the given basic block.
returns: tuple containing (features for basic block, match results for basic block, match results for instructions)
"""
# all features found within this basic block,
# includes features found within instructions.
features = collections.defaultdict(set) # type: FeatureSet
# matches found at the instruction scope.
# might be found at different instructions, thats ok.
insn_matches = collections.defaultdict(list) # type: MatchResults
for insn in extractor.get_instructions(f, bb):
ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn)
for feature, vas in ifeatures.items():
features[feature].update(vas)
for rule_name, res in imatches.items():
insn_matches[rule_name].extend(res)
for feature, va in itertools.chain(
extractor.extract_basic_block_features(f, bb), extractor.extract_global_features()
):
features[feature].add(va)
# matches found within this basic block.
_, matches = ruleset.match(Scope.BASIC_BLOCK, features, int(bb))
for rule_name, res in matches.items():
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])
return features, matches, insn_matches
def find_code_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle
) -> Tuple[MatchResults, MatchResults, MatchResults, int]:
"""
find matches for the given rules within the given function.
returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features)
"""
# all features found within this function,
# includes features found within basic blocks (and instructions).
function_features = collections.defaultdict(set) # type: FeatureSet
# matches found at the basic block scope.
# might be found at different basic blocks, thats ok.
bb_matches = collections.defaultdict(list) # type: MatchResults
# matches found at the instruction scope.
# might be found at different instructions, thats ok.
insn_matches = collections.defaultdict(list) # type: MatchResults
for bb in extractor.get_basic_blocks(f):
features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, f, bb)
for feature, vas in features.items():
function_features[feature].update(vas)
for rule_name, res in bmatches.items():
bb_matches[rule_name].extend(res)
for rule_name, res in imatches.items():
insn_matches[rule_name].extend(res)
for feature, va in itertools.chain(extractor.extract_function_features(f), extractor.extract_global_features()):
function_features[feature].add(va)
for bb in extractor.get_basic_blocks(f):
# contains features from:
# - insns
# - basic blocks
bb_features = collections.defaultdict(set)
for feature, va in itertools.chain(
extractor.extract_basic_block_features(f, bb), extractor.extract_global_features()
):
bb_features[feature].add(va)
function_features[feature].add(va)
for insn in extractor.get_instructions(f, bb):
for feature, va in itertools.chain(
extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
):
bb_features[feature].add(va)
function_features[feature].add(va)
_, matches = ruleset.match(Scope.BASIC_BLOCK, bb_features, int(bb))
for rule_name, res in matches.items():
bb_matches[rule_name].extend(res)
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(function_features, rule, [va])
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, int(f))
return function_matches, bb_matches, len(function_features)
return function_matches, bb_matches, insn_matches, len(function_features)
def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
@@ -150,6 +214,7 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
all_function_matches = collections.defaultdict(list) # type: MatchResults
all_bb_matches = collections.defaultdict(list) # type: MatchResults
all_insn_matches = collections.defaultdict(list) # type: MatchResults
meta = {
"feature_counts": {
@@ -182,7 +247,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
continue
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(ruleset, extractor, f)
meta["feature_counts"]["functions"][function_address] = feature_count
logger.debug("analyzed function 0x%x and extracted %d features", function_address, feature_count)
@@ -190,11 +255,15 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
all_function_matches[rule_name].extend(res)
for rule_name, res in bb_matches.items():
all_bb_matches[rule_name].extend(res)
for rule_name, res in insn_matches.items():
all_insn_matches[rule_name].extend(res)
# collection of features that captures the rule matches within function and BB scopes.
# collection of features that captures the rule matches within function, BB, and instruction scopes.
# mapping from feature (matched rule) to set of addresses at which it matched.
function_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()):
for rule_name, results in itertools.chain(
all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items()
):
locations = set(map(lambda p: p[0], results))
rule = ruleset[rule_name]
capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
@@ -208,6 +277,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
# each rule exists in exactly one scope,
# so there won't be any overlap among these following MatchResults,
# and we can merge the dictionaries naively.
all_insn_matches.items(),
all_bb_matches.items(),
all_function_matches.items(),
all_file_matches.items(),
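
The per-scope plumbing above follows one aggregation pattern: features found at a child scope are unioned into the parent scope's feature set, and child matches are carried upward per rule name. A self-contained toy of that pattern (made-up feature strings, not capa's API):

    import collections

    # toy input: (feature, va) pairs per instruction within one basic block
    instructions = [
        [("mnemonic(push)", 0x4071A4), ("number(0x3E8)", 0x4071A4)],
        [("mnemonic(call)", 0x4071A9)],
    ]

    bb_features = collections.defaultdict(set)
    for insn in instructions:
        insn_features = collections.defaultdict(set)
        for feature, va in insn:
            insn_features[feature].add(va)
        # instruction-scope rules would be matched against insn_features here,
        # and any matches indexed back into the feature set.
        for feature, vas in insn_features.items():
            bb_features[feature].update(vas)  # bubble up into the basic block

    # basic-block-scope rules are then matched against bb_features, and the
    # same pattern repeats upward into the function scope.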


@@ -74,14 +74,24 @@ class Scope(str, Enum):
FILE = "file"
FUNCTION = "function"
BASIC_BLOCK = "basic block"
INSTRUCTION = "instruction"
FILE_SCOPE = Scope.FILE.value
FUNCTION_SCOPE = Scope.FUNCTION.value
BASIC_BLOCK_SCOPE = Scope.BASIC_BLOCK.value
INSTRUCTION_SCOPE = Scope.INSTRUCTION.value
# used only to specify supported features per scope.
# not used to validate rules.
GLOBAL_SCOPE = "global"
SUPPORTED_FEATURES = {
SUPPORTED_FEATURES: Dict[str, Set] = {
GLOBAL_SCOPE: {
# these will be added to other scopes, see below.
capa.features.common.OS,
capa.features.common.Arch,
},
FILE_SCOPE: {
capa.features.common.MatchedRule,
capa.features.file.Export,
@@ -91,20 +101,23 @@ SUPPORTED_FEATURES = {
capa.features.common.Characteristic("embedded pe"),
capa.features.common.String,
capa.features.common.Format,
capa.features.common.OS,
capa.features.common.Arch,
},
FUNCTION_SCOPE: {
# plus basic block scope features, see below
capa.features.common.MatchedRule,
capa.features.basicblock.BasicBlock,
capa.features.common.Characteristic("calls from"),
capa.features.common.Characteristic("calls to"),
capa.features.common.Characteristic("loop"),
capa.features.common.Characteristic("recursive call"),
capa.features.common.OS,
capa.features.common.Arch,
# plus basic block scope features, see below
},
BASIC_BLOCK_SCOPE: {
capa.features.common.MatchedRule,
capa.features.common.Characteristic("tight loop"),
capa.features.common.Characteristic("stack string"),
# plus instruction scope features, see below
},
INSTRUCTION_SCOPE: {
capa.features.common.MatchedRule,
capa.features.insn.API,
capa.features.insn.Number,
@@ -112,20 +125,26 @@ SUPPORTED_FEATURES = {
capa.features.common.Bytes,
capa.features.insn.Offset,
capa.features.insn.Mnemonic,
capa.features.insn.OperandNumber,
capa.features.insn.OperandOffset,
capa.features.common.Characteristic("nzxor"),
capa.features.common.Characteristic("peb access"),
capa.features.common.Characteristic("fs access"),
capa.features.common.Characteristic("gs access"),
capa.features.common.Characteristic("cross section flow"),
capa.features.common.Characteristic("tight loop"),
capa.features.common.Characteristic("stack string"),
capa.features.common.Characteristic("indirect call"),
capa.features.common.Characteristic("call $+5"),
capa.features.common.OS,
capa.features.common.Arch,
capa.features.common.Characteristic("cross section flow"),
},
}
# global scope features are available in all other scopes
SUPPORTED_FEATURES[INSTRUCTION_SCOPE].update(SUPPORTED_FEATURES[GLOBAL_SCOPE])
SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE].update(SUPPORTED_FEATURES[GLOBAL_SCOPE])
SUPPORTED_FEATURES[FUNCTION_SCOPE].update(SUPPORTED_FEATURES[GLOBAL_SCOPE])
SUPPORTED_FEATURES[FILE_SCOPE].update(SUPPORTED_FEATURES[GLOBAL_SCOPE])
# all instruction scope features are also basic block features
SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE].update(SUPPORTED_FEATURES[INSTRUCTION_SCOPE])
# all basic block scope features are also function scope features
SUPPORTED_FEATURES[FUNCTION_SCOPE].update(SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE])
@@ -341,7 +360,14 @@ def parse_description(s: Union[str, int, bytes], value_type: str, description=No
# the string "10" that needs to become the number 10.
if value_type == "bytes":
value = parse_bytes(value)
elif value_type in ("number", "offset") or value_type.startswith(("number/", "offset/")):
elif (
value_type in ("number", "offset")
or value_type.startswith(("number/", "offset/"))
or (
value_type.startswith("operand[")
and (value_type.endswith("].number") or value_type.endswith("].offset"))
)
):
try:
value = parse_int(value)
except ValueError:
@@ -419,7 +445,7 @@ def build_statements(d, scope: str):
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE))
return ceng.Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE), description=description)
elif key == "basic block":
if scope != FUNCTION_SCOPE:
@@ -428,7 +454,30 @@ def build_statements(d, scope: str):
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE))
return ceng.Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE), description=description)
elif key == "instruction":
if scope not in (FUNCTION_SCOPE, BASIC_BLOCK_SCOPE):
raise InvalidRule("instruction subscope supported only for function and basic block scope")
if len(d[key]) == 1:
statements = build_statements(d[key][0], INSTRUCTION_SCOPE)
else:
# for instruction subscopes, we support a shorthand in which the top level AND is implied.
# the following are equivalent:
#
# - instruction:
# - and:
# - arch: i386
# - mnemonic: cmp
#
# - instruction:
# - arch: i386
# - mnemonic: cmp
#
statements = ceng.And([build_statements(dd, INSTRUCTION_SCOPE) for dd in d[key]])
return ceng.Subscope(INSTRUCTION_SCOPE, statements, description=description)
elif key.startswith("count(") and key.endswith(")"):
# e.g.:
@@ -485,6 +534,37 @@ def build_statements(d, scope: str):
raise InvalidRule("unexpected range: %s" % (count))
elif key == "string" and not isinstance(d[key], str):
raise InvalidRule("ambiguous string value %s, must be defined as explicit string" % d[key])
elif key.startswith("operand[") and key.endswith("].number"):
index = key[len("operand[") : -len("].number")]
try:
index = int(index)
except ValueError:
raise InvalidRule("operand index must be an integer")
value, description = parse_description(d[key], key, d.get("description"))
try:
feature = capa.features.insn.OperandNumber(index, value, description=description)
except ValueError as e:
raise InvalidRule(str(e))
ensure_feature_valid_for_scope(scope, feature)
return feature
elif key.startswith("operand[") and key.endswith("].offset"):
index = key[len("operand[") : -len("].offset")]
try:
index = int(index)
except ValueError:
raise InvalidRule("operand index must be an integer")
value, description = parse_description(d[key], key, d.get("description"))
try:
feature = capa.features.insn.OperandOffset(index, value, description=description)
except ValueError as e:
raise InvalidRule(str(e))
ensure_feature_valid_for_scope(scope, feature)
return feature
elif (
(key == "os" and d[key] not in capa.features.common.VALID_OS)
or (key == "format" and d[key] not in capa.features.common.VALID_FORMAT)
@@ -978,6 +1058,7 @@ class RuleSet:
self.file_rules = self._get_rules_for_scope(rules, FILE_SCOPE)
self.function_rules = self._get_rules_for_scope(rules, FUNCTION_SCOPE)
self.basic_block_rules = self._get_rules_for_scope(rules, BASIC_BLOCK_SCOPE)
self.instruction_rules = self._get_rules_for_scope(rules, INSTRUCTION_SCOPE)
self.rules = {rule.name: rule for rule in rules}
self.rules_by_namespace = index_rules_by_namespace(rules)
@@ -989,6 +1070,9 @@ class RuleSet:
(self._easy_basic_block_rules_by_feature, self._hard_basic_block_rules) = self._index_rules_by_feature(
self.basic_block_rules
)
(self._easy_instruction_rules_by_feature, self._hard_instruction_rules) = self._index_rules_by_feature(
self.instruction_rules
)
def __len__(self):
return len(self.rules)
@@ -1014,6 +1098,9 @@ class RuleSet:
at this time, a rule evaluator can't do anything special with
the "hard rules". it must still do a full top-down match of each
rule, in topological order.
this does not index global features, because these are not selective, and
won't be used as the sole feature used to match.
"""
# we'll do a couple phases:
@@ -1052,9 +1139,21 @@ class RuleSet:
# hard feature: requires scan or match lookup
rules_with_hard_features.add(rule_name)
elif isinstance(node, capa.features.common.Feature):
# easy feature: hash lookup
rules_with_easy_features.add(rule_name)
rules_by_feature[node].add(rule_name)
if capa.features.common.is_global_feature(node):
# we don't want to index global features
# because they're not very selective.
#
# they're global, so if they match at one location in a file,
# they'll match at every location in a file.
# so thats not helpful to decide how to downselect.
#
# and, a global rule will never be the sole selector in a rule.
# TODO: probably want a lint for this.
pass
else:
# easy feature: hash lookup
rules_with_easy_features.add(rule_name)
rules_by_feature[node].add(rule_name)
elif isinstance(node, (ceng.Not)):
# `not:` statements are tricky to deal with.
#
@@ -1214,6 +1313,9 @@ class RuleSet:
elif scope is Scope.BASIC_BLOCK:
easy_rules_by_feature = self._easy_basic_block_rules_by_feature
hard_rule_names = self._hard_basic_block_rules
elif scope is Scope.INSTRUCTION:
easy_rules_by_feature = self._easy_instruction_rules_by_feature
hard_rule_names = self._hard_instruction_rules
else:
assert_never(scope)
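
One consequence of the scope-propagation updates earlier in this file (the basic block scope absorbing instruction-scope features, and the function scope absorbing basic block features) is that the new operand features stay valid in enclosing scopes. A small illustrative check, not part of the diff:

    import capa.rules
    import capa.features.insn

    # instruction-scope feature classes propagate upward via the update() calls above
    assert capa.features.insn.OperandNumber in capa.rules.SUPPORTED_FEATURES[capa.rules.BASIC_BLOCK_SCOPE]
    assert capa.features.insn.Mnemonic in capa.rules.SUPPORTED_FEATURES[capa.rules.FUNCTION_SCOPE]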


@@ -311,7 +311,7 @@ def convert_rule(rule, rulename, cround, depth):
return yara_strings, yara_condition
############################## end def do_statement
# end: def do_statement
yara_strings_list = []
yara_condition_list = []
@@ -390,7 +390,9 @@ def convert_rule(rule, rulename, cround, depth):
logger.info("kid coming: " + repr(kid.name))
# logger.info("grandchildren: " + repr(kid.children))
##### here we go into RECURSION ##################################################################################
#
# here we go into RECURSION
#
yara_strings_sub, yara_condition_sub, rule_comment_sub, incomplete_sub = convert_rule(
kid, rulename, cround, depth
)
@@ -496,9 +498,7 @@ def convert_rule(rule, rulename, cround, depth):
yara_condition = "\n\t" + yara_condition_list[0]
logger.info(
f"################# end of convert_rule() #strings: {len(yara_strings_list)} #conditions: {len(yara_condition_list)}"
)
logger.info(f"# end of convert_rule() #strings: {len(yara_strings_list)} #conditions: {len(yara_condition_list)}")
logger.info(f"strings: {yara_strings} conditions: {yara_condition}")
return yara_strings, yara_condition, rule_comment, incomplete
@@ -617,7 +617,7 @@ def convert_rules(rules, namespaces, cround):
# examples in capa can contain the same hash several times with different offset, so check if it's already there:
# (keeping the offset might be interessting for some but breaks yara-ci for checking of the final rules
if not value in seen_hashes:
if value not in seen_hashes:
yara_meta += "\t" + meta_name + ' = "' + value + '"\n'
seen_hashes.append(value)


@@ -247,7 +247,7 @@ class InvalidAttckOrMbcTechnique(Lint):
self.enabled_frameworks = []
# This regex matches the format defined in the recommendation attribute
self.reg = re.compile("^([\w\s-]+)::(.+) \[([A-Za-z0-9.]+)\]$")
self.reg = re.compile(r"^([\w\s-]+)::(.+) \[([A-Za-z0-9.]+)\]$")
def _entry_check(self, framework, category, entry, eid):
if category not in self.data[framework].keys():
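
For reference, the raw-string pattern accepts recommendation entries of the form "<framework>::<technique> [<id>]"; the concrete string below is illustrative, not taken from the diff:

    import re

    reg = re.compile(r"^([\w\s-]+)::(.+) \[([A-Za-z0-9.]+)\]$")
    m = reg.match("Defense Evasion::Obfuscated Files or Information [T1027]")
    assert m is not None
    assert m.groups() == ("Defense Evasion", "Obfuscated Files or Information", "T1027")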

setup.cfg

@@ -0,0 +1,21 @@
[bdist_wheel]
universal = 1
[aliases]
test = pytest
[pycodestyle]
# the following suppress lints that conflict with the project's style:
#
# E203 Whitespace before :
# E302 expected 2 blank lines, found 1
# E402 module level import not at top of file
# E501 line too long (209 > 180 characters)
# E712 comparison to False should be 'if cond is False:' or 'if not cond:'
# E722 do not use bare 'except'
# E731 do not assign a lambda expression, use a def
# W291 trailing whitespace
# W503 line break before binary operator
ignore = E203, E302, E402, E501, E712, E722, E731, W291, W503
max-line-length = 180
statistics = True


@@ -418,6 +418,12 @@ FEATURE_PRESENCE_TESTS = sorted(
("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("xor"), True),
("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("in"), False),
("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("out"), False),
# insn/operand.number
("mimikatz", "function=0x40105D,bb=0x401073", capa.features.insn.OperandNumber(1, 0xFF), True),
("mimikatz", "function=0x40105D,bb=0x401073", capa.features.insn.OperandNumber(0, 0xFF), False),
# insn/operand.offset
("mimikatz", "function=0x40105D,bb=0x4010B0", capa.features.insn.OperandOffset(0, 4), True),
("mimikatz", "function=0x40105D,bb=0x4010B0", capa.features.insn.OperandOffset(1, 4), False),
# insn/number
("mimikatz", "function=0x40105D", capa.features.insn.Number(0xFF), True),
("mimikatz", "function=0x40105D", capa.features.insn.Number(0x3136B0), True),


@@ -326,6 +326,62 @@ def test_count_bb(z9324d_extractor):
assert "count bb" in capabilities
def test_instruction_scope(z9324d_extractor):
# .text:004071A4 68 E8 03 00 00 push 3E8h
rules = capa.rules.RuleSet(
[
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: push 1000
namespace: test
scope: instruction
features:
- and:
- mnemonic: push
- number: 1000
"""
)
)
]
)
capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor)
assert "push 1000" in capabilities
assert 0x4071A4 in set(map(lambda result: result[0], capabilities["push 1000"]))
def test_instruction_subscope(z9324d_extractor):
# .text:00406F60 sub_406F60 proc near
# [...]
# .text:004071A4 68 E8 03 00 00 push 3E8h
rules = capa.rules.RuleSet(
[
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: push 1000 on i386
namespace: test
scope: function
features:
- and:
- arch: i386
- instruction:
- mnemonic: push
- number: 1000
"""
)
)
]
)
capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor)
assert "push 1000 on i386" in capabilities
assert 0x406F60 in set(map(lambda result: result[0], capabilities["push 1000 on i386"]))
def test_fix262(pma16_01_extractor, capsys):
# tests rules can be loaded successfully and all output modes
path = pma16_01_extractor.path


@@ -531,3 +531,57 @@ def test_match_not_not():
_, matches = match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0)
assert "test rule" in matches
def test_match_operand_number():
rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
features:
- and:
- operand[0].number: 0x10
"""
)
r = capa.rules.Rule.from_yaml(rule)
assert capa.features.insn.OperandNumber(0, 0x10) in {capa.features.insn.OperandNumber(0, 0x10)}
_, matches = match([r], {capa.features.insn.OperandNumber(0, 0x10): {1, 2}}, 0x0)
assert "test rule" in matches
# mismatching index
_, matches = match([r], {capa.features.insn.OperandNumber(1, 0x10): {1, 2}}, 0x0)
assert "test rule" not in matches
# mismatching value
_, matches = match([r], {capa.features.insn.OperandNumber(0, 0x11): {1, 2}}, 0x0)
assert "test rule" not in matches
def test_match_operand_offset():
rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
features:
- and:
- operand[0].offset: 0x10
"""
)
r = capa.rules.Rule.from_yaml(rule)
assert capa.features.insn.OperandOffset(0, 0x10) in {capa.features.insn.OperandOffset(0, 0x10)}
_, matches = match([r], {capa.features.insn.OperandOffset(0, 0x10): {1, 2}}, 0x0)
assert "test rule" in matches
# mismatching index
_, matches = match([r], {capa.features.insn.OperandOffset(1, 0x10): {1, 2}}, 0x0)
assert "test rule" not in matches
# mismatching value
_, matches = match([r], {capa.features.insn.OperandOffset(0, 0x11): {1, 2}}, 0x0)
assert "test rule" not in matches


@@ -0,0 +1,133 @@
# Copyright (C) 2022 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import textwrap
import pytest
import capa.rules
def test_rule_scope_instruction():
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
scope: instruction
features:
- and:
- mnemonic: mov
- arch: i386
- os: windows
"""
)
)
with pytest.raises(capa.rules.InvalidRule):
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
scope: instruction
features:
- characteristic: embedded pe
"""
)
)
def test_rule_subscope_instruction():
rules = capa.rules.RuleSet(
[
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
scope: function
features:
- and:
- instruction:
- and:
- mnemonic: mov
- arch: i386
- os: windows
"""
)
)
]
)
# the function rule scope will have one rule:
# - `test rule`
assert len(rules.function_rules) == 1
# the insn rule scope will have one rule:
# - the rule on which `test rule` depends
assert len(rules.instruction_rules) == 1
def test_scope_instruction_implied_and():
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
scope: function
features:
- and:
- instruction:
- mnemonic: mov
- arch: i386
- os: windows
"""
)
)
def test_scope_instruction_description():
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
scope: function
features:
- and:
- instruction:
- description: foo
- mnemonic: mov
- arch: i386
- os: windows
"""
)
)
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
scope: function
features:
- and:
- instruction:
- description: foo
- mnemonic: mov
- arch: i386
- os: windows
"""
)
)


@@ -22,6 +22,14 @@ def test_smda_features(sample, scope, feature, expected):
if scope.__name__ == "file" and isinstance(feature, capa.features.file.FunctionName) and expected is True:
pytest.xfail("SMDA has no function ID")
if "bb=" in scope.__name__ and isinstance(feature, capa.features.insn.OperandNumber) and expected is True:
# SMDA not currently maintained, see: https://github.com/mandiant/capa/issues/937
pytest.xfail("SMDA doesn't support operand numbers")
if "bb=" in scope.__name__ and isinstance(feature, capa.features.insn.OperandOffset) and expected is True:
# SMDA not currently maintained, see: https://github.com/mandiant/capa/issues/937
pytest.xfail("SMDA doesn't support operand offsets")
fixtures.do_test_feature_presence(fixtures.get_smda_extractor, sample, scope, feature, expected)