Merge branch 'fix-622' into feature-447

This commit is contained in:
William Ballenthin
2021-06-09 22:41:10 -06:00
59 changed files with 1540 additions and 1522 deletions

View File

@@ -128,6 +128,7 @@ It includes many new rules, including all new techniques introduced in MITRE ATT
- json: breaking change: record all matching strings for regex #159 @williballenthin
- main: implement file limitations via rules not code #390 @williballenthin
- json: breaking change: correctly render negative offsets #619 @williballenthin
- library: breaking change: remove logic from `__init__.py` throughout #622 @williballenthin
### Development

View File

@@ -9,7 +9,7 @@
import copy
import collections
import capa.features
import capa.features.common
class Statement(object):
@@ -199,37 +199,6 @@ class Subscope(Statement):
raise ValueError("cannot evaluate a subscope directly!")
def topologically_order_rules(rules):
"""
order the given rules such that dependencies show up before dependents.
this means that as we match rules, we can add features for the matches, and these
will be matched by subsequent rules if they follow this order.
assumes that the rule dependency graph is a DAG.
"""
# we evaluate `rules` multiple times, so if its a generator, realize it into a list.
rules = list(rules)
namespaces = capa.rules.index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules}
seen = set([])
ret = []
def rec(rule):
if rule.name in seen:
return
for dep in rule.get_dependencies(namespaces):
rec(rules[dep])
ret.append(rule)
seen.add(rule.name)
for rule in rules.values():
rec(rule)
return ret
def match(rules, features, va):
"""
Args:
@@ -254,12 +223,12 @@ def match(rules, features, va):
res = rule.evaluate(features)
if res:
results[rule.name].append((va, res))
features[capa.features.MatchedRule(rule.name)].add(va)
features[capa.features.common.MatchedRule(rule.name)].add(va)
namespace = rule.meta.get("namespace")
if namespace:
while namespace:
features[capa.features.MatchedRule(namespace)].add(va)
features[capa.features.common.MatchedRule(namespace)].add(va)
namespace, _, _ = namespace.rpartition("/")
return (features, results)

View File

@@ -1,254 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import re
import codecs
import logging
import collections
import capa.engine
import capa.features
logger = logging.getLogger(__name__)
MAX_BYTES_FEATURE_SIZE = 0x100
# thunks may be chained so we specify a delta to control the depth to which these chains are explored
THUNK_CHAIN_DEPTH_DELTA = 5
# identifiers for supported architectures names that tweak a feature
# for example, offset/x32
ARCH_X32 = "x32"
ARCH_X64 = "x64"
VALID_ARCH = (ARCH_X32, ARCH_X64)
def bytes_to_str(b):
return str(codecs.encode(b, "hex").decode("utf-8"))
def hex_string(h):
"""render hex string e.g. "0a40b1" as "0A 40 B1" """
return " ".join(h[i : i + 2] for i in range(0, len(h), 2)).upper()
def escape_string(s):
"""escape special characters"""
s = repr(s)
if not s.startswith(('"', "'")):
# u'hello\r\nworld' -> hello\\r\\nworld
s = s[2:-1]
else:
# 'hello\r\nworld' -> hello\\r\\nworld
s = s[1:-1]
s = s.replace("\\'", "'") # repr() may escape "'" in some edge cases, remove
s = s.replace('"', '\\"') # repr() does not escape '"', add
return s
class Feature(object):
def __init__(self, value, arch=None, description=None):
"""
Args:
value (any): the value of the feature, such as the number or string.
arch (str): one of the VALID_ARCH values, or None.
When None, then the feature applies to any architecture.
Modifies the feature name from `feature` to `feature/arch`, like `offset/x32`.
description (str): a human-readable description that explains the feature value.
"""
super(Feature, self).__init__()
if arch is not None:
if arch not in VALID_ARCH:
raise ValueError("arch '%s' must be one of %s" % (arch, VALID_ARCH))
self.name = self.__class__.__name__.lower() + "/" + arch
else:
self.name = self.__class__.__name__.lower()
self.value = value
self.arch = arch
self.description = description
def __hash__(self):
return hash((self.name, self.value, self.arch))
def __eq__(self, other):
return self.name == other.name and self.value == other.value and self.arch == other.arch
def get_value_str(self):
"""
render the value of this feature, for use by `__str__` and friends.
subclasses should override to customize the rendering.
Returns: any
"""
return self.value
def __str__(self):
if self.value is not None:
if self.description:
return "%s(%s = %s)" % (self.name, self.get_value_str(), self.description)
else:
return "%s(%s)" % (self.name, self.get_value_str())
else:
return "%s" % self.name
def __repr__(self):
return str(self)
def evaluate(self, ctx):
return capa.engine.Result(self in ctx, self, [], locations=ctx.get(self, []))
def freeze_serialize(self):
if self.arch is not None:
return (self.__class__.__name__, [self.value, {"arch": self.arch}])
else:
return (self.__class__.__name__, [self.value])
@classmethod
def freeze_deserialize(cls, args):
# as you can see below in code,
# if the last argument is a dictionary,
# consider it to be kwargs passed to the feature constructor.
if len(args) == 1:
return cls(*args)
elif isinstance(args[-1], dict):
kwargs = args[-1]
args = args[:-1]
return cls(*args, **kwargs)
class MatchedRule(Feature):
def __init__(self, value, description=None):
super(MatchedRule, self).__init__(value, description=description)
self.name = "match"
class Characteristic(Feature):
def __init__(self, value, description=None):
super(Characteristic, self).__init__(value, description=description)
class String(Feature):
def __init__(self, value, description=None):
super(String, self).__init__(value, description=description)
class Regex(String):
def __init__(self, value, description=None):
super(Regex, self).__init__(value, description=description)
pat = self.value[len("/") : -len("/")]
flags = re.DOTALL
if value.endswith("/i"):
pat = self.value[len("/") : -len("/i")]
flags |= re.IGNORECASE
try:
self.re = re.compile(pat, flags)
except re.error:
if value.endswith("/i"):
value = value[: -len("i")]
raise ValueError(
"invalid regular expression: %s it should use Python syntax, try it at https://pythex.org" % value
)
def evaluate(self, ctx):
# mapping from string value to list of locations.
# will unique the locations later on.
matches = collections.defaultdict(list)
for feature, locations in ctx.items():
if not isinstance(feature, (capa.features.String,)):
continue
# `re.search` finds a match anywhere in the given string
# which implies leading and/or trailing whitespace.
# using this mode cleans is more convenient for rule authors,
# so that they don't have to prefix/suffix their terms like: /.*foo.*/.
if self.re.search(feature.value):
matches[feature.value].extend(locations)
if matches:
# finalize: defaultdict -> dict
# which makes json serialization easier
matches = dict(matches)
# collect all locations
locations = set()
for s in matches.keys():
matches[s] = list(set(matches[s]))
locations.update(matches[s])
# unlike other features, we cannot return put a reference to `self` directly in a `Result`.
# this is because `self` may match on many strings, so we can't stuff the matched value into it.
# instead, return a new instance that has a reference to both the regex and the matched values.
# see #262.
return capa.engine.Result(True, _MatchedRegex(self, matches), [], locations=locations)
else:
return capa.engine.Result(False, _MatchedRegex(self, None), [])
def __str__(self):
return "regex(string =~ %s)" % self.value
class _MatchedRegex(Regex):
"""
this represents specific match instances of a regular expression feature.
treat it the same as a `Regex` except it has the `matches` field that contains the complete strings that matched.
note: this type should only ever be constructed by `Regex.evaluate()`. it is not part of the public API.
"""
def __init__(self, regex, matches):
"""
args:
regex (Regex): the regex feature that matches.
match (Dict[string, List[int]]|None): mapping from matching string to its locations.
"""
super(_MatchedRegex, self).__init__(regex.value, description=regex.description)
# we want this to collide with the name of `Regex` above,
# so that it works nicely with the renderers.
self.name = "regex"
# this may be None if the regex doesn't match
self.matches = matches
def __str__(self):
return "regex(string =~ %s, matches = %s)" % (
self.value,
", ".join(map(lambda s: '"' + s + '"', (self.matches or {}).keys())),
)
class StringFactory(object):
def __new__(cls, value, description=None):
if value.startswith("/") and (value.endswith("/") or value.endswith("/i")):
return Regex(value, description=description)
return String(value, description=description)
class Bytes(Feature):
def __init__(self, value, description=None):
super(Bytes, self).__init__(value, description=description)
def evaluate(self, ctx):
for feature, locations in ctx.items():
if not isinstance(feature, (capa.features.Bytes,)):
continue
if feature.value.startswith(self.value):
return capa.engine.Result(True, self, [], locations=locations)
return capa.engine.Result(False, self, [])
def get_value_str(self):
return hex_string(bytes_to_str(self.value))
def freeze_serialize(self):
return (self.__class__.__name__, [bytes_to_str(self.value).upper()])
@classmethod
def freeze_deserialize(cls, args):
return cls(*[codecs.decode(x, "hex") for x in args])

View File

@@ -6,7 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from capa.features import Feature
from capa.features.common import Feature
class BasicBlock(Feature):

254
capa/features/common.py Normal file
View File

@@ -0,0 +1,254 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import re
import codecs
import logging
import collections
import capa.engine
import capa.features.common
logger = logging.getLogger(__name__)
MAX_BYTES_FEATURE_SIZE = 0x100
# thunks may be chained so we specify a delta to control the depth to which these chains are explored
THUNK_CHAIN_DEPTH_DELTA = 5
# identifiers for supported architectures names that tweak a feature
# for example, offset/x32
ARCH_X32 = "x32"
ARCH_X64 = "x64"
VALID_ARCH = (ARCH_X32, ARCH_X64)
def bytes_to_str(b):
return str(codecs.encode(b, "hex").decode("utf-8"))
def hex_string(h):
"""render hex string e.g. "0a40b1" as "0A 40 B1" """
return " ".join(h[i : i + 2] for i in range(0, len(h), 2)).upper()
def escape_string(s):
"""escape special characters"""
s = repr(s)
if not s.startswith(('"', "'")):
# u'hello\r\nworld' -> hello\\r\\nworld
s = s[2:-1]
else:
# 'hello\r\nworld' -> hello\\r\\nworld
s = s[1:-1]
s = s.replace("\\'", "'") # repr() may escape "'" in some edge cases, remove
s = s.replace('"', '\\"') # repr() does not escape '"', add
return s
class Feature(object):
def __init__(self, value, arch=None, description=None):
"""
Args:
value (any): the value of the feature, such as the number or string.
arch (str): one of the VALID_ARCH values, or None.
When None, then the feature applies to any architecture.
Modifies the feature name from `feature` to `feature/arch`, like `offset/x32`.
description (str): a human-readable description that explains the feature value.
"""
super(Feature, self).__init__()
if arch is not None:
if arch not in VALID_ARCH:
raise ValueError("arch '%s' must be one of %s" % (arch, VALID_ARCH))
self.name = self.__class__.__name__.lower() + "/" + arch
else:
self.name = self.__class__.__name__.lower()
self.value = value
self.arch = arch
self.description = description
def __hash__(self):
return hash((self.name, self.value, self.arch))
def __eq__(self, other):
return self.name == other.name and self.value == other.value and self.arch == other.arch
def get_value_str(self):
"""
render the value of this feature, for use by `__str__` and friends.
subclasses should override to customize the rendering.
Returns: any
"""
return self.value
def __str__(self):
if self.value is not None:
if self.description:
return "%s(%s = %s)" % (self.name, self.get_value_str(), self.description)
else:
return "%s(%s)" % (self.name, self.get_value_str())
else:
return "%s" % self.name
def __repr__(self):
return str(self)
def evaluate(self, ctx):
return capa.engine.Result(self in ctx, self, [], locations=ctx.get(self, []))
def freeze_serialize(self):
if self.arch is not None:
return (self.__class__.__name__, [self.value, {"arch": self.arch}])
else:
return (self.__class__.__name__, [self.value])
@classmethod
def freeze_deserialize(cls, args):
# as you can see below in code,
# if the last argument is a dictionary,
# consider it to be kwargs passed to the feature constructor.
if len(args) == 1:
return cls(*args)
elif isinstance(args[-1], dict):
kwargs = args[-1]
args = args[:-1]
return cls(*args, **kwargs)
class MatchedRule(Feature):
def __init__(self, value, description=None):
super(MatchedRule, self).__init__(value, description=description)
self.name = "match"
class Characteristic(Feature):
def __init__(self, value, description=None):
super(Characteristic, self).__init__(value, description=description)
class String(Feature):
def __init__(self, value, description=None):
super(String, self).__init__(value, description=description)
class Regex(String):
def __init__(self, value, description=None):
super(Regex, self).__init__(value, description=description)
pat = self.value[len("/") : -len("/")]
flags = re.DOTALL
if value.endswith("/i"):
pat = self.value[len("/") : -len("/i")]
flags |= re.IGNORECASE
try:
self.re = re.compile(pat, flags)
except re.error:
if value.endswith("/i"):
value = value[: -len("i")]
raise ValueError(
"invalid regular expression: %s it should use Python syntax, try it at https://pythex.org" % value
)
def evaluate(self, ctx):
# mapping from string value to list of locations.
# will unique the locations later on.
matches = collections.defaultdict(list)
for feature, locations in ctx.items():
if not isinstance(feature, (capa.features.common.String,)):
continue
# `re.search` finds a match anywhere in the given string
# which implies leading and/or trailing whitespace.
# using this mode cleans is more convenient for rule authors,
# so that they don't have to prefix/suffix their terms like: /.*foo.*/.
if self.re.search(feature.value):
matches[feature.value].extend(locations)
if matches:
# finalize: defaultdict -> dict
# which makes json serialization easier
matches = dict(matches)
# collect all locations
locations = set()
for s in matches.keys():
matches[s] = list(set(matches[s]))
locations.update(matches[s])
# unlike other features, we cannot return put a reference to `self` directly in a `Result`.
# this is because `self` may match on many strings, so we can't stuff the matched value into it.
# instead, return a new instance that has a reference to both the regex and the matched values.
# see #262.
return capa.engine.Result(True, _MatchedRegex(self, matches), [], locations=locations)
else:
return capa.engine.Result(False, _MatchedRegex(self, None), [])
def __str__(self):
return "regex(string =~ %s)" % self.value
class _MatchedRegex(Regex):
"""
this represents specific match instances of a regular expression feature.
treat it the same as a `Regex` except it has the `matches` field that contains the complete strings that matched.
note: this type should only ever be constructed by `Regex.evaluate()`. it is not part of the public API.
"""
def __init__(self, regex, matches):
"""
args:
regex (Regex): the regex feature that matches.
match (Dict[string, List[int]]|None): mapping from matching string to its locations.
"""
super(_MatchedRegex, self).__init__(regex.value, description=regex.description)
# we want this to collide with the name of `Regex` above,
# so that it works nicely with the renderers.
self.name = "regex"
# this may be None if the regex doesn't match
self.matches = matches
def __str__(self):
return "regex(string =~ %s, matches = %s)" % (
self.value,
", ".join(map(lambda s: '"' + s + '"', (self.matches or {}).keys())),
)
class StringFactory(object):
def __new__(cls, value, description=None):
if value.startswith("/") and (value.endswith("/") or value.endswith("/i")):
return Regex(value, description=description)
return String(value, description=description)
class Bytes(Feature):
def __init__(self, value, description=None):
super(Bytes, self).__init__(value, description=description)
def evaluate(self, ctx):
for feature, locations in ctx.items():
if not isinstance(feature, (capa.features.common.Bytes,)):
continue
if feature.value.startswith(self.value):
return capa.engine.Result(True, self, [], locations=locations)
return capa.engine.Result(False, self, [])
def get_value_str(self):
return hex_string(bytes_to_str(self.value))
def freeze_serialize(self):
return (self.__class__.__name__, [bytes_to_str(self.value).upper()])
@classmethod
def freeze_deserialize(cls, args):
return cls(*[codecs.decode(x, "hex") for x in args])

View File

@@ -1,321 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import abc
class FeatureExtractor(object):
"""
FeatureExtractor defines the interface for fetching features from a sample.
There may be multiple backends that support fetching features for capa.
For example, we use vivisect by default, but also want to support saving
and restoring features from a JSON file.
When we restore the features, we'd like to use exactly the same matching logic
to find matching rules.
Therefore, we can define a FeatureExtractor that provides features from the
serialized JSON file and do matching without a binary analysis pass.
Also, this provides a way to hook in an IDA backend.
This class is not instantiated directly; it is the base class for other implementations.
"""
__metaclass__ = abc.ABCMeta
def __init__(self):
#
# note: a subclass should define ctor parameters for its own use.
# for example, the Vivisect feature extract might require the vw and/or path.
# this base class doesn't know what to do with that info, though.
#
super(FeatureExtractor, self).__init__()
@abc.abstractmethod
def get_base_address(self):
"""
fetch the preferred load address at which the sample was analyzed.
returns: int
"""
raise NotImplemented
@abc.abstractmethod
def extract_file_features(self):
"""
extract file-scope features.
example::
extractor = VivisectFeatureExtractor(vw, path)
for feature, va in extractor.get_file_features():
print('0x%x: %s', va, feature)
yields:
Tuple[capa.features.Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_functions(self):
"""
enumerate the functions and provide opaque values that will
subsequently be provided to `.extract_function_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_function_features()`.
the opaque value should support casting to int (`__int__`) for the function start address.
yields:
any: the opaque function value.
"""
raise NotImplemented
def is_library_function(self, va):
"""
is the given address a library function?
the backend may implement its own function matching algorithm, or none at all.
we accept a VA here, rather than function object, to handle addresses identified in instructions.
this information is used to:
- filter out matches in library functions (by default), and
- recognize when to fetch symbol names for called (non-API) functions
args:
va (int): the virtual address of a function.
returns:
bool: True if the given address is the start of a library function.
"""
return False
def get_function_name(self, va):
"""
fetch any recognized name for the given address.
this is only guaranteed to return a value when the given function is a recognized library function.
we accept a VA here, rather than function object, to handle addresses identified in instructions.
args:
va (int): the virtual address of a function.
returns:
str: the function name
raises:
KeyError: when the given function does not have a name.
"""
raise KeyError(va)
@abc.abstractmethod
def extract_function_features(self, f):
"""
extract function-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
example::
extractor = VivisectFeatureExtractor(vw, path)
for function in extractor.get_functions():
for feature, va in extractor.extract_function_features(function):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_basic_blocks(self, f):
"""
enumerate the basic blocks in the given function and provide opaque values that will
subsequently be provided to `.extract_basic_block_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_basic_block_features()`.
the opaque value should support casting to int (`__int__`) for the basic block start address.
yields:
any: the opaque basic block value.
"""
raise NotImplemented
@abc.abstractmethod
def extract_basic_block_features(self, f, bb):
"""
extract basic block-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
example::
extractor = VivisectFeatureExtractor(vw, path)
for function in extractor.get_functions():
for bb in extractor.get_basic_blocks(function):
for feature, va in extractor.extract_basic_block_features(function, bb):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
bb [any]: an opaque value previously fetched from `.get_basic_blocks()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_instructions(self, f, bb):
"""
enumerate the instructions in the given basic block and provide opaque values that will
subsequently be provided to `.extract_insn_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_insn_features()`.
the opaque value should support casting to int (`__int__`) for the instruction address.
yields:
any: the opaque function value.
"""
raise NotImplemented
@abc.abstractmethod
def extract_insn_features(self, f, bb, insn):
"""
extract instruction-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
example::
extractor = VivisectFeatureExtractor(vw, path)
for function in extractor.get_functions():
for bb in extractor.get_basic_blocks(function):
for insn in extractor.get_instructions(function, bb):
for feature, va in extractor.extract_insn_features(function, bb, insn):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
bb [any]: an opaque value previously fetched from `.get_basic_blocks()`.
insn [any]: an opaque value previously fetched from `.get_instructions()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
"""
raise NotImplemented
class NullFeatureExtractor(FeatureExtractor):
"""
An extractor that extracts some user-provided features.
The structure of the single parameter is demonstrated in the example below.
This is useful for testing, as we can provide expected values and see if matching works.
Also, this is how we represent features deserialized from a freeze file.
example::
extractor = NullFeatureExtractor({
'base address: 0x401000,
'file features': [
(0x402345, capa.features.Characteristic('embedded pe')),
],
'functions': {
0x401000: {
'features': [
(0x401000, capa.features.Characteristic('nzxor')),
],
'basic blocks': {
0x401000: {
'features': [
(0x401000, capa.features.Characteristic('tight-loop')),
],
'instructions': {
0x401000: {
'features': [
(0x401000, capa.features.Characteristic('nzxor')),
],
},
0x401002: ...
}
},
0x401005: ...
}
},
0x40200: ...
}
)
"""
def __init__(self, features):
super(NullFeatureExtractor, self).__init__()
self.features = features
def get_base_address(self):
return self.features["base address"]
def extract_file_features(self):
for p in self.features.get("file features", []):
va, feature = p
yield feature, va
def get_functions(self):
for va in sorted(self.features["functions"].keys()):
yield va
def extract_function_features(self, f):
for p in self.features.get("functions", {}).get(f, {}).get("features", []): # noqa: E127 line over-indented
va, feature = p
yield feature, va
def get_basic_blocks(self, f):
for va in sorted(
self.features.get("functions", {}) # noqa: E127 line over-indented
.get(f, {})
.get("basic blocks", {})
.keys()
):
yield va
def extract_basic_block_features(self, f, bb):
for p in (
self.features.get("functions", {}) # noqa: E127 line over-indented
.get(f, {})
.get("basic blocks", {})
.get(bb, {})
.get("features", [])
):
va, feature = p
yield feature, va
def get_instructions(self, f, bb):
for va in sorted(
self.features.get("functions", {}) # noqa: E127 line over-indented
.get(f, {})
.get("basic blocks", {})
.get(bb, {})
.get("instructions", {})
.keys()
):
yield va
def extract_insn_features(self, f, bb, insn):
for p in (
self.features.get("functions", {}) # noqa: E127 line over-indented
.get(f, {})
.get("basic blocks", {})
.get(bb, {})
.get("instructions", {})
.get(insn, {})
.get("features", [])
):
va, feature = p
yield feature, va

View File

@@ -0,0 +1,321 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import abc
class FeatureExtractor(object):
"""
FeatureExtractor defines the interface for fetching features from a sample.
There may be multiple backends that support fetching features for capa.
For example, we use vivisect by default, but also want to support saving
and restoring features from a JSON file.
When we restore the features, we'd like to use exactly the same matching logic
to find matching rules.
Therefore, we can define a FeatureExtractor that provides features from the
serialized JSON file and do matching without a binary analysis pass.
Also, this provides a way to hook in an IDA backend.
This class is not instantiated directly; it is the base class for other implementations.
"""
__metaclass__ = abc.ABCMeta
def __init__(self):
#
# note: a subclass should define ctor parameters for its own use.
# for example, the Vivisect feature extract might require the vw and/or path.
# this base class doesn't know what to do with that info, though.
#
super(FeatureExtractor, self).__init__()
@abc.abstractmethod
def get_base_address(self):
"""
fetch the preferred load address at which the sample was analyzed.
returns: int
"""
raise NotImplemented
@abc.abstractmethod
def extract_file_features(self):
"""
extract file-scope features.
example::
extractor = VivisectFeatureExtractor(vw, path)
for feature, va in extractor.get_file_features():
print('0x%x: %s', va, feature)
yields:
Tuple[capa.features.Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_functions(self):
"""
enumerate the functions and provide opaque values that will
subsequently be provided to `.extract_function_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_function_features()`.
the opaque value should support casting to int (`__int__`) for the function start address.
yields:
any: the opaque function value.
"""
raise NotImplemented
def is_library_function(self, va):
"""
is the given address a library function?
the backend may implement its own function matching algorithm, or none at all.
we accept a VA here, rather than function object, to handle addresses identified in instructions.
this information is used to:
- filter out matches in library functions (by default), and
- recognize when to fetch symbol names for called (non-API) functions
args:
va (int): the virtual address of a function.
returns:
bool: True if the given address is the start of a library function.
"""
return False
def get_function_name(self, va):
"""
fetch any recognized name for the given address.
this is only guaranteed to return a value when the given function is a recognized library function.
we accept a VA here, rather than function object, to handle addresses identified in instructions.
args:
va (int): the virtual address of a function.
returns:
str: the function name
raises:
KeyError: when the given function does not have a name.
"""
raise KeyError(va)
@abc.abstractmethod
def extract_function_features(self, f):
"""
extract function-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
example::
extractor = VivisectFeatureExtractor(vw, path)
for function in extractor.get_functions():
for feature, va in extractor.extract_function_features(function):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_basic_blocks(self, f):
"""
enumerate the basic blocks in the given function and provide opaque values that will
subsequently be provided to `.extract_basic_block_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_basic_block_features()`.
the opaque value should support casting to int (`__int__`) for the basic block start address.
yields:
any: the opaque basic block value.
"""
raise NotImplemented
@abc.abstractmethod
def extract_basic_block_features(self, f, bb):
"""
extract basic block-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
example::
extractor = VivisectFeatureExtractor(vw, path)
for function in extractor.get_functions():
for bb in extractor.get_basic_blocks(function):
for feature, va in extractor.extract_basic_block_features(function, bb):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
bb [any]: an opaque value previously fetched from `.get_basic_blocks()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_instructions(self, f, bb):
"""
enumerate the instructions in the given basic block and provide opaque values that will
subsequently be provided to `.extract_insn_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_insn_features()`.
the opaque value should support casting to int (`__int__`) for the instruction address.
yields:
any: the opaque function value.
"""
raise NotImplemented
@abc.abstractmethod
def extract_insn_features(self, f, bb, insn):
"""
extract instruction-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
example::
extractor = VivisectFeatureExtractor(vw, path)
for function in extractor.get_functions():
for bb in extractor.get_basic_blocks(function):
for insn in extractor.get_instructions(function, bb):
for feature, va in extractor.extract_insn_features(function, bb, insn):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
bb [any]: an opaque value previously fetched from `.get_basic_blocks()`.
insn [any]: an opaque value previously fetched from `.get_instructions()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
"""
raise NotImplemented
class NullFeatureExtractor(FeatureExtractor):
"""
An extractor that extracts some user-provided features.
The structure of the single parameter is demonstrated in the example below.
This is useful for testing, as we can provide expected values and see if matching works.
Also, this is how we represent features deserialized from a freeze file.
example::
extractor = NullFeatureExtractor({
'base address: 0x401000,
'file features': [
(0x402345, capa.features.Characteristic('embedded pe')),
],
'functions': {
0x401000: {
'features': [
(0x401000, capa.features.Characteristic('nzxor')),
],
'basic blocks': {
0x401000: {
'features': [
(0x401000, capa.features.Characteristic('tight-loop')),
],
'instructions': {
0x401000: {
'features': [
(0x401000, capa.features.Characteristic('nzxor')),
],
},
0x401002: ...
}
},
0x401005: ...
}
},
0x40200: ...
}
)
"""
def __init__(self, features):
super(NullFeatureExtractor, self).__init__()
self.features = features
def get_base_address(self):
return self.features["base address"]
def extract_file_features(self):
for p in self.features.get("file features", []):
va, feature = p
yield feature, va
def get_functions(self):
for va in sorted(self.features["functions"].keys()):
yield va
def extract_function_features(self, f):
for p in self.features.get("functions", {}).get(f, {}).get("features", []): # noqa: E127 line over-indented
va, feature = p
yield feature, va
def get_basic_blocks(self, f):
for va in sorted(
self.features.get("functions", {}) # noqa: E127 line over-indented
.get(f, {})
.get("basic blocks", {})
.keys()
):
yield va
def extract_basic_block_features(self, f, bb):
for p in (
self.features.get("functions", {}) # noqa: E127 line over-indented
.get(f, {})
.get("basic blocks", {})
.get(bb, {})
.get("features", [])
):
va, feature = p
yield feature, va
def get_instructions(self, f, bb):
for va in sorted(
self.features.get("functions", {}) # noqa: E127 line over-indented
.get(f, {})
.get("basic blocks", {})
.get(bb, {})
.get("instructions", {})
.keys()
):
yield va
def extract_insn_features(self, f, bb, insn):
for p in (
self.features.get("functions", {}) # noqa: E127 line over-indented
.get(f, {})
.get("basic blocks", {})
.get(bb, {})
.get("instructions", {})
.get(insn, {})
.get("features", [])
):
va, feature = p
yield feature, va

View File

@@ -1,108 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import idaapi
import capa.features.extractors.ida.file
import capa.features.extractors.ida.insn
import capa.features.extractors.ida.function
import capa.features.extractors.ida.basicblock
from capa.features.extractors import FeatureExtractor
class FunctionHandle:
"""this acts like an idaapi.func_t but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.start_ea
def __getattr__(self, name):
return getattr(self._inner, name)
class BasicBlockHandle:
"""this acts like an idaapi.BasicBlock but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.start_ea
def __getattr__(self, name):
return getattr(self._inner, name)
class InstructionHandle:
"""this acts like an idaapi.insn_t but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.ea
def __getattr__(self, name):
return getattr(self._inner, name)
class IdaFeatureExtractor(FeatureExtractor):
def __init__(self):
super(IdaFeatureExtractor, self).__init__()
def get_base_address(self):
return idaapi.get_imagebase()
def extract_file_features(self):
for (feature, ea) in capa.features.extractors.ida.file.extract_features():
yield feature, ea
def get_functions(self):
import capa.features.extractors.ida.helpers as ida_helpers
# data structure shared across functions yielded here.
# useful for caching analysis relevant across a single workspace.
ctx = {}
# ignore library functions and thunk functions as identified by IDA
for f in ida_helpers.get_functions(skip_thunks=True, skip_libs=True):
setattr(f, "ctx", ctx)
yield FunctionHandle(f)
@staticmethod
def get_function(ea):
f = idaapi.get_func(ea)
setattr(f, "ctx", {})
return FunctionHandle(f)
def extract_function_features(self, f):
for (feature, ea) in capa.features.extractors.ida.function.extract_features(f):
yield feature, ea
def get_basic_blocks(self, f):
import capa.features.extractors.ida.helpers as ida_helpers
for bb in ida_helpers.get_function_blocks(f):
yield BasicBlockHandle(bb)
def extract_basic_block_features(self, f, bb):
for (feature, ea) in capa.features.extractors.ida.basicblock.extract_features(f, bb):
yield feature, ea
def get_instructions(self, f, bb):
import capa.features.extractors.ida.helpers as ida_helpers
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
yield InstructionHandle(insn)
def extract_insn_features(self, f, bb, insn):
for (feature, ea) in capa.features.extractors.ida.insn.extract_features(f, bb, insn):
yield feature, ea

View File

@@ -6,14 +6,13 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import string
import struct
import idaapi
import capa.features.extractors.ida.helpers
from capa.features import Characteristic
from capa.features.common import Characteristic
from capa.features.basicblock import BasicBlock
from capa.features.extractors.ida import helpers
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN

View File

@@ -0,0 +1,108 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import idaapi
import capa.features.extractors.ida.file
import capa.features.extractors.ida.insn
import capa.features.extractors.ida.function
import capa.features.extractors.ida.basicblock
from capa.features.extractors.base_extractor import FeatureExtractor
class FunctionHandle:
"""this acts like an idaapi.func_t but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.start_ea
def __getattr__(self, name):
return getattr(self._inner, name)
class BasicBlockHandle:
"""this acts like an idaapi.BasicBlock but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.start_ea
def __getattr__(self, name):
return getattr(self._inner, name)
class InstructionHandle:
"""this acts like an idaapi.insn_t but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.ea
def __getattr__(self, name):
return getattr(self._inner, name)
class IdaFeatureExtractor(FeatureExtractor):
def __init__(self):
super(IdaFeatureExtractor, self).__init__()
def get_base_address(self):
return idaapi.get_imagebase()
def extract_file_features(self):
for (feature, ea) in capa.features.extractors.ida.file.extract_features():
yield feature, ea
def get_functions(self):
import capa.features.extractors.ida.helpers as ida_helpers
# data structure shared across functions yielded here.
# useful for caching analysis relevant across a single workspace.
ctx = {}
# ignore library functions and thunk functions as identified by IDA
for f in ida_helpers.get_functions(skip_thunks=True, skip_libs=True):
setattr(f, "ctx", ctx)
yield FunctionHandle(f)
@staticmethod
def get_function(ea):
f = idaapi.get_func(ea)
setattr(f, "ctx", {})
return FunctionHandle(f)
def extract_function_features(self, f):
for (feature, ea) in capa.features.extractors.ida.function.extract_features(f):
yield feature, ea
def get_basic_blocks(self, f):
import capa.features.extractors.ida.helpers as ida_helpers
for bb in ida_helpers.get_function_blocks(f):
yield BasicBlockHandle(bb)
def extract_basic_block_features(self, f, bb):
for (feature, ea) in capa.features.extractors.ida.basicblock.extract_features(f, bb):
yield feature, ea
def get_instructions(self, f, bb):
import capa.features.extractors.ida.helpers as ida_helpers
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
yield InstructionHandle(insn)
def extract_insn_features(self, f, bb, insn):
for (feature, ea) in capa.features.extractors.ida.insn.extract_features(f, bb, insn):
yield feature, ea

View File

@@ -10,7 +10,7 @@ import idaapi
import idautils
import capa.features.extractors.ida.helpers
from capa.features import Characteristic
from capa.features.common import Characteristic
from capa.features.extractors import loops

View File

@@ -6,9 +6,6 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import string
import idc
import idaapi
import idautils

View File

@@ -12,7 +12,8 @@ import idautils
import capa.features.extractors.helpers
import capa.features.extractors.ida.helpers
from capa.features import (
from capa.features.insn import API, Number, Offset, Mnemonic
from capa.features.common import (
ARCH_X32,
ARCH_X64,
MAX_BYTES_FEATURE_SIZE,
@@ -21,7 +22,6 @@ from capa.features import (
String,
Characteristic,
)
from capa.features.insn import API, Number, Offset, Mnemonic
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features

View File

@@ -12,11 +12,10 @@ import pefile
import capa.features.extractors
import capa.features.extractors.helpers
import capa.features.extractors.strings
from capa.features import String, Characteristic
from capa.features.file import Export, Import, Section
from capa.features.extractors import FeatureExtractor
from capa.features.common import String, Characteristic
from capa.features.extractors.base_extractor import FeatureExtractor
__all__ = ["file"]
logger = logging.getLogger(__name__)

View File

@@ -1,50 +0,0 @@
import sys
import types
from smda.common.SmdaReport import SmdaReport
from smda.common.SmdaInstruction import SmdaInstruction
import capa.features.extractors.smda.file
import capa.features.extractors.smda.insn
import capa.features.extractors.smda.function
import capa.features.extractors.smda.basicblock
from capa.main import UnsupportedRuntimeError
from capa.features.extractors import FeatureExtractor
class SmdaFeatureExtractor(FeatureExtractor):
def __init__(self, smda_report: SmdaReport, path):
super(SmdaFeatureExtractor, self).__init__()
self.smda_report = smda_report
self.path = path
def get_base_address(self):
return self.smda_report.base_addr
def extract_file_features(self):
for feature, va in capa.features.extractors.smda.file.extract_features(self.smda_report, self.path):
yield feature, va
def get_functions(self):
for function in self.smda_report.getFunctions():
yield function
def extract_function_features(self, f):
for feature, va in capa.features.extractors.smda.function.extract_features(f):
yield feature, va
def get_basic_blocks(self, f):
for bb in f.getBlocks():
yield bb
def extract_basic_block_features(self, f, bb):
for feature, va in capa.features.extractors.smda.basicblock.extract_features(f, bb):
yield feature, va
def get_instructions(self, f, bb):
for smda_ins in bb.getInstructions():
yield smda_ins
def extract_insn_features(self, f, bb, insn):
for feature, va in capa.features.extractors.smda.insn.extract_features(f, bb, insn):
yield feature, va

View File

@@ -1,8 +1,7 @@
import sys
import string
import struct
from capa.features import Characteristic
from capa.features.common import Characteristic
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN

View File

@@ -0,0 +1,45 @@
from smda.common.SmdaReport import SmdaReport
import capa.features.extractors.smda.file
import capa.features.extractors.smda.insn
import capa.features.extractors.smda.function
import capa.features.extractors.smda.basicblock
from capa.features.extractors.base_extractor import FeatureExtractor
class SmdaFeatureExtractor(FeatureExtractor):
def __init__(self, smda_report: SmdaReport, path):
super(SmdaFeatureExtractor, self).__init__()
self.smda_report = smda_report
self.path = path
def get_base_address(self):
return self.smda_report.base_addr
def extract_file_features(self):
for feature, va in capa.features.extractors.smda.file.extract_features(self.smda_report, self.path):
yield feature, va
def get_functions(self):
for function in self.smda_report.getFunctions():
yield function
def extract_function_features(self, f):
for feature, va in capa.features.extractors.smda.function.extract_features(f):
yield feature, va
def get_basic_blocks(self, f):
for bb in f.getBlocks():
yield bb
def extract_basic_block_features(self, f, bb):
for feature, va in capa.features.extractors.smda.basicblock.extract_features(f, bb):
yield feature, va
def get_instructions(self, f, bb):
for smda_ins in bb.getInstructions():
yield smda_ins
def extract_insn_features(self, f, bb, insn):
for feature, va in capa.features.extractors.smda.insn.extract_features(f, bb, insn):
yield feature, va

View File

@@ -1,12 +1,10 @@
import struct
# if we have SMDA we definitely have lief
import lief
import capa.features.extractors.helpers
import capa.features.extractors.strings
from capa.features import String, Characteristic
from capa.features.file import Export, Import, Section
from capa.features.common import String, Characteristic
def extract_file_embedded_pe(smda_report, file_path):

View File

@@ -1,4 +1,4 @@
from capa.features import Characteristic
from capa.features.common import Characteristic
from capa.features.extractors import loops

View File

@@ -5,7 +5,8 @@ import struct
from smda.common.SmdaReport import SmdaReport
import capa.features.extractors.helpers
from capa.features import (
from capa.features.insn import API, Number, Offset, Mnemonic
from capa.features.common import (
ARCH_X32,
ARCH_X64,
MAX_BYTES_FEATURE_SIZE,
@@ -14,7 +15,6 @@ from capa.features import (
String,
Characteristic,
)
from capa.features.insn import API, Number, Offset, Mnemonic
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
@@ -97,7 +97,7 @@ def read_bytes(smda_report, va, num_bytes=None):
rva = va - smda_report.base_addr
if smda_report.buffer is None:
return
raise ValueError("buffer is empty")
buffer_end = len(smda_report.buffer)
max_bytes = num_bytes if num_bytes is not None else MAX_BYTES_FEATURE_SIZE
if rva + max_bytes > buffer_end:

View File

@@ -1,78 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import viv_utils
import viv_utils.flirt
import capa.features.extractors
import capa.features.extractors.viv.file
import capa.features.extractors.viv.insn
import capa.features.extractors.viv.function
import capa.features.extractors.viv.basicblock
from capa.features.extractors import FeatureExtractor
__all__ = ["file", "function", "basicblock", "insn"]
logger = logging.getLogger(__name__)
class InstructionHandle:
"""this acts like a vivisect.Opcode but with an __int__() method"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.va
def __getattr__(self, name):
return getattr(self._inner, name)
class VivisectFeatureExtractor(FeatureExtractor):
def __init__(self, vw, path):
super(VivisectFeatureExtractor, self).__init__()
self.vw = vw
self.path = path
def get_base_address(self):
# assume there is only one file loaded into the vw
return list(self.vw.filemeta.values())[0]["imagebase"]
def extract_file_features(self):
for feature, va in capa.features.extractors.viv.file.extract_features(self.vw, self.path):
yield feature, va
def get_functions(self):
for va in sorted(self.vw.getFunctions()):
yield viv_utils.Function(self.vw, va)
def extract_function_features(self, f):
for feature, va in capa.features.extractors.viv.function.extract_features(f):
yield feature, va
def get_basic_blocks(self, f):
return f.basic_blocks
def extract_basic_block_features(self, f, bb):
for feature, va in capa.features.extractors.viv.basicblock.extract_features(f, bb):
yield feature, va
def get_instructions(self, f, bb):
for insn in bb.instructions:
yield InstructionHandle(insn)
def extract_insn_features(self, f, bb, insn):
for feature, va in capa.features.extractors.viv.insn.extract_features(f, bb, insn):
yield feature, va
def is_library_function(self, va):
return viv_utils.flirt.is_library_function(self.vw, va)
def get_function_name(self, va):
return viv_utils.get_function_name(self.vw, va)

View File

@@ -10,9 +10,9 @@ import string
import struct
import envi
import vivisect.const
import envi.archs.i386.disasm
from capa.features import Characteristic
from capa.features.common import Characteristic
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
@@ -37,7 +37,7 @@ def _bb_has_tight_loop(f, bb):
"""
if len(bb.instructions) > 0:
for bva, bflags in bb.instructions[-1].getBranches():
if bflags & vivisect.envi.BR_COND:
if bflags & envi.BR_COND:
if bva == bb.va:
return True
@@ -117,11 +117,15 @@ def get_printable_len(oper):
chars = struct.pack("<I", oper.imm)
elif oper.tsize == 8:
chars = struct.pack("<Q", oper.imm)
else:
raise ValueError("unexpected oper.tsize: %d" % (oper.tsize))
if is_printable_ascii(chars):
return oper.tsize
if is_printable_utf16le(chars):
elif is_printable_utf16le(chars):
return oper.tsize / 2
return 0
else:
return 0
def is_printable_ascii(chars):

View File

@@ -0,0 +1,77 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import viv_utils
import viv_utils.flirt
import capa.features.extractors
import capa.features.extractors.viv.file
import capa.features.extractors.viv.insn
import capa.features.extractors.viv.function
import capa.features.extractors.viv.basicblock
from capa.features.extractors.base_extractor import FeatureExtractor
logger = logging.getLogger(__name__)
class InstructionHandle:
"""this acts like a vivisect.Opcode but with an __int__() method"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.va
def __getattr__(self, name):
return getattr(self._inner, name)
class VivisectFeatureExtractor(FeatureExtractor):
def __init__(self, vw, path):
super(VivisectFeatureExtractor, self).__init__()
self.vw = vw
self.path = path
def get_base_address(self):
# assume there is only one file loaded into the vw
return list(self.vw.filemeta.values())[0]["imagebase"]
def extract_file_features(self):
for feature, va in capa.features.extractors.viv.file.extract_features(self.vw, self.path):
yield feature, va
def get_functions(self):
for va in sorted(self.vw.getFunctions()):
yield viv_utils.Function(self.vw, va)
def extract_function_features(self, f):
for feature, va in capa.features.extractors.viv.function.extract_features(f):
yield feature, va
def get_basic_blocks(self, f):
return f.basic_blocks
def extract_basic_block_features(self, f, bb):
for feature, va in capa.features.extractors.viv.basicblock.extract_features(f, bb):
yield feature, va
def get_instructions(self, f, bb):
for insn in bb.instructions:
yield InstructionHandle(insn)
def extract_insn_features(self, f, bb, insn):
for feature, va in capa.features.extractors.viv.insn.extract_features(f, bb, insn):
yield feature, va
def is_library_function(self, va):
return viv_utils.flirt.is_library_function(self.vw, va)
def get_function_name(self, va):
return viv_utils.get_function_name(self.vw, va)

View File

@@ -13,8 +13,8 @@ import viv_utils.flirt
import capa.features.insn
import capa.features.extractors.helpers
import capa.features.extractors.strings
from capa.features import String, Characteristic
from capa.features.file import Export, Import, Section, FunctionName
from capa.features.common import String, Characteristic
def extract_file_embedded_pe(vw, file_path):

View File

@@ -6,9 +6,10 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import envi
import vivisect.const
from capa.features import Characteristic
from capa.features.common import Characteristic
from capa.features.extractors import loops
@@ -41,9 +42,9 @@ def extract_function_loop(f):
for bva, bflags in bb.instructions[-1].getBranches():
# vivisect does not set branch flags for non-conditional jmp so add explicit check
if (
bflags & vivisect.envi.BR_COND
or bflags & vivisect.envi.BR_FALL
or bflags & vivisect.envi.BR_TABLE
bflags & envi.BR_COND
or bflags & envi.BR_FALL
or bflags & envi.BR_TABLE
or bb.instructions[-1].mnem == "jmp"
):
edges.append((bb.va, bva))

View File

@@ -5,15 +5,21 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import envi
import envi.exc
import viv_utils
import envi.memory
import viv_utils.flirt
import envi.archs.i386.regs
import envi.archs.amd64.regs
import envi.archs.i386.disasm
import envi.archs.amd64.disasm
import capa.features.extractors.viv
import capa.features.extractors.helpers
import capa.features.extractors.viv.helpers
from capa.features import (
from capa.features.insn import API, Number, Offset, Mnemonic
from capa.features.common import (
ARCH_X32,
ARCH_X64,
MAX_BYTES_FEATURE_SIZE,
@@ -22,7 +28,6 @@ from capa.features import (
String,
Characteristic,
)
from capa.features.insn import API, Number, Offset, Mnemonic
from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_indirect_call
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
@@ -177,7 +182,7 @@ def extract_insn_number_features(f, bb, insn):
# assume its not also a constant.
continue
if insn.mnem == "add" and insn.opers[0].isReg() and insn.opers[0].reg == envi.archs.i386.disasm.REG_ESP:
if insn.mnem == "add" and insn.opers[0].isReg() and insn.opers[0].reg == envi.archs.i386.regs.REG_ESP:
# skip things like:
#
# .text:00401140 call sub_407E2B
@@ -233,7 +238,7 @@ def read_memory(vw, va, size):
mva, msize, mperms, mfname = mmap
offset = va - mva
return mbytes[offset : offset + size]
raise envi.SegmentationViolation(va)
raise envi.exc.SegmentationViolation(va)
def read_bytes(vw, va):
@@ -245,7 +250,7 @@ def read_bytes(vw, va):
"""
segm = vw.getSegment(va)
if not segm:
raise envi.SegmentationViolation(va)
raise envi.exc.SegmentationViolation(va)
segm_end = segm[0] + segm[1]
try:
@@ -254,7 +259,7 @@ def read_bytes(vw, va):
return read_memory(vw, va, segm_end - va)
else:
return read_memory(vw, va, MAX_BYTES_FEATURE_SIZE)
except envi.SegmentationViolation:
except envi.exc.SegmentationViolation:
raise
@@ -286,7 +291,7 @@ def extract_insn_bytes_features(f, bb, insn):
for v in derefs(f.vw, v):
try:
buf = read_bytes(f.vw, v)
except envi.SegmentationViolation:
except envi.exc.SegmentationViolation:
continue
if capa.features.extractors.helpers.all_zeros(buf):
@@ -298,7 +303,7 @@ def extract_insn_bytes_features(f, bb, insn):
def read_string(vw, offset):
try:
alen = vw.detectString(offset)
except envi.SegmentationViolation:
except envi.exc.SegmentationViolation:
pass
else:
if alen > 0:
@@ -306,7 +311,7 @@ def read_string(vw, offset):
try:
ulen = vw.detectUnicode(offset)
except envi.SegmentationViolation:
except envi.exc.SegmentationViolation:
pass
except IndexError:
# potential vivisect bug detecting Unicode at segment end
@@ -367,14 +372,14 @@ def extract_insn_offset_features(f, bb, insn):
# reg ^
# disp
if isinstance(oper, envi.archs.i386.disasm.i386RegMemOper):
if oper.reg == envi.archs.i386.disasm.REG_ESP:
if oper.reg == envi.archs.i386.regs.REG_ESP:
continue
if oper.reg == envi.archs.i386.disasm.REG_EBP:
if oper.reg == envi.archs.i386.regs.REG_EBP:
continue
# TODO: do x64 support for real.
if oper.reg == envi.archs.amd64.disasm.REG_RBP:
if oper.reg == envi.archs.amd64.regs.REG_RBP:
continue
# viv already decodes offsets as signed
@@ -402,11 +407,11 @@ def is_security_cookie(f, bb, insn):
# security cookie check should use SP or BP
oper = insn.opers[1]
if oper.isReg() and oper.reg not in [
envi.archs.i386.disasm.REG_ESP,
envi.archs.i386.disasm.REG_EBP,
envi.archs.i386.regs.REG_ESP,
envi.archs.i386.regs.REG_EBP,
# TODO: do x64 support for real.
envi.archs.amd64.disasm.REG_RBP,
envi.archs.amd64.disasm.REG_RSP,
envi.archs.amd64.regs.REG_RBP,
envi.archs.amd64.regs.REG_RSP,
]:
return False

View File

@@ -6,7 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from capa.features import Feature
from capa.features.common import Feature
class Export(Feature):

View File

@@ -53,11 +53,11 @@ import json
import zlib
import logging
import capa.features
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
import capa.features.extractors
import capa.features.extractors.base_extractor
from capa.helpers import hex
logger = logging.getLogger(__name__)
@@ -67,7 +67,7 @@ def serialize_feature(feature):
return feature.freeze_serialize()
KNOWN_FEATURES = {F.__name__: F for F in capa.features.Feature.__subclasses__()}
KNOWN_FEATURES = {F.__name__: F for F in capa.features.common.Feature.__subclasses__()}
def deserialize_feature(doc):
@@ -80,7 +80,7 @@ def dumps(extractor):
serialize the given extractor to a string
args:
extractor: capa.features.extractor.FeatureExtractor:
extractor: capa.features.extractors.base_extractor.FeatureExtractor:
returns:
str: the serialized features.
@@ -217,7 +217,7 @@ def loads(s):
feature = deserialize_feature(feature[:2])
features["functions"][loc[0]]["basic blocks"][loc[1]]["instructions"][loc[2]]["features"].append((va, feature))
return capa.features.extractors.NullFeatureExtractor(features)
return capa.features.extractors.base_extractor.NullFeatureExtractor(features)
MAGIC = "capa0000".encode("ascii")

View File

@@ -7,7 +7,7 @@
# See the License for the specific language governing permissions and limitations under the License.
import capa.render.utils
from capa.features import Feature
from capa.features.common import Feature
class API(Feature):

View File

@@ -14,6 +14,8 @@ import idaapi
import idautils
import capa
import capa.version
import capa.features.common
logger = logging.getLogger("capa")
@@ -86,7 +88,7 @@ def get_file_md5():
""" """
md5 = idautils.GetInputFileMD5()
if not isinstance(md5, str):
md5 = capa.features.bytes_to_str(md5)
md5 = capa.features.common.bytes_to_str(md5)
return md5
@@ -94,7 +96,7 @@ def get_file_sha256():
""" """
sha256 = idaapi.retrieve_input_file_sha256()
if not isinstance(sha256, str):
sha256 = capa.features.bytes_to_str(sha256)
sha256 = capa.features.common.bytes_to_str(sha256)
return sha256

View File

@@ -20,9 +20,12 @@ from PyQt5 import QtGui, QtCore, QtWidgets
import capa.main
import capa.rules
import capa.engine
import capa.ida.helpers
import capa.render.utils as rutils
import capa.features.extractors.ida
import capa.render.json
import capa.features.common
import capa.render.result_document
import capa.features.extractors.ida.extractor
from capa.ida.plugin.icon import QICON
from capa.ida.plugin.view import (
CapaExplorerQtreeView,
@@ -97,7 +100,7 @@ def find_func_matches(f, ruleset, func_features, bb_features):
for (name, res) in matches.items():
bb_matches[name].extend(res)
for (ea, _) in res:
func_features[capa.features.MatchedRule(name)].add(ea)
func_features[capa.features.common.MatchedRule(name)].add(ea)
# find rule matches for function, function features include rule matches for basic blocks
_, matches = capa.engine.match(ruleset.function_rules, func_features, int(f))
@@ -155,7 +158,7 @@ class CapaExplorerProgressIndicator(QtCore.QObject):
self.progress.emit("extracting features from %s" % text)
class CapaExplorerFeatureExtractor(capa.features.extractors.ida.IdaFeatureExtractor):
class CapaExplorerFeatureExtractor(capa.features.extractors.ida.extractor.IdaFeatureExtractor):
"""subclass the IdaFeatureExtractor
track progress during feature extraction, also allow user to cancel feature extraction
@@ -770,7 +773,9 @@ class CapaExplorerForm(idaapi.PluginForm):
update_wait_box("rendering results")
try:
self.doc = capa.render.convert_capabilities_to_result_document(meta, self.ruleset_cache, capabilities)
self.doc = capa.render.result_document.convert_capabilities_to_result_document(
meta, self.ruleset_cache, capabilities
)
except Exception as e:
logger.error("Failed to render results (error: %s)", e)
return False
@@ -865,7 +870,7 @@ class CapaExplorerForm(idaapi.PluginForm):
if rule.meta.get("capa/subscope-rule"):
continue
for (ea, _) in res:
func_features[capa.features.MatchedRule(name)].add(ea)
func_features[capa.features.common.MatchedRule(name)].add(ea)
except Exception as e:
logger.error("Failed to match function/basic block rule scope (error: %s)" % e)
return False
@@ -899,7 +904,7 @@ class CapaExplorerForm(idaapi.PluginForm):
if rule.meta.get("capa/subscope-rule"):
continue
for (ea, _) in res:
file_features[capa.features.MatchedRule(name)].add(ea)
file_features[capa.features.common.MatchedRule(name)].add(ea)
except Exception as e:
logger.error("Failed to match file scope rules (error: %s)" % e)
return False
@@ -1123,7 +1128,7 @@ class CapaExplorerForm(idaapi.PluginForm):
idaapi.info("No program analysis to save.")
return
s = json.dumps(self.doc, sort_keys=True, cls=capa.render.CapaJsonObjectEncoder).encode("utf-8")
s = json.dumps(self.doc, sort_keys=True, cls=capa.render.json.CapaJsonObjectEncoder).encode("utf-8")
path = idaapi.ask_file(True, "*.json", "Choose file to save capa program analysis JSON")
if not path:

View File

@@ -6,7 +6,6 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import codecs
import idc

View File

@@ -13,9 +13,9 @@ import idaapi
from PyQt5 import QtGui, QtCore
import capa.rules
import capa.features
import capa.ida.helpers
import capa.render.utils as rutils
import capa.features.common
from capa.ida.plugin.item import (
CapaExplorerDataItem,
CapaExplorerRuleItem,
@@ -494,7 +494,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
value = feature[feature["type"]]
if value:
if key == "string":
value = '"%s"' % capa.features.escape_string(value)
value = '"%s"' % capa.features.common.escape_string(value)
if feature.get("description", ""):
return "%s(%s = %s)" % (key, value, feature["description"])
else:
@@ -560,7 +560,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
for s, locations in feature["matches"].items():
if location in locations:
return CapaExplorerStringViewItem(
parent, display, location, '"' + capa.features.escape_string(s) + '"'
parent, display, location, '"' + capa.features.common.escape_string(s) + '"'
)
# programming error: the given location should always be found in the regex matches
@@ -590,7 +590,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
if feature["type"] in ("string",):
# display string preview
return CapaExplorerStringViewItem(
parent, display, location, '"%s"' % capa.features.escape_string(feature[feature["type"]])
parent, display, location, '"%s"' % capa.features.common.escape_string(feature[feature["type"]])
)
if feature["type"] in ("import", "export", "function-name"):

View File

@@ -14,6 +14,7 @@ from PyQt5 import QtGui, QtCore, QtWidgets
import capa.rules
import capa.engine
import capa.ida.helpers
import capa.features.common
import capa.features.basicblock
from capa.ida.plugin.item import CapaExplorerFunctionItem
from capa.ida.plugin.model import CapaExplorerDataModel
@@ -624,8 +625,8 @@ class CapaExplorerRulgenEditor(QtWidgets.QTreeWidget):
# single features
for (k, v) in filter(lambda t: t[1] == 1, counted):
if isinstance(k, (capa.features.String,)):
value = '"%s"' % capa.features.escape_string(k.get_value_str())
if isinstance(k, (capa.features.common.String,)):
value = '"%s"' % capa.features.common.escape_string(k.get_value_str())
else:
value = k.get_value_str()
self.new_feature_node(self.root, ("- %s: %s" % (k.name.lower(), value), ""))
@@ -633,8 +634,8 @@ class CapaExplorerRulgenEditor(QtWidgets.QTreeWidget):
# n > 1 features
for (k, v) in filter(lambda t: t[1] > 1, counted):
if k.value:
if isinstance(k, (capa.features.String,)):
value = '"%s"' % capa.features.escape_string(k.get_value_str())
if isinstance(k, (capa.features.common.String,)):
value = '"%s"' % capa.features.common.escape_string(k.get_value_str())
else:
value = k.get_value_str()
display = "- count(%s(%s)): %d" % (k.name.lower(), value, v)
@@ -898,8 +899,8 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget):
""" """
name = feature.name.lower()
value = feature.get_value_str()
if isinstance(feature, (capa.features.String,)):
value = '"%s"' % capa.features.escape_string(value)
if isinstance(feature, (capa.features.common.String,)):
value = '"%s"' % capa.features.common.escape_string(value)
return "%s(%s)" % (name, value)
for (feature, eas) in sorted(features.items(), key=lambda k: sorted(k[1])):

View File

@@ -28,10 +28,13 @@ import colorama
import capa.rules
import capa.engine
import capa.render
import capa.version
import capa.features
import capa.render.json
import capa.render.default
import capa.render.verbose
import capa.features.common
import capa.features.freeze
import capa.render.vverbose
import capa.features.extractors
import capa.features.extractors.pefile
from capa.helpers import get_file_taste
@@ -94,7 +97,7 @@ def find_function_capabilities(ruleset, extractor, f):
for rule_name, res in matches.items():
bb_matches[rule_name].extend(res)
for va, _ in res:
function_features[capa.features.MatchedRule(rule_name)].add(va)
function_features[capa.features.common.MatchedRule(rule_name)].add(va)
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f))
return function_matches, bb_matches, len(function_features)
@@ -169,7 +172,7 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
# mapping from feature (matched rule) to set of addresses at which it matched.
# schema: Dict[MatchedRule: Set[int]
function_and_lower_features = {
capa.features.MatchedRule(rule_name): set(map(lambda p: p[0], results))
capa.features.common.MatchedRule(rule_name): set(map(lambda p: p[0], results))
for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items())
}
@@ -407,7 +410,7 @@ def get_extractor(path, format, backend, sigpaths, disable_progress=False):
from smda.SmdaConfig import SmdaConfig
from smda.Disassembler import Disassembler
import capa.features.extractors.smda
import capa.features.extractors.smda.extractor
smda_report = None
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
@@ -416,9 +419,9 @@ def get_extractor(path, format, backend, sigpaths, disable_progress=False):
smda_disasm = Disassembler(config)
smda_report = smda_disasm.disassembleFile(path)
return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path)
return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(smda_report, path)
else:
import capa.features.extractors.viv
import capa.features.extractors.viv.extractor
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
if format == "auto" and path.endswith(EXTENSIONS_SHELLCODE_32):
@@ -433,7 +436,7 @@ def get_extractor(path, format, backend, sigpaths, disable_progress=False):
# see #168 for discussion around how to handle non-writable directories
logger.info("source directory is not writable, won't save intermediate workspace")
return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
def is_nursery_rule_path(path):
@@ -835,13 +838,13 @@ def main(argv=None):
return -1
if args.json:
print(capa.render.render_json(meta, rules, capabilities))
print(capa.render.json.render(meta, rules, capabilities))
elif args.vverbose:
print(capa.render.render_vverbose(meta, rules, capabilities))
print(capa.render.vverbose.render(meta, rules, capabilities))
elif args.verbose:
print(capa.render.render_verbose(meta, rules, capabilities))
print(capa.render.verbose.render(meta, rules, capabilities))
else:
print(capa.render.render_default(meta, rules, capabilities))
print(capa.render.default.render(meta, rules, capabilities))
colorama.deinit()
logger.debug("done.")
@@ -850,8 +853,10 @@ def main(argv=None):
def ida_main():
import capa.rules
import capa.ida.helpers
import capa.features.extractors.ida
import capa.render.default
import capa.features.extractors.ida.extractor
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
@@ -883,14 +888,14 @@ def ida_main():
meta = capa.ida.helpers.collect_metadata()
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.IdaFeatureExtractor())
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
meta["analysis"].update(counts)
if has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
colorama.init(strip=True)
print(capa.render.render_default(meta, rules, capabilities))
print(capa.render.default.render(meta, rules, capabilities))
def is_runtime_ida():

View File

@@ -1,363 +0,0 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
import capa.rules
import capa.engine
import capa.render.utils
def convert_statement_to_result_document(statement):
"""
"statement": {
"type": "or"
},
"statement": {
"max": 9223372036854775808,
"min": 2,
"type": "range"
},
"""
statement_type = statement.name.lower()
result = {"type": statement_type}
if statement.description:
result["description"] = statement.description
if statement_type == "some" and statement.count == 0:
result["type"] = "optional"
elif statement_type == "some":
result["count"] = statement.count
elif statement_type == "range":
result["min"] = statement.min
result["max"] = statement.max
result["child"] = convert_feature_to_result_document(statement.child)
elif statement_type == "subscope":
result["subscope"] = statement.scope
return result
def convert_feature_to_result_document(feature):
"""
"feature": {
"number": 6,
"type": "number"
},
"feature": {
"api": "ws2_32.WSASocket",
"type": "api"
},
"feature": {
"match": "create TCP socket",
"type": "match"
},
"feature": {
"characteristic": [
"loop",
true
],
"type": "characteristic"
},
"""
result = {"type": feature.name, feature.name: feature.get_value_str()}
if feature.description:
result["description"] = feature.description
if feature.name == "regex":
result["matches"] = feature.matches
return result
def convert_node_to_result_document(node):
"""
"node": {
"type": "statement",
"statement": { ... }
},
"node": {
"type": "feature",
"feature": { ... }
},
"""
if isinstance(node, capa.engine.Statement):
return {
"type": "statement",
"statement": convert_statement_to_result_document(node),
}
elif isinstance(node, capa.features.Feature):
return {
"type": "feature",
"feature": convert_feature_to_result_document(node),
}
else:
raise RuntimeError("unexpected match node type")
def convert_match_to_result_document(rules, capabilities, result):
"""
convert the given Result instance into a common, Python-native data structure.
this will become part of the "result document" format that can be emitted to JSON.
"""
doc = {
"success": bool(result.success),
"node": convert_node_to_result_document(result.statement),
"children": [convert_match_to_result_document(rules, capabilities, child) for child in result.children],
}
# logic expression, like `and`, don't have locations - their children do.
# so only add `locations` to feature nodes.
if isinstance(result.statement, capa.features.Feature):
if bool(result.success):
doc["locations"] = result.locations
elif isinstance(result.statement, capa.rules.Range):
if bool(result.success):
doc["locations"] = result.locations
# if we have a `match` statement, then we're referencing another rule or namespace.
# this could an external rule (written by a human), or
# rule generated to support a subscope (basic block, etc.)
# we still want to include the matching logic in this tree.
#
# so, we need to lookup the other rule results
# and then filter those down to the address used here.
# finally, splice that logic into this tree.
if (
doc["node"]["type"] == "feature"
and doc["node"]["feature"]["type"] == "match"
# only add subtree on success,
# because there won't be results for the other rule on failure.
and doc["success"]
):
name = doc["node"]["feature"]["match"]
if name in rules:
# this is a rule that we're matching
#
# pull matches from the referenced rule into our tree here.
rule_name = doc["node"]["feature"]["match"]
rule = rules[rule_name]
rule_matches = {address: result for (address, result) in capabilities[rule_name]}
if rule.meta.get("capa/subscope-rule"):
# for a subscope rule, fixup the node to be a scope node, rather than a match feature node.
#
# e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block`
scope = rule.meta["scope"]
doc["node"] = {
"type": "statement",
"statement": {
"type": "subscope",
"subscope": scope,
},
}
for location in doc["locations"]:
doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location]))
else:
# this is a namespace that we're matching
#
# check for all rules in the namespace,
# seeing if they matched.
# if so, pull their matches into our match tree here.
ns_name = doc["node"]["feature"]["match"]
ns_rules = rules.rules_by_namespace[ns_name]
for rule in ns_rules:
if rule.name in capabilities:
# the rule matched, so splice results into our tree here.
#
# note, there's a shortcoming in our result document schema here:
# we lose the name of the rule that matched in a namespace.
# for example, if we have a statement: `match: runtime/dotnet`
# and we get matches, we can say the following:
#
# match: runtime/dotnet @ 0x0
# or:
# import: mscoree._CorExeMain @ 0x402000
#
# however, we lose the fact that it was rule
# "compiled to the .NET platform"
# that contained this logic and did the match.
#
# we could introduce an intermediate node here.
# this would be a breaking change and require updates to the renderers.
# in the meantime, the above might be sufficient.
rule_matches = {address: result for (address, result) in capabilities[rule.name]}
for location in doc["locations"]:
doc["children"].append(
convert_match_to_result_document(rules, capabilities, rule_matches[location])
)
return doc
def convert_meta_to_result_document(meta):
attacks = meta.get("att&ck", [])
meta["att&ck"] = [parse_canonical_attack(attack) for attack in attacks]
mbcs = meta.get("mbc", [])
meta["mbc"] = [parse_canonical_mbc(mbc) for mbc in mbcs]
return meta
def parse_canonical_attack(attack):
"""
parse capa's canonical ATT&CK representation: `Tactic::Technique::Subtechnique [Identifier]`
"""
tactic = ""
technique = ""
subtechnique = ""
parts, id = capa.render.utils.parse_parts_id(attack)
if len(parts) > 0:
tactic = parts[0]
if len(parts) > 1:
technique = parts[1]
if len(parts) > 2:
subtechnique = parts[2]
return {
"parts": parts,
"id": id,
"tactic": tactic,
"technique": technique,
"subtechnique": subtechnique,
}
def parse_canonical_mbc(mbc):
"""
parse capa's canonical MBC representation: `Objective::Behavior::Method [Identifier]`
"""
objective = ""
behavior = ""
method = ""
parts, id = capa.render.utils.parse_parts_id(mbc)
if len(parts) > 0:
objective = parts[0]
if len(parts) > 1:
behavior = parts[1]
if len(parts) > 2:
method = parts[2]
return {
"parts": parts,
"id": id,
"objective": objective,
"behavior": behavior,
"method": method,
}
def convert_capabilities_to_result_document(meta, rules, capabilities):
"""
convert the given rule set and capabilities result to a common, Python-native data structure.
this format can be directly emitted to JSON, or passed to the other `render_*` routines
to render as text.
see examples of substructures in above routines.
schema:
```json
{
"meta": {...},
"rules: {
$rule-name: {
"meta": {...copied from rule.meta...},
"matches: {
$address: {...match details...},
...
}
},
...
}
}
```
Args:
meta (Dict[str, Any]):
rules (RuleSet):
capabilities (Dict[str, List[Tuple[int, Result]]]):
"""
doc = {
"meta": meta,
"rules": {},
}
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if rule.meta.get("capa/subscope-rule"):
continue
rule_meta = convert_meta_to_result_document(rule.meta)
doc["rules"][rule_name] = {
"meta": rule_meta,
"source": rule.definition,
"matches": {
addr: convert_match_to_result_document(rules, capabilities, match) for (addr, match) in matches
},
}
return doc
def render_vverbose(meta, rules, capabilities):
# there's an import loop here
# if capa.render imports capa.render.vverbose
# and capa.render.vverbose import capa.render (implicitly, as a submodule)
# so, defer the import until routine is called, breaking the import loop.
import capa.render.vverbose
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
return capa.render.vverbose.render_vverbose(doc)
def render_verbose(meta, rules, capabilities):
# break import loop
import capa.render.verbose
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
return capa.render.verbose.render_verbose(doc)
def render_default(meta, rules, capabilities):
# break import loop
import capa.render.default
import capa.render.verbose
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
return capa.render.default.render_default(doc)
class CapaJsonObjectEncoder(json.JSONEncoder):
"""JSON encoder that emits Python sets as sorted lists"""
def default(self, obj):
if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, str):
return json.JSONEncoder.default(self, obj)
elif isinstance(obj, set):
return list(sorted(obj))
else:
# probably will TypeError
return json.JSONEncoder.default(self, obj)
def render_json(meta, rules, capabilities):
return json.dumps(
convert_capabilities_to_result_document(meta, rules, capabilities),
cls=CapaJsonObjectEncoder,
sort_keys=True,
)

