rules: cache the ruleset to disk

ref: #1212
This commit is contained in:
Willi Ballenthin
2023-01-20 14:50:00 +01:00
parent 4b7a9e149f
commit 476ffabae9
8 changed files with 162 additions and 17 deletions

View File

@@ -33,6 +33,7 @@ import capa.rules
import capa.engine
import capa.version
import capa.render.json
import capa.rules.cache
import capa.render.default
import capa.render.verbose
import capa.features.common
@@ -561,7 +562,10 @@ def is_nursery_rule_path(path: str) -> bool:
return "nursery" in path
def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]:
def collect_rule_file_paths(rule_paths: List[str]) -> List[str]:
"""
collect all rule file paths, including those in subdirectories.
"""
rule_file_paths = []
for rule_path in rule_paths:
if not os.path.exists(rule_path):
@@ -589,6 +593,23 @@ def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]:
rule_path = os.path.join(root, file)
rule_file_paths.append(rule_path)
return rule_file_paths
def get_rules(rule_paths: List[str], disable_progress=False) -> RuleSet:
rule_file_paths = collect_rule_file_paths(rule_paths)
# this list is parallel to `rule_file_paths`:
# rule_file_paths[i] corresponds to rule_contents[i].
rule_contents = []
for file_path in rule_file_paths:
with open(file_path, "rb") as f:
rule_contents.append(f.read())
ruleset = capa.rules.cache.load_cached_ruleset(rule_contents)
if ruleset is not None:
return ruleset
rules = [] # type: List[Rule]
pbar = tqdm.tqdm
@@ -597,20 +618,24 @@ def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]:
# to disable progress completely
pbar = lambda s, *args, **kwargs: s
for rule_file_path in pbar(list(rule_file_paths), desc="loading ", unit=" rules"):
for path, content in pbar(zip(rule_file_paths, rule_contents), desc="parsing ", unit=" rules"):
try:
rule = capa.rules.Rule.from_yaml_file(rule_file_path)
rule = capa.rules.Rule.from_yaml(content)
except capa.rules.InvalidRule:
raise
else:
rule.meta["capa/path"] = rule_file_path
if is_nursery_rule_path(rule_file_path):
rule.meta["capa/path"] = path
if is_nursery_rule_path(path):
rule.meta["capa/nursery"] = True
rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope)
logger.debug("parsed rule: '%s' with scope: %s", rule.name, rule.scope)
return rules
ruleset = capa.rules.RuleSet(rules)
capa.rules.cache.cache_ruleset(ruleset)
return ruleset
def get_signatures(sigs_path):
@@ -1001,7 +1026,7 @@ def main(argv=None):
return E_INVALID_FILE_TYPE
try:
rules = capa.rules.RuleSet(get_rules(args.rules, disable_progress=args.quiet))
rules = get_rules(args.rules, disable_progress=args.quiet)
logger.debug(
"successfully loaded %s rules",
@@ -1151,7 +1176,7 @@ def ida_main():
rules_path = os.path.join(get_default_root(), "rules")
logger.debug("rule path: %s", rules_path)
rules = capa.rules.RuleSet(get_rules([rules_path]))
rules = get_rules([rules_path])
meta = capa.ida.helpers.collect_metadata([rules_path])

121
capa/rules/cache.py Normal file
View File

@@ -0,0 +1,121 @@
import sys
import pickle
import hashlib
import logging
import os.path
from typing import List, Optional
from dataclasses import dataclass
import capa.rules
# module-level logger, named after this module.
logger = logging.getLogger(__name__)
# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+
# a cache identifier is the hex digest string returned by `compute_cache_identifier`.
CacheIdentifier = str
def compute_cache_identifier(rule_content: List[bytes]) -> CacheIdentifier:
    """
    compute the identifier of the cache entry for the given rule contents.

    the identifier is the SHA-256 hex digest over the capa version
    followed by the sorted SHA-256 hex digests of each rule's content.
    because the per-rule digests are sorted, the result does not depend
    on the order in which the rules are provided.

    args:
      rule_content: the raw bytes of each rule.

    returns: the hex digest string identifying this set of rules.
    """
    # imported here rather than relying on some other module having already
    # imported `capa.version` as a side effect:
    # at the top level, this module only imports `capa.rules`.
    import capa.version

    digest = hashlib.sha256()

    # note that this changes with each release,
    # so cache identifiers will never collide across releases.
    digest.update(capa.version.__version__.encode("utf-8"))
    digest.update(b"\x00")

    # each digest is followed by a NUL separator so that entries can't run together.
    for rule_hash in sorted(hashlib.sha256(rule).hexdigest() for rule in rule_content):
        digest.update(rule_hash.encode("ascii"))
        digest.update(b"\x00")

    return digest.hexdigest()
def get_default_cache_directory() -> str:
    """
    return the path of the per-user directory in which capa stores its caches,
    creating the directory if it does not yet exist.

    returns: path to the existing cache directory.

    raises:
      NotImplementedError: when the platform is not Linux, macOS, or Windows.
      KeyError: on Windows, when the LOCALAPPDATA environment variable is not set.
    """
    # ref: https://github.com/mandiant/capa/issues/1212#issuecomment-1361259813
    #
    # Linux:   $XDG_CACHE_HOME/capa/
    # Windows: %LOCALAPPDATA%\flare\capa\cache
    # MacOS:   ~/Library/Caches/capa

    # ref: https://stackoverflow.com/a/8220141/87207
    if sys.platform in ("linux", "linux2"):
        # XDG_CACHE_HOME is the root shared by all applications,
        # so always append our own "capa" subdirectory to it.
        # when the variable is unset or empty, fall back to ~/.cache.
        cache_home = os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache")
        directory = os.path.join(cache_home, "capa")
    elif sys.platform == "darwin":
        directory = os.path.join(os.path.expanduser("~"), "Library", "Caches", "capa")
    elif sys.platform == "win32":
        directory = os.path.join(os.environ["LOCALAPPDATA"], "flare", "capa", "cache")
    else:
        raise NotImplementedError(f"unsupported platform: {sys.platform}")

    os.makedirs(directory, exist_ok=True)
    return directory
def get_default_cache_path(id: CacheIdentifier) -> str:
    """
    return the path of the cache file for the given cache identifier.

    the file lives in the default cache directory and is named
    after the first eight characters of the identifier.
    """
    short_id = id[:8]
    cache_name = "".join(("capa-", short_id, ".cache"))
    cache_directory = get_default_cache_directory()
    return os.path.join(cache_directory, cache_name)
# layout of the cache data, as produced by `RuleCache.dump`:
#   MAGIC + VERSION + cache identifier (ascii) + pickled `RuleCache`
# `RuleCache.load` asserts that the data starts with MAGIC + VERSION.
MAGIC = b"capa"
VERSION = b"\x00\x00\x00\x01"
@dataclass
class RuleCache:
    """
    a rule set together with the cache identifier under which it is stored.
    """

    # identifier under which `ruleset` is cached.
    id: CacheIdentifier
    # the rule set being cached.
    ruleset: capa.rules.RuleSet

    def dump(self):
        """
        serialize this cache to bytes:
        MAGIC + VERSION, then the ascii identifier, then this object pickled.
        """
        header = MAGIC + VERSION
        encoded_id = self.id.encode("ascii")
        pickled = pickle.dumps(self)
        return b"".join((header, encoded_id, pickled))

    @staticmethod
    def load(data):
        """
        deserialize a cache from bytes produced by `dump`.

        asserts that the data starts with MAGIC + VERSION, that the pickled
        object is a `RuleCache`, and that its identifier matches the one
        stored in the header.

        note: unpickling can execute arbitrary code, so only load trusted data.
        """
        header = MAGIC + VERSION
        assert data.startswith(header)

        # the identifier occupies the 0x40 bytes that follow the 8 byte header;
        # everything after that is the pickled object.
        id_start, id_end = 0x8, 0x48
        header_id = data[id_start:id_end].decode("ascii")
        loaded = pickle.loads(data[id_end:])

        assert isinstance(loaded, RuleCache)
        assert loaded.id == header_id

        return loaded
def cache_ruleset(ruleset: capa.rules.RuleSet):
    """
    write the given rule set to the default cache location,
    unless a cache file for it already exists.

    the cache identifier is computed from the current content of each
    rule's source file, found via the rule's "capa/path" metadata.
    subscope rules are skipped when computing the identifier.

    args:
      ruleset: the rule set to cache.
    """
    rule_contents = []
    for rule in ruleset.rules.values():
        if rule.is_subscope_rule():
            continue

        with open(rule.meta["capa/path"], "rb") as f:
            rule_contents.append(f.read())

    cache_id = compute_cache_identifier(rule_contents)
    path = get_default_cache_path(cache_id)
    if os.path.exists(path):
        logger.debug("rule set already cached to %s", path)
        return

    # serialize before touching the file system,
    # so that a serialization failure doesn't leave any file behind.
    payload = RuleCache(cache_id, ruleset).dump()

    # write to a temporary sibling file and rename it into place,
    # so that an interrupted or concurrent write never leaves a truncated
    # cache file at `path`: such a file would not be replaced later,
    # because of the existence check above.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "wb") as f:
            f.write(payload)
        os.replace(tmp_path, path)
    finally:
        # after a successful rename the temporary file is gone and this is a no-op;
        # otherwise, don't leave the partial temporary file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass

    logger.debug("rule set cached to %s", path)
def load_cached_ruleset(rule_contents: List[bytes]) -> Optional[capa.rules.RuleSet]:
    """
    load the cached rule set for the given rule contents, if there is one.

    args:
      rule_contents: the raw bytes of each rule file.

    returns: the cached rule set, or None when there is no usable cache,
      in which case the caller should parse the rules itself.
    """
    cache_id = compute_cache_identifier(rule_contents)
    path = get_default_cache_path(cache_id)
    if not os.path.exists(path):
        logger.debug("rule set cache does not exist: %s", path)
        return None

    logger.debug("loading rule set from cache: %s", path)
    with open(path, "rb") as f:
        buf = f.read()

    cache = None
    try:
        cache = RuleCache.load(buf)
    except (
        AssertionError,
        pickle.UnpicklingError,
        EOFError,
        AttributeError,
        ImportError,
        IndexError,
        ValueError,
    ) as e:
        # the cache is only an optimization:
        # a corrupt or truncated file must not prevent the rules from loading.
        logger.debug("rule set cache is invalid: %s: %s", path, e)

    if cache is not None and cache.id != cache_id:
        # the file name contains only a prefix of the identifier,
        # so confirm that the file really holds the rule set we asked for.
        logger.debug("rule set cache has unexpected identifier: %s", path)
        cache = None

    if cache is None:
        # remove the unusable file; otherwise `cache_ruleset` would see that
        # it exists and never replace it.
        try:
            os.remove(path)
        except OSError:
            pass
        return None

    return cache.ruleset

View File

@@ -152,7 +152,7 @@ def main(argv=None):
capa.main.handle_common_args(args)
try:
rules = capa.rules.RuleSet(capa.main.get_rules(args.rules))
rules = capa.main.get_rules(args.rules)
logger.info("successfully loaded %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))

View File

@@ -709,9 +709,8 @@ def main(argv=None):
logging.getLogger("capa2yara").setLevel(level)
try:
rules_ = capa.main.get_rules([args.rules], disable_progress=True)
namespaces = capa.rules.index_rules_by_namespace(rules_)
rules = capa.rules.RuleSet(rules_)
rules = capa.main.get_rules([args.rules], disable_progress=True)
namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
logger.info("successfully loaded %s rules (including subscope rules which will be ignored)", len(rules))
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)

View File

@@ -161,7 +161,7 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]:
# ==== render dictionary helpers
def capa_details(rules_path, file_path, output_format="dictionary"):
# load rules from disk
rules = capa.rules.RuleSet(capa.main.get_rules([rules_path], disable_progress=True))
rules = capa.main.get_rules([rules_path], disable_progress=True)
# extract features and find capabilities
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True)

View File

@@ -1002,7 +1002,7 @@ def main(argv=None):
time0 = time.time()
try:
rules = capa.rules.RuleSet(capa.main.get_rules(args.rules, disable_progress=True))
rules = capa.main.get_rules(args.rules, disable_progress=True)
logger.info("successfully loaded %s rules", len(rules))
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)

View File

@@ -88,7 +88,7 @@ def main(argv=None):
try:
with capa.main.timing("load rules"):
rules = capa.rules.RuleSet(capa.main.get_rules(args.rules, disable_progress=True))
rules = capa.main.get_rules(args.rules, disable_progress=True)
except (IOError) as e:
logger.error("%s", str(e))
return -1

View File

@@ -141,7 +141,7 @@ def main(argv=None):
return -1
try:
rules = capa.rules.RuleSet(capa.main.get_rules(args.rules))
rules = capa.main.get_rules(args.rules)
logger.info("successfully loaded %s rules", len(rules))
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)