mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
Merge pull request #1683 from Aayush-Goel-04/Aayush-Goel-04/Issue#331
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
### New Features
|
||||
- ELF: implement file import and export name extractor #1607 #1608 @Aayush-Goel-04
|
||||
- bump pydantic from 1.10.9 to 2.1.1 #1582 @Aayush-Goel-04
|
||||
- develop script to highlight the features that are not used during matching #331 @Aayush-Goel-04
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
|
||||
@@ -738,6 +738,33 @@ class Rule:
|
||||
|
||||
yield from self._extract_subscope_rules_rec(self.statement)
|
||||
|
||||
def _extract_all_features_rec(self, statement) -> Set[Feature]:
    """
    collect every feature found beneath the given statement.

    nested statements are descended into; every child that is not a
    `Statement` is collected as a feature.

    args:
      statement: the statement whose subtree is searched; it must provide `get_children()`.

    returns:
      set: the features found anywhere below `statement`.
    """
    collected: Set[Feature] = set()

    # explicit worklist rather than recursion; the visiting order does not
    # matter because the result is an unordered set.
    pending = [statement]
    while pending:
        node = pending.pop()
        for child in node.get_children():
            if isinstance(child, Statement):
                pending.append(child)
            else:
                collected.add(child)

    return collected
|
||||
|
||||
def extract_all_features(self) -> Set[Feature]:
    """
    recursively extracts all feature statements in this rule.

    returns:
      set: A set of all feature statements contained within this rule.
    """
    root = self.statement

    if isinstance(root, ceng.Statement):
        return self._extract_all_features_rec(root)

    # the rule's logic is a lone feature rather than a statement tree, e.g.
    #   anti-analysis/obfuscation/obfuscated-with-advobfuscator.yml
    # which consists of a single substring feature (of type String).
    return {root}
|
||||
|
||||
def evaluate(self, features: FeatureSet, short_circuit=True):
|
||||
capa.perf.counters["evaluate.feature"] += 1
|
||||
capa.perf.counters["evaluate.feature.rule"] += 1
|
||||
|
||||
@@ -8,38 +8,17 @@
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
from typing import Set
|
||||
from pathlib import Path
|
||||
|
||||
import capa.main
|
||||
import capa.rules
|
||||
import capa.engine as ceng
|
||||
from capa.features.common import Feature
|
||||
|
||||
logger = logging.getLogger("detect_duplicate_features")
|
||||
|
||||
|
||||
def get_child_features(feature: ceng.Statement) -> list:
    """
    Recursively extracts all feature statements from a given rule statement.

    Args:
        feature (capa.engine.Statement): The feature statement to extract features from.

    Returns:
        list: A list of all feature statements contained within the given feature statement.
    """
    # statements holding several children: flatten the leaves of each child, in order.
    if isinstance(feature, (ceng.And, ceng.Or, ceng.Some)):
        return [leaf for child in feature.children for leaf in get_child_features(child)]

    # statements wrapping exactly one child: descend into it.
    if isinstance(feature, (ceng.Subscope, ceng.Range, ceng.Not)):
        return get_child_features(feature.child)

    # anything else is treated as a leaf feature.
    return [feature]
