mirror of
https://github.com/mandiant/capa.git
synced 2025-12-15 00:50:47 -08:00
Compare commits
2 Commits
add-codema
...
add-cfg-in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
06fb21728c | ||
|
|
f46b5a4c02 |
@@ -4,6 +4,8 @@
|
|||||||
|
|
||||||
### New Features
|
### New Features
|
||||||
|
|
||||||
|
- meta: add function `calls` information, add `entry_points`, add `thunk_functions` #874 @mr-tz
|
||||||
|
|
||||||
### Breaking Changes
|
### Breaking Changes
|
||||||
|
|
||||||
### New Rules (0)
|
### New Rules (0)
|
||||||
|
|||||||
@@ -7,7 +7,7 @@
|
|||||||
# See the License for the specific language governing permissions and limitations under the License.
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
from typing import Tuple, Iterator, SupportsInt
|
from typing import List, Tuple, Iterator, SupportsInt
|
||||||
|
|
||||||
from capa.features.common import Feature
|
from capa.features.common import Feature
|
||||||
|
|
||||||
@@ -59,6 +59,13 @@ class FeatureExtractor:
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def get_entry_points(self) -> List[int]:
|
||||||
|
"""
|
||||||
|
get the program's entry points, e.g. AddressOfEntryPoint and exported functions
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def extract_global_features(self) -> Iterator[Tuple[Feature, int]]:
|
def extract_global_features(self) -> Iterator[Tuple[Feature, int]]:
|
||||||
"""
|
"""
|
||||||
@@ -99,6 +106,12 @@ class FeatureExtractor:
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def is_thunk_function(self, va: int) -> bool:
|
||||||
|
"""
|
||||||
|
is the given address a thunk function?
|
||||||
|
"""
|
||||||
|
return False
|
||||||
|
|
||||||
def is_library_function(self, va: int) -> bool:
|
def is_library_function(self, va: int) -> bool:
|
||||||
"""
|
"""
|
||||||
is the given address a library function?
|
is the given address a library function?
|
||||||
@@ -134,6 +147,13 @@ class FeatureExtractor:
|
|||||||
"""
|
"""
|
||||||
raise KeyError(va)
|
raise KeyError(va)
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def get_calls_from(self, va: int) -> List[int]:
|
||||||
|
"""
|
||||||
|
return a function's call targets
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def extract_function_features(self, f: FunctionHandle) -> Iterator[Tuple[Feature, int]]:
|
def extract_function_features(self, f: FunctionHandle) -> Iterator[Tuple[Feature, int]]:
|
||||||
"""
|
"""
|
||||||
@@ -273,6 +293,9 @@ class NullFeatureExtractor(FeatureExtractor):
|
|||||||
def get_base_address(self):
|
def get_base_address(self):
|
||||||
return self.features["base address"]
|
return self.features["base address"]
|
||||||
|
|
||||||
|
def get_entry_points(self) -> List[int]:
|
||||||
|
return self.features["entry points"]
|
||||||
|
|
||||||
def extract_global_features(self):
|
def extract_global_features(self):
|
||||||
for p in self.features.get("global features", []):
|
for p in self.features.get("global features", []):
|
||||||
va, feature = p
|
va, feature = p
|
||||||
@@ -287,6 +310,19 @@ class NullFeatureExtractor(FeatureExtractor):
|
|||||||
for va in sorted(self.features["functions"].keys()):
|
for va in sorted(self.features["functions"].keys()):
|
||||||
yield va
|
yield va
|
||||||
|
|
||||||
|
def is_library_function(self, va: int) -> bool:
|
||||||
|
return va in self.features["library functions"]
|
||||||
|
|
||||||
|
def get_function_name(self, va: int) -> str:
    """
    fetch the recorded name of the function at the given address.

    names are looked up among the library functions first and then among the thunk functions,
    because thunk names are stored under their own key.

    args:
      va: the function's virtual address.

    returns:
      str: the recorded function name.

    raises:
      KeyError: when no name is recorded for the given address.
    """
    for key in ("library functions", "thunk functions"):
        names = self.features.get(key, {})
        if va in names:
            return names[va]

    # signal a missing name with KeyError rather than silently handing back None.
    raise KeyError(va)