View File

@@ -11,6 +11,7 @@ import collections
import tabulate
import capa.render.utils as rutils
import capa.render.result_document
tabulate.PRESERVE_WHITESPACE = True
@@ -219,3 +220,8 @@ def render_default(doc):
render_capabilities(doc, ostream)
return ostream.getvalue()
def render(meta, rules, capabilities):
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
return render_default(doc)

31
capa/render/json.py Normal file
View File

@@ -0,0 +1,31 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
import capa.render.result_document
class CapaJsonObjectEncoder(json.JSONEncoder):
"""JSON encoder that emits Python sets as sorted lists"""
def default(self, obj):
if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, str):
return json.JSONEncoder.default(self, obj)
elif isinstance(obj, set):
return list(sorted(obj))
else:
# probably will TypeError
return json.JSONEncoder.default(self, obj)
def render(meta, rules, capabilities):
return json.dumps(
capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities),
cls=CapaJsonObjectEncoder,
sort_keys=True,
)

View File

@@ -0,0 +1,312 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import capa.rules
import capa.engine
import capa.render.utils
import capa.features.common
def convert_statement_to_result_document(statement):
"""
"statement": {
"type": "or"
},
"statement": {
"max": 9223372036854775808,
"min": 2,
"type": "range"
},
"""
statement_type = statement.name.lower()
result = {"type": statement_type}
if statement.description:
result["description"] = statement.description
if statement_type == "some" and statement.count == 0:
result["type"] = "optional"
elif statement_type == "some":
result["count"] = statement.count
elif statement_type == "range":
result["min"] = statement.min
result["max"] = statement.max
result["child"] = convert_feature_to_result_document(statement.child)
elif statement_type == "subscope":
result["subscope"] = statement.scope
return result
def convert_feature_to_result_document(feature):
"""
"feature": {
"number": 6,
"type": "number"
},
"feature": {
"api": "ws2_32.WSASocket",
"type": "api"
},
"feature": {
"match": "create TCP socket",
"type": "match"
},
"feature": {
"characteristic": [
"loop",
true
],
"type": "characteristic"
},
"""
result = {"type": feature.name, feature.name: feature.get_value_str()}
if feature.description:
result["description"] = feature.description
if feature.name == "regex":
result["matches"] = feature.matches
return result
def convert_node_to_result_document(node):
"""
"node": {
"type": "statement",
"statement": { ... }
},
"node": {
"type": "feature",
"feature": { ... }
},
"""
if isinstance(node, capa.engine.Statement):
return {
"type": "statement",
"statement": convert_statement_to_result_document(node),
}
elif isinstance(node, capa.features.common.Feature):
return {
"type": "feature",
"feature": convert_feature_to_result_document(node),
}
else:
raise RuntimeError("unexpected match node type")
def convert_match_to_result_document(rules, capabilities, result):
"""
convert the given Result instance into a common, Python-native data structure.
this will become part of the "result document" format that can be emitted to JSON.
"""
doc = {
"success": bool(result.success),
"node": convert_node_to_result_document(result.statement),
"children": [convert_match_to_result_document(rules, capabilities, child) for child in result.children],
}
# logic expression, like `and`, don't have locations - their children do.
# so only add `locations` to feature nodes.
if isinstance(result.statement, capa.features.common.Feature):
if bool(result.success):
doc["locations"] = result.locations
elif isinstance(result.statement, capa.engine.Range):
if bool(result.success):
doc["locations"] = result.locations
# if we have a `match` statement, then we're referencing another rule or namespace.
# this could an external rule (written by a human), or
# rule generated to support a subscope (basic block, etc.)
# we still want to include the matching logic in this tree.
#
# so, we need to lookup the other rule results
# and then filter those down to the address used here.
# finally, splice that logic into this tree.
if (
doc["node"]["type"] == "feature"
and doc["node"]["feature"]["type"] == "match"
# only add subtree on success,
# because there won't be results for the other rule on failure.
and doc["success"]
):
name = doc["node"]["feature"]["match"]
if name in rules:
# this is a rule that we're matching
#
# pull matches from the referenced rule into our tree here.
rule_name = doc["node"]["feature"]["match"]
rule = rules[rule_name]
rule_matches = {address: result for (address, result) in capabilities[rule_name]}
if rule.meta.get("capa/subscope-rule"):
# for a subscope rule, fixup the node to be a scope node, rather than a match feature node.
#
# e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block`
scope = rule.meta["scope"]
doc["node"] = {
"type": "statement",
"statement": {
"type": "subscope",
"subscope": scope,
},
}
for location in doc["locations"]:
doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location]))
else:
# this is a namespace that we're matching
#
# check for all rules in the namespace,
# seeing if they matched.
# if so, pull their matches into our match tree here.
ns_name = doc["node"]["feature"]["match"]
ns_rules = rules.rules_by_namespace[ns_name]
for rule in ns_rules:
if rule.name in capabilities:
# the rule matched, so splice results into our tree here.
#
# note, there's a shortcoming in our result document schema here:
# we lose the name of the rule that matched in a namespace.
# for example, if we have a statement: `match: runtime/dotnet`
# and we get matches, we can say the following:
#
# match: runtime/dotnet @ 0x0
# or:
# import: mscoree._CorExeMain @ 0x402000
#
# however, we lose the fact that it was rule
# "compiled to the .NET platform"
# that contained this logic and did the match.
#
# we could introduce an intermediate node here.
# this would be a breaking change and require updates to the renderers.
# in the meantime, the above might be sufficient.
rule_matches = {address: result for (address, result) in capabilities[rule.name]}
for location in doc["locations"]:
doc["children"].append(
convert_match_to_result_document(rules, capabilities, rule_matches[location])
)
return doc
def convert_meta_to_result_document(meta):
attacks = meta.get("att&ck", [])
meta["att&ck"] = [parse_canonical_attack(attack) for attack in attacks]
mbcs = meta.get("mbc", [])
meta["mbc"] = [parse_canonical_mbc(mbc) for mbc in mbcs]
return meta
def parse_canonical_attack(attack):
"""
parse capa's canonical ATT&CK representation: `Tactic::Technique::Subtechnique [Identifier]`
"""
tactic = ""
technique = ""
subtechnique = ""
parts, id = capa.render.utils.parse_parts_id(attack)
if len(parts) > 0:
tactic = parts[0]
if len(parts) > 1:
technique = parts[1]
if len(parts) > 2:
subtechnique = parts[2]
return {
"parts": parts,
"id": id,
"tactic": tactic,
"technique": technique,
"subtechnique": subtechnique,
}
def parse_canonical_mbc(mbc):
"""
parse capa's canonical MBC representation: `Objective::Behavior::Method [Identifier]`
"""
objective = ""
behavior = ""
method = ""
parts, id = capa.render.utils.parse_parts_id(mbc)
if len(parts) > 0:
objective = parts[0]
if len(parts) > 1:
behavior = parts[1]
if len(parts) > 2:
method = parts[2]
return {
"parts": parts,
"id": id,
"objective": objective,
"behavior": behavior,
"method": method,
}
def convert_capabilities_to_result_document(meta, rules, capabilities):
"""
convert the given rule set and capabilities result to a common, Python-native data structure.
this format can be directly emitted to JSON, or passed to the other `capa.render.*.render()` routines
to render as text.
see examples of substructures in above routines.
schema:
```json
{
"meta": {...},
"rules: {
$rule-name: {
"meta": {...copied from rule.meta...},
"matches: {
$address: {...match details...},
...
}
},
...
}
}
```
Args:
meta (Dict[str, Any]):
rules (RuleSet):
capabilities (Dict[str, List[Tuple[int, Result]]]):
"""
doc = {
"meta": meta,
"rules": {},
}
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if rule.meta.get("capa/subscope-rule"):
continue
rule_meta = convert_meta_to_result_document(rule.meta)
doc["rules"][rule_name] = {
"meta": rule_meta,
"source": rule.definition,
"matches": {
addr: convert_match_to_result_document(rules, capabilities, match) for (addr, match) in matches
},
}
return doc