|
||||
|
||||
|
||||
def get_features(rule_path: str) -> list:
|
||||
def get_features(rule_path: str) -> Set[Feature]:
|
||||
"""
|
||||
Extracts all features from a given rule file.
|
||||
|
||||
@@ -47,17 +26,15 @@ def get_features(rule_path: str) -> list:
|
||||
rule_path (str): The path to the rule file to extract features from.
|
||||
|
||||
Returns:
|
||||
list: A list of all feature statements contained within the rule file.
|
||||
set: A set of all feature statements contained within the rule file.
|
||||
"""
|
||||
feature_list = []
|
||||
with Path(rule_path).open("r", encoding="utf-8") as f:
|
||||
try:
|
||||
new_rule = capa.rules.Rule.from_yaml(f.read())
|
||||
feature_list = get_child_features(new_rule.statement)
|
||||
return new_rule.extract_all_features()
|
||||
except Exception as e:
|
||||
logger.error("Error: New rule %s %s %s", rule_path, str(type(e)), str(e))
|
||||
sys.exit(-1)
|
||||
return feature_list
|
||||
|
||||
|
||||
def find_overlapping_rules(new_rule_path, rules_path):
|
||||
@@ -67,7 +44,6 @@ def find_overlapping_rules(new_rule_path, rules_path):
|
||||
|
||||
# Loads features of new rule in a list.
|
||||
new_rule_features = get_features(new_rule_path)
|
||||
|
||||
count = 0
|
||||
overlapping_rules = []
|
||||
|
||||
@@ -75,7 +51,7 @@ def find_overlapping_rules(new_rule_path, rules_path):
|
||||
ruleset = capa.main.get_rules(rules_path)
|
||||
|
||||
for rule_name, rule in ruleset.rules.items():
|
||||
rule_features = get_child_features(rule.statement)
|
||||
rule_features = rule.extract_all_features()
|
||||
|
||||
if not len(rule_features):
|
||||
continue
|
||||
|
||||
218
scripts/show-unused-features.py
Normal file
218
scripts/show-unused-features.py
Normal file
@@ -0,0 +1,218 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and limitations under the License.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import typing
|
||||
import logging
|
||||
import argparse
|
||||
from typing import Set, Tuple
|
||||
from pathlib import Path
|
||||
from collections import Counter
|
||||
|
||||
import tabulate
|
||||
from termcolor import colored
|
||||
|
||||
import capa.main
|
||||
import capa.rules
|
||||
import capa.helpers
|
||||
import capa.features
|
||||
import capa.exceptions
|
||||
import capa.render.verbose as v
|
||||
import capa.features.common
|
||||
import capa.features.freeze
|
||||
import capa.features.address
|
||||
import capa.features.extractors.pefile
|
||||
import capa.features.extractors.base_extractor
|
||||
from capa.helpers import log_unsupported_runtime_error
|
||||
from capa.features.common import Feature
|
||||
from capa.features.extractors.base_extractor import FunctionHandle
|
||||
|
||||
logger = logging.getLogger("show-unused-features")
|
||||
|
||||
|
||||
def format_address(addr: capa.features.address.Address) -> str:
    """
    Render a capa address as text.

    The address is first converted to its freeze representation, which is
    what the verbose renderer's `format_address` is handed here.
    """
    frozen = capa.features.freeze.Address.from_capa(addr)
    return v.format_address(frozen)
|
||||
|
||||
|
||||
def get_rules_feature_set(rules_path) -> Set[Feature]:
    """
    Collect the features referenced by any rule found under the given path(s).

    Args:
        rules_path: passed through unchanged to `capa.main.get_rules`.

    Returns:
        set: the union of `extract_all_features()` over every rule in the loaded ruleset.
    """
    ruleset = capa.main.get_rules(rules_path)
    return {feature for rule in ruleset.rules.values() for feature in rule.extract_all_features()}
|
||||
|
||||
|
||||
def get_file_features(
    functions: Tuple[FunctionHandle, ...], extractor: capa.features.extractors.base_extractor.FeatureExtractor
) -> typing.Counter[Feature]:
    """
    Count how often each non-global feature occurs in the given functions.

    For every function that the extractor does not flag as a library function,
    features are gathered at function, basic block, and instruction scope.
    Features for which `capa.features.common.is_global_feature` is true are not counted.

    Args:
        functions: handles of the functions to inspect.
        extractor: the feature extractor that produced the handles.

    Returns:
        Counter: maps each feature to its number of occurrences.
    """

    def scoped_features(fh):
        # yield features in the order: function scope, then per basic block
        # its own features followed by those of its instructions.
        for feature, _ in extractor.extract_function_features(fh):
            yield feature

        for bb in extractor.get_basic_blocks(fh):
            for feature, _ in extractor.extract_basic_block_features(fh, bb):
                yield feature

            for insn in extractor.get_instructions(fh, bb):
                for feature, _ in extractor.extract_insn_features(fh, bb, insn):
                    yield feature

    tally: typing.Counter[Feature] = Counter()

    for fh in functions:
        if extractor.is_library_function(fh.address):
            name = extractor.get_function_name(fh.address)
            logger.debug("skipping library function %s (%s)", format_address(fh.address), name)
            continue

        for feature in scoped_features(fh):
            if not capa.features.common.is_global_feature(feature):
                tally[feature] += 1

    return tally
|
||||
|
||||
|
||||
def get_colored(s: str):
    """
    Highlight the value part of a rendered feature in cyan.

    A string shaped like `name(value)` keeps `name` and the enclosing
    parentheses uncolored and has only `value` colored. Any other string,
    including one that contains parentheses but does not end with `)`,
    is colored as a whole.

    Args:
        s: the text to colorize, typically `str(feature)`.

    Returns:
        str: the text with terminal color codes applied by `termcolor.colored`.
    """
    name, paren, rest = s.partition("(")

    # only strip the final character when it really is the closing
    # parenthesis; otherwise we would drop a character of the text and
    # append a ")" that was never there.
    if paren and rest.endswith(")"):
        value = colored(rest[:-1], "cyan")
        return f"{name}({value})"

    return colored(s, "cyan")