|
||||||
|
|
||||||
|
def is_thunk_function(self, va: int) -> bool:
|
||||||
|
return va in self.features["thunk functions"]
|
||||||
|
|
||||||
|
def get_calls_from(self, va: int) -> List[int]:
    """
    return a function's call targets.

    args:
      va: the function's virtual address.

    returns:
      List[int]: always empty for now, see TODO below.
    """
    # TODO: no call information is available to this extractor yet.
    # return an empty list instead of falling through to None,
    # so that the declared List[int] contract holds for callers.
    return []
|
||||||
|
|
||||||
def extract_function_features(self, f):
|
def extract_function_features(self, f):
|
||||||
for p in self.features.get("functions", {}).get(f, {}).get("features", []): # noqa: E127 line over-indented
|
for p in self.features.get("functions", {}).get(f, {}).get("features", []): # noqa: E127 line over-indented
|
||||||
va, feature = p
|
va, feature = p
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and limitations under the License.
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
import idaapi
|
import idaapi
|
||||||
|
import idautils
|
||||||
|
|
||||||
import capa.ida.helpers
|
import capa.ida.helpers
|
||||||
import capa.features.extractors.elf
|
import capa.features.extractors.elf
|
||||||
@@ -66,6 +67,10 @@ class IdaFeatureExtractor(FeatureExtractor):
|
|||||||
def get_base_address(self):
|
def get_base_address(self):
|
||||||
return idaapi.get_imagebase()
|
return idaapi.get_imagebase()
|
||||||
|
|
||||||
|
def get_entry_points(self):
|
||||||
|
# returns list of tuples (index, ordinal, ea, name)
|
||||||
|
return [e[2] for e in idautils.Entries()]
|
||||||
|
|
||||||
def extract_global_features(self):
|
def extract_global_features(self):
|
||||||
yield from self.global_features
|
yield from self.global_features
|
||||||
|
|
||||||
@@ -102,6 +107,10 @@ class IdaFeatureExtractor(FeatureExtractor):
|
|||||||
def extract_basic_block_features(self, f, bb):
|
def extract_basic_block_features(self, f, bb):
|
||||||
yield from capa.features.extractors.ida.basicblock.extract_features(f, bb)
|
yield from capa.features.extractors.ida.basicblock.extract_features(f, bb)
|
||||||
|
|
||||||
|
def is_thunk_function(self, va):
    """
    is the given address a thunk function?

    args:
      va: the virtual address to query.

    returns:
      bool: True when IDA has flagged the containing function with FUNC_THUNK, False otherwise.
    """
    f = idaapi.get_func(va)
    if f is None:
        # no function is defined at this address, so there is no flag to inspect.
        return False

    # coerce the masked flag to a real bool so callers don't receive a raw integer.
    return bool(f.flags & idaapi.FUNC_THUNK)
|
||||||
|
|
||||||
def get_instructions(self, f, bb):
|
def get_instructions(self, f, bb):
|
||||||
import capa.features.extractors.ida.helpers as ida_helpers
|
import capa.features.extractors.ida.helpers as ida_helpers
|
||||||
|
|
||||||
@@ -110,3 +119,7 @@ class IdaFeatureExtractor(FeatureExtractor):
|
|||||||
|
|
||||||
def extract_insn_features(self, f, bb, insn):
|
def extract_insn_features(self, f, bb, insn):
|
||||||
yield from capa.features.extractors.ida.insn.extract_features(f, bb, insn)
|
yield from capa.features.extractors.ida.insn.extract_features(f, bb, insn)
|
||||||
|
|
||||||
|
def get_calls_from(self, va):
    """
    return a function's call targets.

    args:
      va: the function's virtual address.

    returns:
      list: always empty for now, see TODO below.
    """
    # TODO: collect the call targets via the IDA API.
    # return an empty list instead of falling through to None,
    # so that callers always receive a list.
    return []