View File

@@ -26,6 +26,7 @@ import tabulate
import capa.rules
import capa.render.utils as rutils
import capa.render.result_document
def render_meta(ostream, doc):
@@ -120,3 +121,8 @@ def render_verbose(doc):
ostream.write("\n")
return ostream.getvalue()
def render(meta, rules, capabilities):
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
return render_verbose(doc)

View File

@@ -9,9 +9,10 @@
import tabulate
import capa.rules
import capa.features
import capa.render.utils as rutils
import capa.render.verbose
import capa.features.common
import capa.render.result_document
def render_locations(ostream, match):
@@ -56,7 +57,7 @@ def render_statement(ostream, match, statement, indent=0):
if child[child["type"]]:
if child["type"] == "string":
value = '"%s"' % capa.features.escape_string(child[child["type"]])
value = '"%s"' % capa.features.common.escape_string(child[child["type"]])
else:
value = child[child["type"]]
value = rutils.bold2(value)
@@ -85,7 +86,7 @@ def render_statement(ostream, match, statement, indent=0):
def render_string_value(s):
return '"%s"' % capa.features.escape_string(s)
return '"%s"' % capa.features.common.escape_string(s)
def render_feature(ostream, match, feature, indent=0):
@@ -261,3 +262,8 @@ def render_vverbose(doc):
ostream.write("\n")
return ostream.getvalue()
def render(meta, rules, capabilities):
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
return render_vverbose(doc)