|
||||
|
||||
|
||||
def print_unused_features(feature_map: typing.Counter[Feature], rules_feature_set: Set[Feature]):
    """
    Print a table of the extracted features that no rule references.

    Rows are listed from least to most frequent (the reverse of
    `Counter.most_common()`); each row shows the occurrence count and the
    colorized feature.

    Args:
        feature_map: occurrence count of each feature extracted from the sample.
        rules_feature_set: the features referenced by the rules; these are left out of the table.
    """
    rows = [
        (str(count), get_colored(str(feature)))
        for feature, count in reversed(feature_map.most_common())
        if feature not in rules_feature_set
    ]

    print("\n")
    print(tabulate.tabulate(rows, headers=["Count", "Feature"], tablefmt="plain"))
    print("\n")
|
||||
|
||||
|
||||
def main(argv=None):
    """
    Command line entry point: list the features extracted from a sample that no rule references.

    Args:
        argv: command line arguments without the program name; defaults to `sys.argv[1:]`.

    Returns:
        int: 0 on success; -1 when the sample or signatures cannot be read, the
        format or runtime is unsupported, or the requested function is not found.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Show the features that capa doesn't have rules for yet")
    capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend", "rules"})

    parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    if args.function and args.backend == "pefile":
        print("pefile backend does not support extracting function features")
        return -1

    try:
        taste = capa.helpers.get_file_taste(Path(args.sample))
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    try:
        sig_paths = capa.main.get_signatures(args.signatures)
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    if (args.format == "freeze") or (
        args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
    ):
        # the sample is a frozen feature set: load the features instead of analyzing a binary.
        extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
    else:
        # any value of CAPA_SAVE_WORKSPACE other than these (or leaving it unset) enables saving.
        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
        try:
            extractor = capa.main.get_extractor(
                args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
            )
        except capa.exceptions.UnsupportedFormatError:
            capa.helpers.log_unsupported_format_error()
            return -1
        except capa.exceptions.UnsupportedRuntimeError:
            log_unsupported_runtime_error()
            return -1

    feature_map: typing.Counter[Feature] = Counter()

    feature_map.update([feature for feature, _ in extractor.extract_global_features()])

    function_handles: Tuple[FunctionHandle, ...]
    if isinstance(extractor, capa.features.extractors.pefile.PefileFeatureExtractor):
        # pefile extractor doesn't extract function features
        function_handles = ()
    else:
        function_handles = tuple(extractor.get_functions())

    if args.function:
        # `args.function` is the address as typed on the command line (a str), so it is
        # matched against the rendered address of each function for every input format.
        # previously an explicit `--format freeze` compared the address object itself
        # against that string, while auto-detected freeze input already used the
        # rendered form.
        function_handles = tuple(fh for fh in function_handles if format_address(fh.address) == args.function)

        if len(function_handles) == 0:
            print(f"{args.function} not a function")
            return -1

    feature_map.update(get_file_features(function_handles, extractor))

    rules_feature_set = get_rules_feature_set(args.rules)

    print_unused_features(feature_map, rules_feature_set)
    return 0
|
||||
|
||||
|
||||
def ida_main():
    """
    Entry point when running inside IDA: show unused features for the function at the cursor.

    File features from the IDA extractor are counted together with the features
    of the function that starts at the address reported by IDA for the current
    cursor position. Rules are loaded from the `rules` directory under capa's
    default root.

    Returns:
        int: 0 on success, -1 when no function starts at the resolved address.
    """
    import idc

    import capa.main
    import capa.features.extractors.ida.extractor

    function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
    print(f"getting features for current function {hex(function)}")

    extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
    feature_map: typing.Counter[Feature] = Counter(feature for feature, _ in extractor.extract_file_features())

    function_handles = tuple(extractor.get_functions())

    if function:
        # keep only the handle(s) whose start address matches the current function.
        function_handles = tuple(fh for fh in function_handles if fh.inner.start_ea == function)

        if not function_handles:
            print(f"{hex(function)} not a function")
            return -1

    feature_map.update(get_file_features(function_handles, extractor))

    rules_dir = capa.main.get_default_root() / "rules"
    rules_feature_set = get_rules_feature_set([rules_dir])

    print_unused_features(feature_map, rules_feature_set)

    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # standalone runs go through the command line entry point and propagate
    # its return code; inside IDA the current function is analyzed instead.
    if not capa.helpers.is_runtime_ida():
        sys.exit(main())
    else:
        ida_main()
|
||||
@@ -45,6 +45,7 @@ def get_rule_path():
|
||||
pytest.param("show-capabilities-by-function.py", [get_file_path()]),
|
||||
pytest.param("show-features.py", [get_file_path()]),
|
||||
pytest.param("show-features.py", ["-F", "0x407970", get_file_path()]),
|
||||
pytest.param("show-unused-features.py", [get_file_path()]),
|
||||
pytest.param("capa_as_library.py", [get_file_path()]),
|
||||
],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user