|
||||||
|
|||||||
@@ -25,6 +25,10 @@ class SmdaFeatureExtractor(FeatureExtractor):
|
|||||||
def get_base_address(self):
|
def get_base_address(self):
|
||||||
return self.smda_report.base_addr
|
return self.smda_report.base_addr
|
||||||
|
|
||||||
|
def get_entry_points(self):
    """
    get the program's entry points.

    returns:
      list: always empty for now, see TODO below.
    """
    # TODO: collect the entry points from the SMDA report.
    # return an empty list instead of falling through to None,
    # so that callers always receive a list.
    return []
|
||||||
|
|
||||||
def extract_global_features(self):
|
def extract_global_features(self):
|
||||||
yield from self.global_features
|
yield from self.global_features
|
||||||
|
|
||||||
@@ -35,6 +39,10 @@ class SmdaFeatureExtractor(FeatureExtractor):
|
|||||||
for function in self.smda_report.getFunctions():
|
for function in self.smda_report.getFunctions():
|
||||||
yield function
|
yield function
|
||||||
|
|
||||||
|
def get_calls_from(self, va):
    """
    return a function's call targets.

    args:
      va: the function's virtual address.

    returns:
      list: always empty for now, see TODO below.
    """
    # TODO: collect the call targets from the SMDA report.
    # return an empty list instead of falling through to None,
    # so that callers always receive a list.
    return []