View File

@@ -12,6 +12,7 @@ import codecs
import logging
import binascii
import functools
import collections
try:
from functools import lru_cache
@@ -25,12 +26,13 @@ import ruamel.yaml
import capa.rules
import capa.engine
import capa.engine as ceng
import capa.features
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
from capa.engine import *
from capa.features import MAX_BYTES_FEATURE_SIZE
from capa.features.common import MAX_BYTES_FEATURE_SIZE
logger = logging.getLogger(__name__)
@@ -67,38 +69,38 @@ BASIC_BLOCK_SCOPE = "basic block"
SUPPORTED_FEATURES = {
FILE_SCOPE: {
capa.features.MatchedRule,
capa.features.common.MatchedRule,
capa.features.file.Export,
capa.features.file.Import,
capa.features.file.Section,
capa.features.file.FunctionName,
capa.features.Characteristic("embedded pe"),
capa.features.String,
capa.features.common.Characteristic("embedded pe"),
capa.features.common.String,
},
FUNCTION_SCOPE: {
# plus basic block scope features, see below
capa.features.basicblock.BasicBlock,
capa.features.Characteristic("calls from"),
capa.features.Characteristic("calls to"),
capa.features.Characteristic("loop"),
capa.features.Characteristic("recursive call"),
capa.features.common.Characteristic("calls from"),
capa.features.common.Characteristic("calls to"),
capa.features.common.Characteristic("loop"),
capa.features.common.Characteristic("recursive call"),
},
BASIC_BLOCK_SCOPE: {
capa.features.MatchedRule,
capa.features.common.MatchedRule,
capa.features.insn.API,
capa.features.insn.Number,
capa.features.String,
capa.features.Bytes,
capa.features.common.String,
capa.features.common.Bytes,
capa.features.insn.Offset,
capa.features.insn.Mnemonic,
capa.features.Characteristic("nzxor"),
capa.features.Characteristic("peb access"),
capa.features.Characteristic("fs access"),
capa.features.Characteristic("gs access"),
capa.features.Characteristic("cross section flow"),
capa.features.Characteristic("tight loop"),
capa.features.Characteristic("stack string"),
capa.features.Characteristic("indirect call"),
capa.features.common.Characteristic("nzxor"),
capa.features.common.Characteristic("peb access"),
capa.features.common.Characteristic("fs access"),
capa.features.common.Characteristic("gs access"),
capa.features.common.Characteristic("cross section flow"),
capa.features.common.Characteristic("tight loop"),
capa.features.common.Characteristic("stack string"),
capa.features.common.Characteristic("indirect call"),
},
}
@@ -142,8 +144,8 @@ class InvalidRuleSet(ValueError):
def ensure_feature_valid_for_scope(scope, feature):
if isinstance(feature, capa.features.Characteristic):
if capa.features.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]:
if isinstance(feature, capa.features.common.Characteristic):
if capa.features.common.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]:
raise InvalidRule("feature %s not support for scope %s" % (feature, scope))
elif not isinstance(feature, tuple(filter(lambda t: isinstance(t, type), SUPPORTED_FEATURES[scope]))):
raise InvalidRule("feature %s not support for scope %s" % (feature, scope))
@@ -199,9 +201,9 @@ def parse_feature(key):
if key == "api":
return capa.features.insn.API
elif key == "string":
return capa.features.StringFactory
return capa.features.common.StringFactory
elif key == "bytes":
return capa.features.Bytes
return capa.features.common.Bytes
elif key == "number":
return capa.features.insn.Number
elif key.startswith("number/"):
@@ -223,7 +225,7 @@ def parse_feature(key):
elif key == "basic blocks":
return capa.features.basicblock.BasicBlock
elif key == "characteristic":
return capa.features.Characteristic
return capa.features.common.Characteristic
elif key == "export":
return capa.features.file.Export
elif key == "import":
@@ -231,7 +233,7 @@ def parse_feature(key):
elif key == "section":
return capa.features.file.Section
elif key == "match":
return capa.features.MatchedRule
return capa.features.common.MatchedRule
elif key == "function-name":
return capa.features.file.FunctionName
else:
@@ -264,7 +266,7 @@ def parse_description(s, value_type, description=None):
if isinstance(value, str):
if value_type == "bytes":
try:
value = codecs.decode(value.replace(" ", ""), "hex")
value = codecs.decode(value.replace(" ", "").encode("ascii"), "hex")
except binascii.Error:
raise InvalidRule('unexpected bytes value: "%s", must be a valid hex sequence' % value)
@@ -323,21 +325,21 @@ def build_statements(d, scope):
key = list(d.keys())[0]
description = pop_statement_description_entry(d[key])
if key == "and":
return And([build_statements(dd, scope) for dd in d[key]], description=description)
return ceng.And([build_statements(dd, scope) for dd in d[key]], description=description)
elif key == "or":
return Or([build_statements(dd, scope) for dd in d[key]], description=description)
return ceng.Or([build_statements(dd, scope) for dd in d[key]], description=description)
elif key == "not":
if len(d[key]) != 1:
raise InvalidRule("not statement must have exactly one child statement")
return Not(build_statements(d[key][0], scope), description=description)
return ceng.Not(build_statements(d[key][0], scope), description=description)
elif key.endswith(" or more"):
count = int(key[: -len("or more")])
return Some(count, [build_statements(dd, scope) for dd in d[key]], description=description)
return ceng.Some(count, [build_statements(dd, scope) for dd in d[key]], description=description)
elif key == "optional":
# `optional` is an alias for `0 or more`
# which is useful for documenting behaviors,
# like with `write file`, we might say that `WriteFile` is optionally found alongside `CreateFileA`.
return Some(0, [build_statements(dd, scope) for dd in d[key]], description=description)
return ceng.Some(0, [build_statements(dd, scope) for dd in d[key]], description=description)
elif key == "function":
if scope != FILE_SCOPE:
@@ -346,7 +348,7 @@ def build_statements(d, scope):
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE))
return ceng.Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE))
elif key == "basic block":
if scope != FUNCTION_SCOPE:
@@ -355,7 +357,7 @@ def build_statements(d, scope):
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE))
return ceng.Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE))
elif key.startswith("count(") and key.endswith(")"):
# e.g.:
@@ -396,18 +398,18 @@ def build_statements(d, scope):
count = d[key]
if isinstance(count, int):
return Range(feature, min=count, max=count, description=description)
return ceng.Range(feature, min=count, max=count, description=description)
elif count.endswith(" or more"):
min = parse_int(count[: -len(" or more")])
max = None
return Range(feature, min=min, max=max, description=description)
return ceng.Range(feature, min=min, max=max, description=description)
elif count.endswith(" or fewer"):
min = None
max = parse_int(count[: -len(" or fewer")])
return Range(feature, min=min, max=max, description=description)
return ceng.Range(feature, min=min, max=max, description=description)
elif count.startswith("("):
min, max = parse_range(count)
return Range(feature, min=min, max=max, description=description)
return ceng.Range(feature, min=min, max=max, description=description)
else:
raise InvalidRule("unexpected range: %s" % (count))
elif key == "string" and not isinstance(d[key], str):
@@ -462,7 +464,7 @@ class Rule(object):
deps = set([])
def rec(statement):
if isinstance(statement, capa.features.MatchedRule):
if isinstance(statement, capa.features.common.MatchedRule):
# we're not sure at this point if the `statement.value` is
# really a rule name or a namespace name (we use `MatchedRule` for both cases).
# we'll give precedence to namespaces, and then assume if that does work,
@@ -478,7 +480,7 @@ class Rule(object):
# not a namespace, assume its a rule name.
deps.add(statement.value)
elif isinstance(statement, Statement):
elif isinstance(statement, ceng.Statement):
for child in statement.get_children():
rec(child)
@@ -489,7 +491,7 @@ class Rule(object):
return deps
def _extract_subscope_rules_rec(self, statement):
if isinstance(statement, Statement):
if isinstance(statement, ceng.Statement):
# for each child that is a subscope,
for subscope in filter(
lambda statement: isinstance(statement, capa.engine.Subscope), statement.get_children()
@@ -518,7 +520,7 @@ class Rule(object):
)
# update the existing statement to `match` the new rule
new_node = capa.features.MatchedRule(name)
new_node = capa.features.common.MatchedRule(name)
statement.replace_child(subscope, new_node)
# and yield the new rule to our caller
@@ -854,6 +856,37 @@ def index_rules_by_namespace(rules):
return dict(namespaces)
def topologically_order_rules(rules):
"""
order the given rules such that dependencies show up before dependents.
this means that as we match rules, we can add features for the matches, and these
will be matched by subsequent rules if they follow this order.
assumes that the rule dependency graph is a DAG.
"""
# we evaluate `rules` multiple times, so if its a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules}
seen = set([])
ret = []
def rec(rule):
if rule.name in seen:
return
for dep in rule.get_dependencies(namespaces):
rec(rules[dep])
ret.append(rule)
seen.add(rule.name)
for rule in rules.values():
rec(rule)
return ret
class RuleSet(object):
"""
a ruleset is initialized with a collection of rules, which it verifies and sorts into scopes.
@@ -916,7 +949,7 @@ class RuleSet(object):
continue
scope_rules.update(get_rules_and_dependencies(rules, rule.name))
return get_rules_with_scope(capa.engine.topologically_order_rules(scope_rules), scope)
return get_rules_with_scope(topologically_order_rules(scope_rules), scope)
@staticmethod
def _extract_subscope_rules(rules):

View File

@@ -55,6 +55,7 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import json
import logging
@@ -66,7 +67,7 @@ import multiprocessing.pool
import capa
import capa.main
import capa.rules
import capa.render
import capa.render.json
logger = logging.getLogger("capa")
@@ -205,7 +206,7 @@ def main(argv=None):
capabilities = result["ok"]["capabilities"]
# our renderer expects to emit a json document for a single sample
# so we deserialize the json document, store it in a larger dict, and we'll subsequently re-encode.
results[result["path"]] = json.loads(capa.render.render_json(meta, rules, capabilities))
results[result["path"]] = json.loads(capa.render.json.render(meta, rules, capabilities))
else:
raise ValueError("unexpected status: %s" % (result["status"]))

View File

@@ -31,7 +31,6 @@ See the License for the specific language governing permissions and limitations
"""
import re
import pdb
import sys
import string
import logging
@@ -44,8 +43,7 @@ import capa.rules
import capa.engine
import capa.features
import capa.features.insn
from capa.features import ARCH_X32, ARCH_X64, String
from capa.features.insn import Number, Offset
from capa.features.common import ARCH_X32, ARCH_X64, String
logger = logging.getLogger("capa2yara")
@@ -348,9 +346,9 @@ def convert_rule(rule, rulename, cround, depth):
kid = rule.child
kids = [kid]
num_kids = 1
logger.info("kid: " + kids)
logger.info("kid: %s", kids)
except:
logger.info("no kid in rule: " + rule.name)
logger.info("no kid in rule: %s", rule.name)
# just a single statement without 'and' or 'or' before it in this rule
if "kids" not in locals().keys():
@@ -366,7 +364,7 @@ def convert_rule(rule, rulename, cround, depth):
else:
x = 0
logger.info("doing kids: " + repr(kids) + " - len: " + str(num_kids))
logger.info("doing kids: %r - len: %s", kids, num_kids)
for kid in kids:
s_type = kid.name
logger.info("doing type: " + s_type + " kidnum: " + str(x))

