mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
Merge pull request #1463 from Aayush-Goel-04/Aayush-Goel-04/Issue#1451
Utility script to detect feature overlap between new and existing CAPA rules.
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
## master (unreleased)
|
||||
|
||||
### New Features
|
||||
- Utility script to detect feature overlap between new and existing CAPA rules [#1451](https://github.com/mandiant/capa/issues/1451) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
|
||||
111
scripts/detect_duplicate_features.py
Normal file
111
scripts/detect_duplicate_features.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
|
||||
import capa.main
|
||||
import capa.rules
|
||||
import capa.engine as ceng
|
||||
|
||||
logger = logging.getLogger("detect_duplicate_features")
|
||||
|
||||
|
||||
def get_child_features(feature: ceng.Statement) -> list:
    """
    Flatten a rule statement tree into the list of its leaf nodes.

    Args:
      feature (capa.engine.Statement): root of the (sub)tree to flatten.

    Returns:
      list: every node reachable from `feature` that is not one of the
        And/Or/Some/Subscope/Range/Not wrappers, in traversal order.
    """
    # multi-child statements: concatenate the leaves of each operand
    if isinstance(feature, (ceng.And, ceng.Or, ceng.Some)):
        leaves = []
        for operand in feature.children:
            leaves += get_child_features(operand)
        return leaves

    # single-child statements: look through the wrapper
    if isinstance(feature, (ceng.Subscope, ceng.Range, ceng.Not)):
        return list(get_child_features(feature.child))

    # anything else is treated as a leaf
    return [feature]
|
||||
|
||||
|
||||
def get_features(rule_path: str) -> list:
    """
    Extracts all features from a given rule file.

    Args:
      rule_path (str): The path to the rule file to extract features from.

    Returns:
      list: A list of all feature statements contained within the rule file.

    Exits:
      Logs an error and terminates the process with status -1 when the file
      cannot be opened, decoded, or parsed as a rule.
    """
    try:
        # Decode explicitly as UTF-8 so parsing does not depend on the
        # platform's locale default.
        # The open() call lives inside the try block so that an unreadable
        # path is reported the same way as a malformed rule, instead of
        # surfacing as a traceback with exit status 1 (this script's exit
        # status is otherwise the number of overlapping rules).
        with open(rule_path, "r", encoding="utf-8") as f:
            new_rule = capa.rules.Rule.from_yaml(f.read())
        return get_child_features(new_rule.statement)
    except Exception as e:
        logger.error("Error: New rule %s %s %s", rule_path, type(e), e)
        sys.exit(-1)
|
||||
|
||||
|
||||
def find_overlapping_rules(new_rule_path, rules_path):
    """
    Find the existing rules that share at least one feature with a new rule.

    Args:
      new_rule_path: path to the new rule file; must end with ".yml",
        otherwise an error is logged and the process exits with status -1.
      rules_path: value handed to `capa.main.get_rules` to load the
        existing rules.

    Returns:
      dict: {"overlapping_rules": names of rules sharing a feature with the
        new rule, "count": number of rules that had at least one feature and
        were therefore compared}.
    """
    if not new_rule_path.endswith(".yml"):
        logger.error("FileNotFoundError ! New rule file name doesn't end with .yml")
        sys.exit(-1)

    # features of the rule under review
    candidate_features = get_features(new_rule_path)

    # every rule found under the given paths
    ruleset = capa.main.get_rules(rules_path)

    checked = 0
    matches = []
    for name, existing in ruleset.rules.items():
        existing_features = get_child_features(existing.statement)
        if not existing_features:
            # nothing to compare against; not counted as checked
            continue
        checked += 1

        # one shared feature is enough to report the rule
        for feature in candidate_features:
            if feature in existing_features:
                matches.append(name)
                break

    return {"overlapping_rules": matches, "count": checked}
|
||||
|
||||
|
||||
def main():
    """
    Command-line entry point: report which existing rules overlap a new rule.

    Parses the `rules` and `new_rule` positional arguments, prints a summary
    of the comparison, and returns the number of overlapping rules (the
    caller passes this value to `sys.exit`).
    """
    parser = argparse.ArgumentParser(description="Find overlapping features in Capa rules.")
    parser.add_argument("rules", type=str, action="append", help="Path to rules")
    parser.add_argument("new_rule", type=str, help="Path to new rule")
    args = parser.parse_args()

    result = find_overlapping_rules(args.new_rule, args.rules)
    overlapping = result["overlapping_rules"]
    overlap_total = len(overlapping)

    print("\nNew rule path : %s" % args.new_rule)
    print("Number of rules checked : %s " % result["count"])
    if not overlapping:
        print("Paths to overlapping rules : None")
    else:
        print("Paths to overlapping rules : ")
        for name in overlapping:
            print("- %s" % name)
    print("Number of rules containing same features : %s" % overlap_total)
    print("\n")

    return overlap_total
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # main() returns the number of overlapping rules; use it as the exit status
    exit_status = main()
    sys.exit(exit_status)
|
||||
@@ -8,9 +8,11 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import textwrap
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
from fixtures import *
|
||||
|
||||
CD = os.path.dirname(__file__)
|
||||
|
||||
@@ -82,3 +84,112 @@ def test_proto_conversion(tmpdir):
|
||||
assert p.returncode == 0
|
||||
|
||||
assert p.stdout.startswith(b'{\n "meta": ') or p.stdout.startswith(b'{\r\n "meta": ')
|
||||
|
||||
|
||||
def test_detect_duplicate_features(tmpdir):
    """
    Run detect_duplicate_features.py once per test rule and check that its
    exit status equals the expected number of overlapping rules.
    """
    TEST_RULE_0 = textwrap.dedent(
        """
        rule:
            meta:
                name: Test Rule 0
                scope: function
            features:
              - and:
                - number: 1
                - not:
                  - string: process
        """
    )

    TEST_RULESET = {
        "rule_1": textwrap.dedent(
            """
            rule:
                meta:
                    name: Test Rule 1
                features:
                  - or:
                    - string: unique
                    - number: 2
                    - and:
                      - or:
                        - arch: i386
                        - number: 4
                      - not:
                        - count(mnemonic(xor)): 5
                    - not:
                      - os: linux
            """
        ),
        "rule_2": textwrap.dedent(
            """
            rule:
                meta:
                    name: Test Rule 2
                features:
                  - and:
                    - string: "sites.ini"
                    - basic block:
                      - and:
                        - api: CreateFile
                        - mnemonic: xor
            """
        ),
        "rule_3": textwrap.dedent(
            """
            rule:
                meta:
                    name: Test Rule 3
                features:
                  - or:
                    - not:
                      - number: 4
                    - basic block:
                      - and:
                        - api: bind
                        - number: 2
            """
        ),
        "rule_4": textwrap.dedent(
            """
            rule:
                meta:
                    name: Test Rule 4
                features:
                  - not:
                    - string: "expa"
            """
        ),
    }

    # rule_overlaps[i] is the number of overlaps the script is expected to
    # report (as its exit status) when rule_i is checked against the ruleset
    # directory. A rule stored in that directory also overlaps with itself.
    #   - Rule 0 has zero overlaps in the ruleset
    #   - Rule 1 overlaps with 3 other rules in the ruleset
    #   - Rule 4 overlaps only with itself
    rule_overlaps = [0, 4, 3, 3, 1]

    rule_dir = tmpdir.mkdir("capa_rule_overlap_test")

    # rule 0 is written next to, not inside, the ruleset directory
    new_rule_file = tmpdir.join("rule_0.yml")
    new_rule_file.write(TEST_RULE_0)
    rule_paths = [new_rule_file.strpath]

    for rule_name, rule_content in TEST_RULESET.items():
        ruleset_file = rule_dir.join("%s.yml" % rule_name)
        ruleset_file.write(rule_content)
        rule_paths.append(ruleset_file.strpath)

    # tests if number of overlaps for rules in RULESET found are correct.
    script_path = get_script_path("detect_duplicate_features.py")
    for expected_overlaps, rule_path in zip(rule_overlaps, rule_paths):
        completed = run_program(script_path, [rule_dir.strpath, rule_path])
        assert completed.returncode == expected_overlaps
|
||||
|
||||
Reference in New Issue
Block a user