|
||||||
|
|
||||||
def extract_function_features(self, f):
|
def extract_function_features(self, f):
|
||||||
yield from capa.features.extractors.smda.function.extract_features(f)
|
yield from capa.features.extractors.smda.function.extract_features(f)
|
||||||
|
|
||||||
|
|||||||
@@ -51,6 +51,9 @@ class VivisectFeatureExtractor(FeatureExtractor):
|
|||||||
# assume there is only one file loaded into the vw
|
# assume there is only one file loaded into the vw
|
||||||
return list(self.vw.filemeta.values())[0]["imagebase"]
|
return list(self.vw.filemeta.values())[0]["imagebase"]
|
||||||
|
|
||||||
|
def get_entry_points(self):
|
||||||
|
return self.vw.getEntryPoints()
|
||||||
|
|
||||||
def extract_global_features(self):
|
def extract_global_features(self):
|
||||||
yield from self.global_features
|
yield from self.global_features
|
||||||
|
|
||||||
@@ -80,5 +83,12 @@ class VivisectFeatureExtractor(FeatureExtractor):
|
|||||||
def is_library_function(self, va):
|
def is_library_function(self, va):
|
||||||
return viv_utils.flirt.is_library_function(self.vw, va)
|
return viv_utils.flirt.is_library_function(self.vw, va)
|
||||||
|
|
||||||
|
def is_thunk_function(self, va):
|
||||||
|
return self.vw.isFunctionThunk(va)
|
||||||
|
|
||||||
def get_function_name(self, va):
|
def get_function_name(self, va):
|
||||||
return viv_utils.get_function_name(self.vw, va)
|
return viv_utils.get_function_name(self.vw, va)
|
||||||
|
|
||||||
|
def get_calls_from(self, va):
|
||||||
|
# TODO compare vs. getXrefsFrom, e.g. on threads?
|
||||||
|
return self.vw.cfctx.getCallsFrom(va)
|
||||||
|
|||||||
@@ -4,8 +4,14 @@ capa freeze file format: `| capa0000 | + zlib(utf-8(json(...)))`
|
|||||||
json format:
|
json format:
|
||||||
|
|
||||||
{
|
{
|
||||||
'version': 1,
|
'version': 2,
|
||||||
'base address': int(base address),
|
'base address': int(base address),
|
||||||
|
'entry points': [int(entry point va), ...],
|
||||||
|
'library functions': {
|
||||||
|
int(function va): str(function name)
|
||||||
|
},
|
||||||
|
'thunk functions': {
|
||||||
|
int(function va): str(function name)
|
||||||
|
},
|
||||||
'functions': {
|
'functions': {
|
||||||
int(function va): {
|
int(function va): {
|
||||||
int(basic block va): [int(instruction va), ...]
|
int(basic block va): [int(instruction va), ...]
|
||||||
@@ -59,6 +65,8 @@ import capa.features.basicblock
|
|||||||
import capa.features.extractors.base_extractor
|
import capa.features.extractors.base_extractor
|
||||||
from capa.helpers import hex
|
from capa.helpers import hex
|
||||||
|
|
||||||
|
FREEZE_FORMAT_VERSION = 2
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -85,8 +93,11 @@ def dumps(extractor):
|
|||||||
str: the serialized features.
|
str: the serialized features.
|
||||||
"""
|
"""
|
||||||
ret = {
|
ret = {
|
||||||
"version": 1,
|
"version": FREEZE_FORMAT_VERSION,
|
||||||
"base address": extractor.get_base_address(),
|
"base address": extractor.get_base_address(),
|
||||||
|
"entry points": extractor.get_entry_points(),
|
||||||
|
"library functions": {},
|
||||||
|
"thunk functions": {},
|
||||||
"functions": {},
|
"functions": {},
|
||||||
"scopes": {
|
"scopes": {
|
||||||
"global": [],
|
"global": [],
|
||||||
@@ -105,6 +116,12 @@ def dumps(extractor):
|
|||||||
for f in extractor.get_functions():
|
for f in extractor.get_functions():
|
||||||
ret["functions"][hex(f)] = {}
|
ret["functions"][hex(f)] = {}
|
||||||
|
|
||||||
|
if extractor.is_library_function(int(f)):
|
||||||
|
ret["library functions"][hex(f)] = extractor.get_function_name(int(f))
|
||||||
|
|
||||||
|
if extractor.is_thunk_function(int(f)):
|
||||||
|
ret["thunk functions"][hex(f)] = extractor.get_function_name(int(f))
|
||||||
|
|
||||||
for feature, va in extractor.extract_function_features(f):
|
for feature, va in extractor.extract_function_features(f):
|
||||||
ret["scopes"]["function"].append(serialize_feature(feature) + (hex(va), (hex(f),)))
|
ret["scopes"]["function"].append(serialize_feature(feature) + (hex(va), (hex(f),)))
|
||||||
|
|
||||||
@@ -147,11 +164,14 @@ def loads(s):
|
|||||||
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
|
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
|
||||||
doc = json.loads(s)
|
doc = json.loads(s)
|
||||||
|
|
||||||
if doc.get("version") != 1:
|
if doc.get("version") != FREEZE_FORMAT_VERSION:
|
||||||
raise ValueError("unsupported freeze format version: %d" % (doc.get("version")))
|
raise ValueError("unsupported freeze format version: %d" % (doc.get("version")))
|
||||||
|
|
||||||
features = {
|
features = {
|
||||||
"base address": doc.get("base address"),
|
"base address": doc.get("base address"),
|
||||||
|
"entry points": doc.get("entry points"),
|
||||||
|
"library functions": {int(k, 0x10): v for k, v in doc.get("library functions", {}).items()},
|
||||||
|
"thunk functions": {int(k, 0x10): v for k, v in doc.get("thunk functions", {}).items()},
|
||||||
"global features": [],
|
"global features": [],
|
||||||
"file features": [],
|
"file features": [],
|
||||||
"functions": {},
|
"functions": {},
|
||||||
|
|||||||
19
capa/main.py
19
capa/main.py
@@ -157,6 +157,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
|||||||
"functions": {},
|
"functions": {},
|
||||||
},
|
},
|
||||||
"library_functions": {},
|
"library_functions": {},
|
||||||
|
"thunk_functions": {},
|
||||||
} # type: Dict[str, Any]
|
} # type: Dict[str, Any]
|
||||||
|
|
||||||
pbar = tqdm.tqdm
|
pbar = tqdm.tqdm
|
||||||
@@ -181,6 +182,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
|||||||
if isinstance(pb, tqdm.tqdm):
|
if isinstance(pb, tqdm.tqdm):
|
||||||
pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
|
pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
|
||||||
continue
|
continue
|
||||||
|
elif extractor.is_thunk_function(function_address):
|
||||||
|
function_name = extractor.get_function_name(function_address)
|
||||||
|
logger.debug("skipping thunk function 0x%x (%s)", function_address, function_name)
|
||||||
|
meta["thunk_functions"][function_address] = function_name
|
||||||
|
continue
|
||||||
|
|
||||||
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
|
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
|
||||||
meta["feature_counts"]["functions"][function_address] = feature_count
|
meta["feature_counts"]["functions"][function_address] = feature_count
|
||||||
@@ -603,11 +609,12 @@ def collect_metadata(argv, sample_path, rules_path, extractor):
|
|||||||
"extractor": extractor.__class__.__name__,
|
"extractor": extractor.__class__.__name__,
|
||||||
"rules": rules_path,
|
"rules": rules_path,
|
||||||
"base_address": extractor.get_base_address(),
|
"base_address": extractor.get_base_address(),
|
||||||
|
"entry_points": extractor.get_entry_points(),
|
||||||
"layout": {
|
"layout": {
|
||||||
# this is updated after capabilities have been collected.
|
# this is updated after capabilities have been collected.
|
||||||
# will look like:
|
# will look like:
|
||||||
#
|
#
|
||||||
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ], "calls": [ 0x401100, ... ] }, ... }
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -624,15 +631,22 @@ def compute_layout(rules, extractor, capabilities):
|
|||||||
"""
|
"""
|
||||||
functions_by_bb = {}
|
functions_by_bb = {}
|
||||||
bbs_by_function = {}
|
bbs_by_function = {}
|
||||||
|
calls_by_function = collections.defaultdict(list)
|
||||||
for f in extractor.get_functions():
|
for f in extractor.get_functions():
|
||||||
bbs_by_function[int(f)] = []
|
bbs_by_function[int(f)] = []
|
||||||
for bb in extractor.get_basic_blocks(f):
|
for bb in extractor.get_basic_blocks(f):
|
||||||
functions_by_bb[int(bb)] = int(f)
|
functions_by_bb[int(bb)] = int(f)
|
||||||
bbs_by_function[int(f)].append(int(bb))
|
bbs_by_function[int(f)].append(int(bb))
|
||||||
|
calls_by_function[int(f)] = extractor.get_calls_from(int(f))
|
||||||
|
|
||||||
matched_bbs = set()
|
matched_bbs = set()
|
||||||
for rule_name, matches in capabilities.items():
|
for rule_name, matches in capabilities.items():
|
||||||
rule = rules[rule_name]
|
rule = rules[rule_name]
|
||||||
|
|
||||||
|
if rule.meta.get("capa/subscope-rule"):
|
||||||
|
# not included in result document
|
||||||
|
continue
|
||||||
|
|
||||||
if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
|
if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
|
||||||
for (addr, match) in matches:
|
for (addr, match) in matches:
|
||||||
assert addr in functions_by_bb
|
assert addr in functions_by_bb
|
||||||
@@ -641,7 +655,8 @@ def compute_layout(rules, extractor, capabilities):
|
|||||||
layout = {
|
layout = {
|
||||||
"functions": {
|
"functions": {
|
||||||
f: {
|
f: {
|
||||||
"matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs]
|
"matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs],
|
||||||
|
"calls": calls_by_function.get(f, []),
|
||||||
# this object is open to extension in the future,
|
# this object is open to extension in the future,
|
||||||
# such as with the function name, etc.
|
# such as with the function name, etc.
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,9 @@ import capa.features.extractors.base_extractor
|
|||||||
EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor(
|
EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor(
|
||||||
{
|
{
|
||||||
"base address": 0x401000,
|
"base address": 0x401000,
|
||||||
|
"entry points": [0x401000],
|
||||||
|
"library functions": {},
|
||||||
|
"thunk functions": {},
|
||||||
"file features": [
|
"file features": [
|
||||||
(0x402345, capa.features.common.Characteristic("embedded pe")),
|
(0x402345, capa.features.common.Characteristic("embedded pe")),
|
||||||
],
|
],
|
||||||
|
|||||||
Reference in New Issue
Block a user