View File

@@ -6,11 +6,12 @@ import collections
import capa.main
import capa.rules
import capa.engine
import capa.render
import capa.features
import capa.render.json
import capa.render.utils as rutils
import capa.render.default
import capa.render.result_document
from capa.engine import *
from capa.render import convert_capabilities_to_result_document
# edit this to set the path for file to analyze and rule directory
RULES_PATH = "/tmp/capa/rules/"
@@ -202,14 +203,14 @@ def capa_details(file_path, output_format="dictionary"):
capa_output = False
if output_format == "dictionary":
# ...as python dictionary, simplified as textable but in dictionary
doc = convert_capabilities_to_result_document(meta, rules, capabilities)
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
capa_output = render_dictionary(doc)
elif output_format == "json":
# render results
# ...as json
capa_output = json.loads(capa.render.render_json(meta, rules, capabilities))
capa_output = json.loads(capa.render.json.render(meta, rules, capabilities))
elif output_format == "texttable":
# ...as human readable text table
capa_output = capa.render.render_default(meta, rules, capabilities)
capa_output = capa.render.default.render(meta, rules, capabilities)
return capa_output

View File

@@ -31,8 +31,8 @@ import ruamel.yaml
import capa.main
import capa.rules
import capa.engine
import capa.features
import capa.features.insn
import capa.features.common
logger = logging.getLogger("lint")
@@ -326,7 +326,7 @@ class FeatureStringTooShort(Lint):
def check_features(self, ctx, features):
for feature in features:
if isinstance(feature, capa.features.String):
if isinstance(feature, capa.features.common.String):
if len(feature.value) < 4:
self.recommendation = self.recommendation.format(feature.value)
return True

View File

@@ -59,10 +59,10 @@ import colorama
import capa.main
import capa.rules
import capa.engine
import capa.render
import capa.features
import capa.render.utils as rutils
import capa.features.freeze
import capa.render.result_document
from capa.helpers import get_file_taste
logger = logging.getLogger("capa.show-capabilities-by-function")
@@ -191,7 +191,7 @@ def main(argv=None):
# - when not an interactive session, and disable coloring
# renderers should use coloring and assume it will be stripped out if necessary.
colorama.init()
doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
print(render_matches_by_function(doc))
colorama.deinit()

View File

@@ -153,10 +153,14 @@ def main(argv=None):
def ida_main():
import idc
import capa.features.extractors.ida.extractor
function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
print("getting features for current function 0x%X" % function)
extractor = capa.features.extractors.ida.IdaFeatureExtractor()
extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
if not function:
for feature, va in extractor.extract_file_features():

View File

@@ -19,8 +19,9 @@ import pytest
import capa.main
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
from capa.features import ARCH_X32, ARCH_X64
from capa.features.common import ARCH_X32, ARCH_X64
CD = os.path.dirname(__file__)
@@ -66,7 +67,8 @@ def xfail(condition, reason=None):
# need to limit cache size so GitHub Actions doesn't run out of memory, see #545
@lru_cache(maxsize=1)
def get_viv_extractor(path):
import capa.features.extractors.viv
import capa.main
import capa.features.extractors.viv.extractor
sigpaths = [
os.path.join(CD, "..", "sigs", "test_aulldiv.pat"),
@@ -82,7 +84,7 @@ def get_viv_extractor(path):
vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths)
else:
vw = capa.main.get_workspace(path, "auto", sigpaths=sigpaths)
extractor = capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
fixup_viv(path, extractor)
return extractor
@@ -101,14 +103,14 @@ def get_smda_extractor(path):
from smda.SmdaConfig import SmdaConfig
from smda.Disassembler import Disassembler
import capa.features.extractors.smda
import capa.features.extractors.smda.extractor
config = SmdaConfig()
config.STORE_BUFFER = True
disasm = Disassembler(config)
report = disasm.disassembleFile(path)
return capa.features.extractors.smda.SmdaFeatureExtractor(report, path)
return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(report, path)
@lru_cache(maxsize=1)
@@ -268,34 +270,34 @@ def get_basic_block(extractor, f, va):
def resolve_scope(scope):
if scope == "file":
def inner(extractor):
def inner_file(extractor):
return extract_file_features(extractor)
inner.__name__ = scope
return inner
inner_file.__name__ = scope
return inner_file
elif "bb=" in scope:
# like `function=0x401000,bb=0x40100A`
fspec, _, bbspec = scope.partition(",")
fva = int(fspec.partition("=")[2], 0x10)
bbva = int(bbspec.partition("=")[2], 0x10)
def inner(extractor):
def inner_bb(extractor):
f = get_function(extractor, fva)
bb = get_basic_block(extractor, f, bbva)
return extract_basic_block_features(extractor, f, bb)
inner.__name__ = scope
return inner
inner_bb.__name__ = scope
return inner_bb
elif scope.startswith("function"):
# like `function=0x401000`
va = int(scope.partition("=")[2], 0x10)
def inner(extractor):
def inner_function(extractor):
f = get_function(extractor, va)
return extract_function_features(extractor, f)
inner.__name__ = scope
return inner
inner_function.__name__ = scope
return inner_function
else:
raise ValueError("unexpected scope fixture")
@@ -324,12 +326,12 @@ def parametrize(params, values, **kwargs):
FEATURE_PRESENCE_TESTS = sorted(
[
# file/characteristic("embedded pe")
("pma12-04", "file", capa.features.Characteristic("embedded pe"), True),
("pma12-04", "file", capa.features.common.Characteristic("embedded pe"), True),
# file/string
("mimikatz", "file", capa.features.String("SCardControl"), True),
("mimikatz", "file", capa.features.String("SCardTransmit"), True),
("mimikatz", "file", capa.features.String("ACR > "), True),
("mimikatz", "file", capa.features.String("nope"), False),
("mimikatz", "file", capa.features.common.String("SCardControl"), True),
("mimikatz", "file", capa.features.common.String("SCardTransmit"), True),
("mimikatz", "file", capa.features.common.String("ACR > "), True),
("mimikatz", "file", capa.features.common.String("nope"), False),
# file/sections
("mimikatz", "file", capa.features.file.Section(".text"), True),
("mimikatz", "file", capa.features.file.Section(".nope"), False),
@@ -353,17 +355,17 @@ FEATURE_PRESENCE_TESTS = sorted(
("mimikatz", "file", capa.features.file.Import("CryptAcquireContextW"), True),
("mimikatz", "file", capa.features.file.Import("CryptAcquireContext"), True),
# function/characteristic(loop)
("mimikatz", "function=0x401517", capa.features.Characteristic("loop"), True),
("mimikatz", "function=0x401000", capa.features.Characteristic("loop"), False),
("mimikatz", "function=0x401517", capa.features.common.Characteristic("loop"), True),
("mimikatz", "function=0x401000", capa.features.common.Characteristic("loop"), False),
# bb/characteristic(tight loop)
("mimikatz", "function=0x402EC4", capa.features.Characteristic("tight loop"), True),
("mimikatz", "function=0x401000", capa.features.Characteristic("tight loop"), False),
("mimikatz", "function=0x402EC4", capa.features.common.Characteristic("tight loop"), True),
("mimikatz", "function=0x401000", capa.features.common.Characteristic("tight loop"), False),
# bb/characteristic(stack string)
("mimikatz", "function=0x4556E5", capa.features.Characteristic("stack string"), True),
("mimikatz", "function=0x401000", capa.features.Characteristic("stack string"), False),
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("stack string"), True),
("mimikatz", "function=0x401000", capa.features.common.Characteristic("stack string"), False),
# bb/characteristic(tight loop)
("mimikatz", "function=0x402EC4,bb=0x402F8E", capa.features.Characteristic("tight loop"), True),
("mimikatz", "function=0x401000,bb=0x401000", capa.features.Characteristic("tight loop"), False),
("mimikatz", "function=0x402EC4,bb=0x402F8E", capa.features.common.Characteristic("tight loop"), True),
("mimikatz", "function=0x401000,bb=0x401000", capa.features.common.Characteristic("tight loop"), False),
# insn/mnemonic
("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("push"), True),
("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("movzx"), True),
@@ -440,60 +442,60 @@ FEATURE_PRESENCE_TESTS = sorted(
("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CloseHandle"), True),
("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.WriteFile"), True),
# insn/string
("mimikatz", "function=0x40105D", capa.features.String("SCardControl"), True),
("mimikatz", "function=0x40105D", capa.features.String("SCardTransmit"), True),
("mimikatz", "function=0x40105D", capa.features.String("ACR > "), True),
("mimikatz", "function=0x40105D", capa.features.String("nope"), False),
("773290...", "function=0x140001140", capa.features.String(r"%s:\\OfficePackagesForWDAG"), True),
("mimikatz", "function=0x40105D", capa.features.common.String("SCardControl"), True),
("mimikatz", "function=0x40105D", capa.features.common.String("SCardTransmit"), True),
("mimikatz", "function=0x40105D", capa.features.common.String("ACR > "), True),
("mimikatz", "function=0x40105D", capa.features.common.String("nope"), False),
("773290...", "function=0x140001140", capa.features.common.String(r"%s:\\OfficePackagesForWDAG"), True),
# insn/regex, issue #262
("pma16-01", "function=0x4021B0", capa.features.Regex("HTTP/1.0"), True),
("pma16-01", "function=0x4021B0", capa.features.Regex("www.practicalmalwareanalysis.com"), False),
("pma16-01", "function=0x4021B0", capa.features.common.Regex("HTTP/1.0"), True),
("pma16-01", "function=0x4021B0", capa.features.common.Regex("www.practicalmalwareanalysis.com"), False),
# insn/string, pointer to string
("mimikatz", "function=0x44EDEF", capa.features.String("INPUTEVENT"), True),
("mimikatz", "function=0x44EDEF", capa.features.common.String("INPUTEVENT"), True),
# insn/string, direct memory reference
("mimikatz", "function=0x46D6CE", capa.features.String("(null)"), True),
("mimikatz", "function=0x46D6CE", capa.features.common.String("(null)"), True),
# insn/bytes
("mimikatz", "function=0x40105D", capa.features.Bytes("SCardControl".encode("utf-16le")), True),
("mimikatz", "function=0x40105D", capa.features.Bytes("SCardTransmit".encode("utf-16le")), True),
("mimikatz", "function=0x40105D", capa.features.Bytes("ACR > ".encode("utf-16le")), True),
("mimikatz", "function=0x40105D", capa.features.Bytes("nope".encode("ascii")), False),
("mimikatz", "function=0x40105D", capa.features.common.Bytes("SCardControl".encode("utf-16le")), True),
("mimikatz", "function=0x40105D", capa.features.common.Bytes("SCardTransmit".encode("utf-16le")), True),
("mimikatz", "function=0x40105D", capa.features.common.Bytes("ACR > ".encode("utf-16le")), True),
("mimikatz", "function=0x40105D", capa.features.common.Bytes("nope".encode("ascii")), False),
# IDA features included byte sequences read from invalid memory, fixed in #409
("mimikatz", "function=0x44570F", capa.features.Bytes(binascii.unhexlify("FF" * 256)), False),
("mimikatz", "function=0x44570F", capa.features.common.Bytes(binascii.unhexlify("FF" * 256)), False),
# insn/bytes, pointer to bytes
("mimikatz", "function=0x44EDEF", capa.features.Bytes("INPUTEVENT".encode("utf-16le")), True),
("mimikatz", "function=0x44EDEF", capa.features.common.Bytes("INPUTEVENT".encode("utf-16le")), True),
# insn/characteristic(nzxor)
("mimikatz", "function=0x410DFC", capa.features.Characteristic("nzxor"), True),
("mimikatz", "function=0x40105D", capa.features.Characteristic("nzxor"), False),
("mimikatz", "function=0x410DFC", capa.features.common.Characteristic("nzxor"), True),
("mimikatz", "function=0x40105D", capa.features.common.Characteristic("nzxor"), False),
# insn/characteristic(nzxor): no security cookies
("mimikatz", "function=0x46D534", capa.features.Characteristic("nzxor"), False),
("mimikatz", "function=0x46D534", capa.features.common.Characteristic("nzxor"), False),
# insn/characteristic(nzxor): xorps
# viv needs fixup to recognize function, see above
("3b13b...", "function=0x10006860", capa.features.Characteristic("nzxor"), True),
("3b13b...", "function=0x10006860", capa.features.common.Characteristic("nzxor"), True),
# insn/characteristic(peb access)
("kernel32-64", "function=0x1800017D0", capa.features.Characteristic("peb access"), True),
("mimikatz", "function=0x4556E5", capa.features.Characteristic("peb access"), False),
("kernel32-64", "function=0x1800017D0", capa.features.common.Characteristic("peb access"), True),
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("peb access"), False),
# insn/characteristic(gs access)
("kernel32-64", "function=0x180001068", capa.features.Characteristic("gs access"), True),
("mimikatz", "function=0x4556E5", capa.features.Characteristic("gs access"), False),
("kernel32-64", "function=0x180001068", capa.features.common.Characteristic("gs access"), True),
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("gs access"), False),
# insn/characteristic(cross section flow)
("a1982...", "function=0x4014D0", capa.features.Characteristic("cross section flow"), True),
("a1982...", "function=0x4014D0", capa.features.common.Characteristic("cross section flow"), True),
# insn/characteristic(cross section flow): imports don't count
("kernel32-64", "function=0x180001068", capa.features.Characteristic("cross section flow"), False),
("mimikatz", "function=0x4556E5", capa.features.Characteristic("cross section flow"), False),
("kernel32-64", "function=0x180001068", capa.features.common.Characteristic("cross section flow"), False),
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("cross section flow"), False),
# insn/characteristic(recursive call)
("mimikatz", "function=0x40640e", capa.features.Characteristic("recursive call"), True),
("mimikatz", "function=0x40640e", capa.features.common.Characteristic("recursive call"), True),
# before this we used ambiguous (0x4556E5, False), which has a data reference / indirect recursive call, see #386
("mimikatz", "function=0x4175FF", capa.features.Characteristic("recursive call"), False),
("mimikatz", "function=0x4175FF", capa.features.common.Characteristic("recursive call"), False),
# insn/characteristic(indirect call)
("mimikatz", "function=0x4175FF", capa.features.Characteristic("indirect call"), True),
("mimikatz", "function=0x4556E5", capa.features.Characteristic("indirect call"), False),
("mimikatz", "function=0x4175FF", capa.features.common.Characteristic("indirect call"), True),
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("indirect call"), False),
# insn/characteristic(calls from)
("mimikatz", "function=0x4556E5", capa.features.Characteristic("calls from"), True),
("mimikatz", "function=0x4702FD", capa.features.Characteristic("calls from"), False),
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("calls from"), True),
("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), False),
# function/characteristic(calls to)
("mimikatz", "function=0x40105D", capa.features.Characteristic("calls to"), True),
("mimikatz", "function=0x40105D", capa.features.common.Characteristic("calls to"), True),
# before this we used ambiguous (0x4556E5, False), which has a data reference / indirect recursive call, see #386
("mimikatz", "function=0x456BB9", capa.features.Characteristic("calls to"), False),
("mimikatz", "function=0x456BB9", capa.features.common.Characteristic("calls to"), False),
# file/function-name
("pma16-01", "file", capa.features.file.FunctionName("__aulldiv"), True),
],
@@ -510,10 +512,10 @@ FEATURE_PRESENCE_TESTS_IDA = [
FEATURE_COUNT_TESTS = [
("mimikatz", "function=0x40E5C2", capa.features.basicblock.BasicBlock(), 7),
("mimikatz", "function=0x4702FD", capa.features.Characteristic("calls from"), 0),
("mimikatz", "function=0x40E5C2", capa.features.Characteristic("calls from"), 3),
("mimikatz", "function=0x4556E5", capa.features.Characteristic("calls to"), 0),
("mimikatz", "function=0x40B1F1", capa.features.Characteristic("calls to"), 3),
("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), 0),
("mimikatz", "function=0x40E5C2", capa.features.common.Characteristic("calls from"), 3),
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("calls to"), 0),
("mimikatz", "function=0x40B1F1", capa.features.common.Characteristic("calls to"), 3),
]

View File

@@ -10,6 +10,8 @@ import textwrap
import capa.rules
import capa.engine
import capa.features.insn
import capa.features.common
from capa.engine import *
from capa.features import *
from capa.features.insn import *
@@ -233,7 +235,7 @@ def test_match_adds_matched_rule_feature():
)
r = capa.rules.Rule.from_yaml(rule)
features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1}}, 0x0)
assert capa.features.MatchedRule("test rule") in features
assert capa.features.common.MatchedRule("test rule") in features
def test_match_matched_rules():
@@ -264,22 +266,22 @@ def test_match_matched_rules():
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
capa.rules.topologically_order_rules(rules),
{capa.features.insn.Number(100): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule1") in features
assert capa.features.MatchedRule("test rule2") in features
assert capa.features.common.MatchedRule("test rule1") in features
assert capa.features.common.MatchedRule("test rule2") in features
# the ordering of the rules must not matter,
# the engine should match rules in an appropriate order.
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(reversed(rules)),
capa.rules.topologically_order_rules(reversed(rules)),
{capa.features.insn.Number(100): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule1") in features
assert capa.features.MatchedRule("test rule2") in features
assert capa.features.common.MatchedRule("test rule1") in features
assert capa.features.common.MatchedRule("test rule2") in features
def test_regex():
@@ -322,34 +324,34 @@ def test_regex():
),
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
capa.rules.topologically_order_rules(rules),
{capa.features.insn.Number(100): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") not in features
assert capa.features.common.MatchedRule("test rule") not in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
{capa.features.String("aaaa"): {1}},
capa.rules.topologically_order_rules(rules),
{capa.features.common.String("aaaa"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") not in features
assert capa.features.common.MatchedRule("test rule") not in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
{capa.features.String("aBBBBa"): {1}},
capa.rules.topologically_order_rules(rules),
{capa.features.common.String("aBBBBa"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") not in features
assert capa.features.common.MatchedRule("test rule") not in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
{capa.features.String("abbbba"): {1}},
capa.rules.topologically_order_rules(rules),
{capa.features.common.String("abbbba"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features
assert capa.features.MatchedRule("rule with implied wildcards") in features
assert capa.features.MatchedRule("rule with anchor") not in features
assert capa.features.common.MatchedRule("test rule") in features
assert capa.features.common.MatchedRule("rule with implied wildcards") in features
assert capa.features.common.MatchedRule("rule with anchor") not in features
def test_regex_ignorecase():
@@ -368,11 +370,11 @@ def test_regex_ignorecase():
),
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
{capa.features.String("aBBBBa"): {1}},
capa.rules.topologically_order_rules(rules),
{capa.features.common.String("aBBBBa"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features
assert capa.features.common.MatchedRule("test rule") in features
def test_regex_complex():
@@ -391,11 +393,11 @@ def test_regex_complex():
),
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
{capa.features.String(r"Hardware\Key\key with spaces\some value"): {1}},
capa.rules.topologically_order_rules(rules),
{capa.features.common.String(r"Hardware\Key\key with spaces\some value"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features
assert capa.features.common.MatchedRule("test rule") in features
def test_match_namespace():
@@ -449,19 +451,19 @@ def test_match_namespace():
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
capa.rules.topologically_order_rules(rules),
{capa.features.insn.API("CreateFile"): {1}},
0x0,
)
assert "CreateFile API" in matches
assert "file-create" in matches
assert "filesystem-any" in matches
assert capa.features.MatchedRule("file") in features
assert capa.features.MatchedRule("file/create") in features
assert capa.features.MatchedRule("file/create/CreateFile") in features
assert capa.features.common.MatchedRule("file") in features
assert capa.features.common.MatchedRule("file/create") in features
assert capa.features.common.MatchedRule("file/create/CreateFile") in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
capa.rules.topologically_order_rules(rules),
{capa.features.insn.API("WriteFile"): {1}},
0x0,
)
@@ -472,11 +474,11 @@ def test_match_namespace():
def test_render_number():
assert str(capa.features.insn.Number(1)) == "number(0x1)"
assert str(capa.features.insn.Number(1, arch=ARCH_X32)) == "number/x32(0x1)"
assert str(capa.features.insn.Number(1, arch=ARCH_X64)) == "number/x64(0x1)"
assert str(capa.features.insn.Number(1, arch=capa.features.common.ARCH_X32)) == "number/x32(0x1)"
assert str(capa.features.insn.Number(1, arch=capa.features.common.ARCH_X64)) == "number/x64(0x1)"
def test_render_offset():
assert str(capa.features.insn.Offset(1)) == "offset(0x1)"
assert str(capa.features.insn.Offset(1, arch=ARCH_X32)) == "offset/x32(0x1)"
assert str(capa.features.insn.Offset(1, arch=ARCH_X64)) == "offset/x64(0x1)"
assert str(capa.features.insn.Offset(1, arch=capa.features.common.ARCH_X32)) == "offset/x32(0x1)"
assert str(capa.features.insn.Offset(1, arch=capa.features.common.ARCH_X64)) == "offset/x64(0x1)"

View File

@@ -7,37 +7,39 @@
# See the License for the specific language governing permissions and limitations under the License.
import textwrap
import pytest
from fixtures import *
import capa.main
import capa.rules
import capa.helpers
import capa.features
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.freeze
import capa.features.extractors
import capa.features.basicblock
import capa.features.extractors.base_extractor
EXTRACTOR = capa.features.extractors.NullFeatureExtractor(
EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor(
{
"base address": 0x401000,
"file features": [
(0x402345, capa.features.Characteristic("embedded pe")),
(0x402345, capa.features.common.Characteristic("embedded pe")),
],
"functions": {
0x401000: {
"features": [
(0x401000, capa.features.Characteristic("indirect call")),
(0x401000, capa.features.common.Characteristic("indirect call")),
],
"basic blocks": {
0x401000: {
"features": [
(0x401000, capa.features.Characteristic("tight loop")),
(0x401000, capa.features.common.Characteristic("tight loop")),
],
"instructions": {
0x401000: {
"features": [
(0x401000, capa.features.insn.Mnemonic("xor")),
(0x401000, capa.features.Characteristic("nzxor")),
(0x401000, capa.features.common.Characteristic("nzxor")),
],
},
0x401002: {
@@ -111,7 +113,7 @@ def compare_extractors_viv_null(viv_ext, null_ext):
and NullFeatureExtractor returns ints
args:
viv_ext (capa.features.extractors.viv.VivisectFeatureExtractor)
viv_ext (capa.features.extractors.viv.extractor.VivisectFeatureExtractor)
null_ext (capa.features.extractors.NullFeatureExtractor)
"""
assert list(viv_ext.extract_file_features()) == list(null_ext.extract_file_features())
@@ -154,12 +156,12 @@ def roundtrip_feature(feature):
def test_serialize_features():
roundtrip_feature(capa.features.insn.API("advapi32.CryptAcquireContextW"))
roundtrip_feature(capa.features.String("SCardControl"))
roundtrip_feature(capa.features.common.String("SCardControl"))
roundtrip_feature(capa.features.insn.Number(0xFF))
roundtrip_feature(capa.features.insn.Offset(0x0))
roundtrip_feature(capa.features.insn.Mnemonic("push"))
roundtrip_feature(capa.features.file.Section(".rsrc"))
roundtrip_feature(capa.features.Characteristic("tight loop"))
roundtrip_feature(capa.features.common.Characteristic("tight loop"))
roundtrip_feature(capa.features.basicblock.BasicBlock())
roundtrip_feature(capa.features.file.Export("BaseThreadInitThunk"))
roundtrip_feature(capa.features.file.Import("kernel32.IsWow64Process"))

View File

@@ -13,9 +13,9 @@ from capa.features.extractors import helpers
def test_all_zeros():
a = b"\x00\x00\x00\x00"
b = codecs.decode("00000000", "hex")
b = codecs.decode(b"00000000", "hex")
c = b"\x01\x00\x00\x00"
d = codecs.decode("01000000", "hex")
d = codecs.decode(b"01000000", "hex")
assert helpers.all_zeros(a) is True
assert helpers.all_zeros(b) is True
assert helpers.all_zeros(c) is False

View File

@@ -9,6 +9,7 @@ import pytest
try:
sys.path.append(os.path.dirname(__file__))
import fixtures
from fixtures import *
finally:
sys.path.pop()
@@ -37,27 +38,27 @@ def get_ida_extractor(_path):
check_input_file("5f66b82558ca92e54e77f216ef4c066c")
# have to import import this inline so pytest doesn't bail outside of IDA
import capa.features.extractors.ida
import capa.features.extractors.ida.extractor
return capa.features.extractors.ida.IdaFeatureExtractor()
return capa.features.extractors.ida.extractor.IdaFeatureExtractor()
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_features():
for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS + FEATURE_PRESENCE_TESTS_IDA:
id = make_test_id((sample, scope, feature, expected))
for (sample, scope, feature, expected) in fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_PRESENCE_TESTS_IDA:
id = fixtures.make_test_id((sample, scope, feature, expected))
try:
check_input_file(get_sample_md5_by_name(sample))
check_input_file(fixtures.get_sample_md5_by_name(sample))
except RuntimeError:
print("SKIP %s" % (id))
continue
scope = resolve_scope(scope)
sample = resolve_sample(sample)
scope = fixtures.resolve_scope(scope)
sample = fixtures.resolve_sample(sample)
try:
do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
fixtures.do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
except Exception as e:
print("FAIL %s" % (id))
traceback.print_exc()
@@ -67,20 +68,20 @@ def test_ida_features():
@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_feature_counts():
for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
id = make_test_id((sample, scope, feature, expected))
for (sample, scope, feature, expected) in fixtures.FEATURE_COUNT_TESTS:
id = fixtures.make_test_id((sample, scope, feature, expected))
try:
check_input_file(get_sample_md5_by_name(sample))
check_input_file(fixtures.get_sample_md5_by_name(sample))
except RuntimeError:
print("SKIP %s" % (id))
continue
scope = resolve_scope(scope)
sample = resolve_sample(sample)
scope = fixtures.resolve_scope(scope)
sample = fixtures.resolve_sample(sample)
try:
do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
fixtures.do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
except Exception as e:
print("FAIL %s" % (id))
traceback.print_exc()

View File

@@ -9,6 +9,7 @@
import json
import textwrap
import fixtures
from fixtures import *
import capa.main
@@ -362,7 +363,7 @@ def test_not_render_rules_also_matched(z9324d_extractor, capsys):
def test_backend_option(capsys):
# tests that main works with different backends
path = get_data_path_by_name("pma16-01")
path = fixtures.get_data_path_by_name("pma16-01")
assert capa.main.main([path, "-j", "-b", capa.main.BACKEND_VIV]) == 0
std = capsys.readouterr()
std_json = json.loads(std.out)

View File

@@ -5,9 +5,8 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import pytest
import fixtures
from fixtures import *
from fixtures import parametrize
@@ -16,7 +15,7 @@ import capa.features.file
@parametrize(
"sample,scope,feature,expected",
FEATURE_PRESENCE_TESTS,
fixtures.FEATURE_PRESENCE_TESTS,
indirect=["sample", "scope"],
)
def test_pefile_features(sample, scope, feature, expected):
@@ -26,4 +25,4 @@ def test_pefile_features(sample, scope, feature, expected):
if isinstance(feature, capa.features.file.FunctionName):
pytest.xfail("pefile only doesn't extract function names")
do_test_feature_presence(get_pefile_extractor, sample, scope, feature, expected)
fixtures.do_test_feature_presence(fixtures.get_pefile_extractor, sample, scope, feature, expected)

View File

@@ -1,8 +1,8 @@
import textwrap
import capa.rules
from capa.render import convert_meta_to_result_document
from capa.render.utils import format_parts_id
import capa.render.utils
import capa.render.result_document
def test_render_meta_attack():
@@ -27,7 +27,7 @@ def test_render_meta_attack():
)
)
r = capa.rules.Rule.from_yaml(rule)
rule_meta = convert_meta_to_result_document(r.meta)
rule_meta = capa.render.result_document.convert_meta_to_result_document(r.meta)
attack = rule_meta["att&ck"][0]
assert attack["id"] == id
@@ -35,7 +35,7 @@ def test_render_meta_attack():
assert attack["technique"] == technique
assert attack["subtechnique"] == subtechnique
assert format_parts_id(attack) == canonical
assert capa.render.utils.format_parts_id(attack) == canonical
def test_render_meta_mbc():
@@ -60,7 +60,7 @@ def test_render_meta_mbc():
)
)
r = capa.rules.Rule.from_yaml(rule)
rule_meta = convert_meta_to_result_document(r.meta)
rule_meta = capa.render.result_document.convert_meta_to_result_document(r.meta)
attack = rule_meta["mbc"][0]
assert attack["id"] == id
@@ -68,4 +68,4 @@ def test_render_meta_mbc():
assert attack["behavior"] == behavior
assert attack["method"] == method
assert format_parts_id(attack) == canonical
assert capa.render.utils.format_parts_id(attack) == canonical

View File

@@ -12,10 +12,10 @@ import pytest
import capa.rules
import capa.engine
import capa.features
from capa.features import ARCH_X32, ARCH_X64, String
import capa.features.common
from capa.features.file import FunctionName
from capa.features.insn import Number, Offset
from capa.features.common import ARCH_X32, ARCH_X64, String
def test_rule_ctor():
@@ -751,18 +751,18 @@ def test_regex_values_always_string():
),
]
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
{capa.features.String("123"): {1}},
capa.rules.topologically_order_rules(rules),
{capa.features.common.String("123"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features
assert capa.features.common.MatchedRule("test rule") in features
features, matches = capa.engine.match(
capa.engine.topologically_order_rules(rules),
{capa.features.String("0x123"): {1}},
capa.rules.topologically_order_rules(rules),
{capa.features.common.String("0x123"): {1}},
0x0,
)
assert capa.features.MatchedRule("test rule") in features
assert capa.features.common.MatchedRule("test rule") in features
def test_filter_rules():

View File

@@ -5,9 +5,8 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import pytest
import fixtures
from fixtures import *
from fixtures import parametrize
@@ -16,20 +15,20 @@ import capa.features.file
@parametrize(
"sample,scope,feature,expected",
FEATURE_PRESENCE_TESTS,
fixtures.FEATURE_PRESENCE_TESTS,
indirect=["sample", "scope"],
)
def test_smda_features(sample, scope, feature, expected):
if scope.__name__ == "file" and isinstance(feature, capa.features.file.FunctionName) and expected is True:
pytest.xfail("SMDA has no function ID")
do_test_feature_presence(get_smda_extractor, sample, scope, feature, expected)
fixtures.do_test_feature_presence(fixtures.get_smda_extractor, sample, scope, feature, expected)
@parametrize(
"sample,scope,feature,expected",
FEATURE_COUNT_TESTS,
fixtures.FEATURE_COUNT_TESTS,
indirect=["sample", "scope"],
)
def test_smda_feature_counts(sample, scope, feature, expected):
do_test_feature_count(get_smda_extractor, sample, scope, feature, expected)
fixtures.do_test_feature_count(fixtures.get_smda_extractor, sample, scope, feature, expected)

View File

@@ -5,24 +5,23 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import fixtures
from fixtures import *
@parametrize(
@fixtures.parametrize(
"sample,scope,feature,expected",
FEATURE_PRESENCE_TESTS,
fixtures.FEATURE_PRESENCE_TESTS,
indirect=["sample", "scope"],
)
def test_viv_features(sample, scope, feature, expected):
do_test_feature_presence(get_viv_extractor, sample, scope, feature, expected)
fixtures.do_test_feature_presence(fixtures.get_viv_extractor, sample, scope, feature, expected)
@parametrize(
@fixtures.parametrize(
"sample,scope,feature,expected",
FEATURE_COUNT_TESTS,
fixtures.FEATURE_COUNT_TESTS,
indirect=["sample", "scope"],
)
def test_viv_feature_counts(sample, scope, feature, expected):
do_test_feature_count(get_viv_extractor, sample, scope, feature, expected)
fixtures.do_test_feature_count(fixtures.get_viv_extractor, sample, scope, feature, expected)