initial commit

Yacine Elhamer
2023-07-17 11:50:49 +01:00
parent ce15a2b01e
commit e3f60ea0fb
9 changed files with 396 additions and 70 deletions

View File

@@ -16,6 +16,7 @@
- publish via PyPI trusted publishing #1491 @williballenthin
- migrate to pyproject.toml #1301 @williballenthin
- Add ProcessesAddress and ThreadAddress #1612 @yelhamer
- Add dynamic capability extraction @yelhamer
### Breaking Changes
- Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat

View File

@@ -22,7 +22,7 @@ import textwrap
import itertools
import contextlib
import collections
-from typing import Any, Dict, List, Tuple, Callable, cast
+from typing import Any, Dict, List, Tuple, Callable
import halo
import tqdm
@@ -84,6 +84,8 @@ from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import (
BBHandle,
InsnHandle,
ThreadHandle,
ProcessHandle,
FunctionHandle,
FeatureExtractor,
StaticFeatureExtractor,
@@ -264,6 +266,7 @@ def find_static_capabilities(
feature_counts = rdoc.FeatureCounts(file=0, functions=())
library_functions: Tuple[rdoc.LibraryFunction, ...] = ()
assert isinstance(extractor, StaticFeatureExtractor)
with redirecting_print_to_tqdm(disable_progress):
with tqdm.contrib.logging.logging_redirect_tqdm():
pbar = tqdm.tqdm
@@ -338,13 +341,131 @@ def find_static_capabilities(
return matches, meta
def find_thread_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
) -> Tuple[FeatureSet, MatchResults]:
"""
find matches for the given rules for the given thread.
returns: tuple containing (features for thread, match results for thread)
"""
# all features found for this thread.
features = collections.defaultdict(set) # type: FeatureSet
for feature, addr in itertools.chain(
extractor.extract_thread_features(ph, th), extractor.extract_global_features()
):
features[feature].add(addr)
# matches found in this thread.
_, matches = ruleset.match(Scope.THREAD, features, th.address)
for rule_name, res in matches.items():
rule = ruleset[rule_name]
for addr, _ in res:
capa.engine.index_rule_matches(features, rule, [addr])
return features, matches
def find_process_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle
) -> Tuple[MatchResults, MatchResults, int]:
"""
find matches for the given rules within the given process.
returns: tuple containing (match results for process, match results for threads, number of features)
"""
# all features found within this process,
# includes features found within threads.
process_features = collections.defaultdict(set) # type: FeatureSet
# matches found at the thread scope.
# might be found in different threads; that's ok.
thread_matches = collections.defaultdict(list) # type: MatchResults
for th in extractor.get_threads(ph):
features, tmatches = find_thread_capabilities(ruleset, extractor, ph, th)
for feature, vas in features.items():
process_features[feature].update(vas)
for rule_name, res in tmatches.items():
thread_matches[rule_name].extend(res)
for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()):
process_features[feature].add(va)
_, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address)
return process_matches, thread_matches, len(process_features)
def find_dynamic_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None
) -> Tuple[MatchResults, Any]:
all_process_matches = collections.defaultdict(list) # type: MatchResults
all_thread_matches = collections.defaultdict(list) # type: MatchResults
feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=())
assert isinstance(extractor, DynamicFeatureExtractor)
with redirecting_print_to_tqdm(disable_progress):
with tqdm.contrib.logging.logging_redirect_tqdm():
pbar = tqdm.tqdm
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
def pbar(s, *args, **kwargs):
return s
processes = list(extractor.get_processes())
pb = pbar(processes, desc="matching", unit=" processes", leave=False)
for p in pb:
process_matches, thread_matches, feature_count = find_process_capabilities(ruleset, extractor, p)
feature_counts.processes += (
rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count),
)
logger.debug("analyzed process 0x%x and extracted %d features", p.address.pid, feature_count)
for rule_name, res in process_matches.items():
all_process_matches[rule_name].extend(res)
for rule_name, res in thread_matches.items():
all_thread_matches[rule_name].extend(res)
# collection of features that captures the rule matches within process and thread scopes.
# mapping from feature (matched rule) to set of addresses at which it matched.
process_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(all_process_matches.items(), all_thread_matches.items()):
locations = {p[0] for p in results}
rule = ruleset[rule_name]
capa.engine.index_rule_matches(process_and_lower_features, rule, locations)
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features)
feature_counts.file = feature_count
matches = dict(
itertools.chain(
# each rule exists in exactly one scope,
# so there won't be any overlap among the following MatchResults,
# and we can merge the dictionaries naively.
all_thread_matches.items(),
all_process_matches.items(),
all_file_matches.items(),
)
)
meta = {
"feature_counts": feature_counts,
}
return matches, meta
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, **kwargs) -> Tuple[MatchResults, Any]:
if isinstance(extractor, StaticFeatureExtractor):
-extractor_: StaticFeatureExtractor = cast(StaticFeatureExtractor, extractor)
-return find_static_capabilities(ruleset, extractor_, **kwargs)
+return find_static_capabilities(ruleset, extractor, **kwargs)
elif isinstance(extractor, DynamicFeatureExtractor):
-# extractor_ = cast(DynamicFeatureExtractor, extractor)
-raise NotImplementedError()
+return find_dynamic_capabilities(ruleset, extractor, **kwargs)
else:
raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}")
@@ -773,6 +894,72 @@ def get_signatures(sigs_path):
return paths
def get_sample_hashes(sample_path, extractor: FeatureExtractor) -> Tuple[str, str, str]:
if isinstance(extractor, StaticFeatureExtractor):
md5_ = hashlib.md5()
sha1_ = hashlib.sha1()
sha256_ = hashlib.sha256()
with open(sample_path, "rb") as f:
buf = f.read()
md5_.update(buf)
sha1_.update(buf)
sha256_.update(buf)
md5, sha1, sha256 = md5_.hexdigest(), sha1_.hexdigest(), sha256_.hexdigest()
elif isinstance(extractor, DynamicFeatureExtractor):
import json
if isinstance(extractor, capa.features.extractors.cape.extractor.CapeExtractor):
with open(sample_path, "rb") as f:
report = json.load(f)
md5 = report["target"]["file"]["md5"]
sha1 = report["target"]["file"]["sha1"]
sha256 = report["target"]["file"]["sha256"]
else:
md5, sha1, sha256 = "0", "0", "0"
else:
raise ValueError("invalid extractor")
return md5, sha1, sha256
def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts):
if isinstance(extractor, StaticFeatureExtractor):
return rdoc.StaticAnalysis(
format=format_,
arch=arch,
os=os_,
extractor=extractor.__class__.__name__,
rules=tuple(rules_path),
base_address=frz.Address.from_capa(extractor.get_base_address()),
layout=rdoc.StaticLayout(
functions=(),
# this is updated after capabilities have been collected.
# will look like:
#
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
),
feature_counts=counts["feature_counts"],
library_functions=counts["library_functions"],
)
elif isinstance(extractor, DynamicFeatureExtractor):
return rdoc.DynamicAnalysis(
format=format_,
arch=arch,
os=os_,
extractor=extractor.__class__.__name__,
rules=tuple(rules_path),
layout=rdoc.DynamicLayout(
processes=(),
),
feature_counts=counts["feature_counts"],
)
else:
raise ValueError("invalid extractor type")
def collect_metadata(
argv: List[str],
sample_path: str,
@@ -780,18 +967,11 @@ def collect_metadata(
os_: str,
rules_path: List[str],
extractor: FeatureExtractor,
+counts: dict,
) -> rdoc.Metadata:
-md5 = hashlib.md5()
-sha1 = hashlib.sha1()
-sha256 = hashlib.sha256()
-assert isinstance(extractor, StaticFeatureExtractor)
-with open(sample_path, "rb") as f:
-buf = f.read()
-md5.update(buf)
-sha1.update(buf)
-sha256.update(buf)
+# if it's a binary sample, we hash it; if it's a report,
+# we fetch the hashes from the report
+md5, sha1, sha256 = get_sample_hashes(sample_path, extractor)
if rules_path != [RULES_PATH_DEFAULT_STRING]:
rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path]
@@ -799,39 +979,72 @@ def collect_metadata(
format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_
arch = get_arch(sample_path)
os_ = get_os(sample_path) if os_ == OS_AUTO else os_
-base_addr = extractor.get_base_address() if hasattr(extractor, "get_base_address") else NO_ADDRESS
return rdoc.Metadata(
timestamp=datetime.datetime.now(),
version=capa.version.__version__,
argv=tuple(argv) if argv else None,
sample=rdoc.Sample(
-md5=md5.hexdigest(),
-sha1=sha1.hexdigest(),
-sha256=sha256.hexdigest(),
+md5=md5,
+sha1=sha1,
+sha256=sha256,
path=os.path.normpath(sample_path),
),
-analysis=rdoc.Analysis(
-format=format_,
-arch=arch,
-os=os_,
-extractor=extractor.__class__.__name__,
-rules=tuple(rules_path),
-base_address=frz.Address.from_capa(base_addr),
-layout=rdoc.Layout(
-functions=(),
-# this is updated after capabilities have been collected.
-# will look like:
-#
-# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
-),
-feature_counts=rdoc.FeatureCounts(file=0, functions=()),
-library_functions=(),
+analysis=get_sample_analysis(
+format_,
+arch,
+os_,
+extractor,
+rules_path,
+counts,
+),
)
-def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
+def compute_dynamic_layout(rules, extractor, capabilities) -> rdoc.Layout:
"""
compute a metadata structure that links threads
to the processes in which they're found.
only collect the threads at which some rule matched.
otherwise, we may pollute the json document with
a large amount of un-referenced data.
"""
assert isinstance(extractor, DynamicFeatureExtractor)
processes_by_thread: Dict[Address, Address] = {}
threads_by_processes: Dict[Address, List[Address]] = {}
for p in extractor.get_processes():
threads_by_processes[p.address] = []
for t in extractor.get_threads(p):
processes_by_thread[t.address] = p.address
threads_by_processes[p.address].append(t.address)
matched_threads = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if capa.rules.THREAD_SCOPE in rule.meta.get("scopes")["dynamic"]:
for addr, _ in matches:
assert addr in processes_by_thread
matched_threads.add(addr)
layout = rdoc.DynamicLayout(
processes=tuple(
rdoc.ProcessLayout(
address=frz.Address.from_capa(p),
matched_threads=tuple(
rdoc.ThreadLayout(address=frz.Address.from_capa(t)) for t in threads if t in matched_threads
) # this object is open to extension in the future,
# such as with the function name, etc.
)
for p, threads in threads_by_processes.items()
if len([t for t in threads if t in matched_threads]) > 0
)
)
return layout
def compute_static_layout(rules, extractor, capabilities) -> rdoc.Layout:
"""
compute a metadata structure that links basic blocks
to the functions in which they're found.
@@ -840,6 +1053,7 @@ def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
otherwise, we may pollute the json document with
a large amount of un-referenced data.
"""
assert isinstance(extractor, StaticFeatureExtractor)
functions_by_bb: Dict[Address, Address] = {}
bbs_by_function: Dict[Address, List[Address]] = {}
for f in extractor.get_functions():
@@ -851,12 +1065,12 @@ def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
matched_bbs = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
-if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
+if capa.rules.BASIC_BLOCK_SCOPE in rule.meta.get("scopes")["static"]:
for addr, _ in matches:
assert addr in functions_by_bb
matched_bbs.add(addr)
-layout = rdoc.Layout(
+layout = rdoc.StaticLayout(
functions=tuple(
rdoc.FunctionLayout(
address=frz.Address.from_capa(f),
@@ -873,6 +1087,15 @@ def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
return layout
def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
if isinstance(extractor, StaticFeatureExtractor):
return compute_static_layout(rules, extractor, capabilities)
elif isinstance(extractor, DynamicFeatureExtractor):
return compute_dynamic_layout(rules, extractor, capabilities)
else:
raise ValueError("extractor must be either a static or dynamic extracotr")
def install_common_args(parser, wanted=None):
"""
register a common set of command line arguments for re-use by main & scripts.
@@ -1308,12 +1531,9 @@ def main(argv=None):
log_unsupported_os_error()
return E_INVALID_FILE_OS
-meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
-meta.analysis.feature_counts = counts["feature_counts"]
-meta.analysis.library_functions = counts["library_functions"]
+meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor, counts)
meta.analysis.layout = compute_layout(rules, extractor, capabilities)
if has_file_limitation(rules, capabilities):

View File

@@ -10,6 +10,7 @@ import collections
from typing import Dict, List, Tuple, Union, Optional
from pydantic import Field, BaseModel
from typing_extensions import TypeAlias
import capa.rules
import capa.engine
@@ -49,10 +50,26 @@ class FunctionLayout(Model):
matched_basic_blocks: Tuple[BasicBlockLayout, ...]
-class Layout(Model):
+class ThreadLayout(Model):
+address: frz.Address
+class ProcessLayout(Model):
+address: frz.Address
+matched_threads: Tuple[ThreadLayout, ...]
+class StaticLayout(Model):
functions: Tuple[FunctionLayout, ...]
+class DynamicLayout(Model):
+processes: Tuple[ProcessLayout, ...]
+Layout: TypeAlias = Union[StaticLayout, DynamicLayout]
class LibraryFunction(Model):
address: frz.Address
name: str
@@ -63,23 +80,49 @@ class FunctionFeatureCount(Model):
count: int
-class FeatureCounts(Model):
+class ProcessFeatureCount(Model):
+address: frz.Address
+count: int
+class StaticFeatureCounts(Model):
file: int
functions: Tuple[FunctionFeatureCount, ...]
-class Analysis(Model):
+class DynamicFeatureCounts(Model):
+file: int
+processes: Tuple[ProcessFeatureCount, ...]
+FeatureCounts: TypeAlias = Union[StaticFeatureCounts, DynamicFeatureCounts]
+class StaticAnalysis(Model):
format: str
arch: str
os: str
extractor: str
rules: Tuple[str, ...]
base_address: frz.Address
-layout: Layout
-feature_counts: FeatureCounts
+layout: StaticLayout
+feature_counts: StaticFeatureCounts
library_functions: Tuple[LibraryFunction, ...]
+class DynamicAnalysis(Model):
+format: str
+arch: str
+os: str
+extractor: str
+rules: Tuple[str, ...]
+layout: DynamicLayout
+feature_counts: DynamicFeatureCounts
+Analysis: TypeAlias = Union[StaticAnalysis, DynamicAnalysis]
class Metadata(Model):
timestamp: datetime.datetime
version: str
@@ -510,7 +553,7 @@ class RuleMetadata(FrozenModel):
name: str
namespace: Optional[str]
authors: Tuple[str, ...]
-scope: capa.rules.Scope
+scopes: capa.rules.Scopes
attack: Tuple[AttackSpec, ...] = Field(alias="att&ck")
mbc: Tuple[MBCSpec, ...]
references: Tuple[str, ...]
@@ -527,7 +570,7 @@ class RuleMetadata(FrozenModel):
name=rule.meta.get("name"),
namespace=rule.meta.get("namespace"),
authors=rule.meta.get("authors"),
-scope=capa.rules.Scope(rule.meta.get("scope")),
+scopes=capa.rules.Scopes.from_dict(rule.meta.get("scopes")),
attack=tuple(map(AttackSpec.from_str, rule.meta.get("att&ck", []))),
mbc=tuple(map(MBCSpec.from_str, rule.meta.get("mbc", []))),
references=rule.meta.get("references", []),

View File

@@ -60,13 +60,26 @@ def format_address(address: frz.Address) -> str:
assert isinstance(id_, int)
assert isinstance(return_address, int)
return f"event: {id_}, retaddr: 0x{return_address:x}"
elif address.type == frz.AddressType.PROCESS:
assert isinstance(address.value, tuple)
ppid, pid = address.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
return f"process ppid: {ppid}, process pid: {pid}"
elif address.type == frz.AddressType.THREAD:
assert isinstance(address.value, tuple)
ppid, pid, tid = address.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
return f"process ppid: {ppid}, process pid: {pid}, thread id: {tid}"
elif address.type == frz.AddressType.NO_ADDRESS:
return "global"
else:
raise ValueError("unexpected address type")
-def render_meta(ostream, doc: rd.ResultDocument):
+def render_static_meta(ostream, doc: rd.ResultDocument):
"""
like:
@@ -85,6 +98,8 @@ def render_meta(ostream, doc: rd.ResultDocument):
function count 42
total feature count 1918
"""
assert isinstance(doc.meta.analysis, rd.StaticAnalysis)
rows = [
("md5", doc.meta.sample.md5),
("sha1", doc.meta.sample.sha1),
@@ -109,6 +124,57 @@ def render_meta(ostream, doc: rd.ResultDocument):
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
def render_dynamic_meta(ostream, doc: rd.ResultDocument):
"""
like:
md5 84882c9d43e23d63b82004fae74ebb61
sha1 c6fb3b50d946bec6f391aefa4e54478cf8607211
sha256 5eced7367ed63354b4ed5c556e2363514293f614c2c2eb187273381b2ef5f0f9
path /tmp/packed-report.json
timestamp 2023-07-17T10:17:05.796933
capa version 0.0.0
os windows
format pe
arch amd64
extractor CAPEFeatureExtractor
rules (embedded rules)
process count 42
total feature count 1918
"""
assert isinstance(doc.meta.analysis, rd.DynamicAnalysis)
rows = [
("md5", doc.meta.sample.md5),
("sha1", doc.meta.sample.sha1),
("sha256", doc.meta.sample.sha256),
("path", doc.meta.sample.path),
("timestamp", doc.meta.timestamp),
("capa version", doc.meta.version),
("os", doc.meta.analysis.os),
("format", doc.meta.analysis.format),
("arch", doc.meta.analysis.arch),
("extractor", doc.meta.analysis.extractor),
("rules", "\n".join(doc.meta.analysis.rules)),
("process count", len(doc.meta.analysis.feature_counts.processes)),
(
"total feature count",
doc.meta.analysis.feature_counts.file + sum(p.count for p in doc.meta.analysis.feature_counts.processes),
),
]
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
def render_meta(ostream, doc: rd.ResultDocument):
if isinstance(doc.meta.analysis, rd.StaticAnalysis):
render_static_meta(ostream, doc)
elif isinstance(doc.meta.analysis, rd.DynamicAnalysis):
render_dynamic_meta(ostream, doc)
else:
raise ValueError("invalid meta analysis")
def render_rules(ostream, doc: rd.ResultDocument):
"""
like:
@@ -132,7 +198,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
had_match = True
rows = []
-for key in ("namespace", "description", "scope"):
+for key in ("namespace", "description", "scopes"):
v = getattr(rule.meta, key)
if not v:
continue
@@ -145,7 +211,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
rows.append((key, v))
-if rule.meta.scope != capa.rules.FILE_SCOPE:
+if capa.rules.FILE_SCOPE not in rule.meta.scopes:
locations = [m[0] for m in doc.rules[rule.meta.name].matches]
rows.append(("matches", "\n".join(map(format_address, locations))))

View File

@@ -267,6 +267,8 @@ def render_rules(ostream, doc: rd.ResultDocument):
api: kernel32.GetLastError @ 0x10004A87
api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
"""
assert isinstance(doc.meta.analysis, rd.StaticAnalysis)
functions_by_bb: Dict[capa.features.address.Address, capa.features.address.Address] = {}
for finfo in doc.meta.analysis.layout.functions:
faddress = finfo.address.to_capa()
@@ -322,7 +324,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
rows.append(("author", ", ".join(rule.meta.authors)))
rows.append(("scope", rule.meta.scope.value))
rows.append(("scopes", str(rule.meta.scopes)))
if rule.meta.attack:
rows.append(("att&ck", ", ".join([rutils.format_parts_id(v) for v in rule.meta.attack])))
@@ -338,7 +340,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
-if rule.meta.scope == capa.rules.FILE_SCOPE:
+if capa.rules.FILE_SCOPE in rule.meta.scopes:
matches = doc.rules[rule.meta.name].matches
if len(matches) != 1:
# i think there should only ever be one match per file-scope rule,
@@ -350,11 +352,11 @@ def render_rules(ostream, doc: rd.ResultDocument):
render_match(ostream, first_match, indent=0)
else:
for location, match in sorted(doc.rules[rule.meta.name].matches):
-ostream.write(rule.meta.scope)
+ostream.write(rule.meta.scopes)
ostream.write(" @ ")
ostream.write(capa.render.verbose.format_address(location))
-if rule.meta.scope == capa.rules.BASIC_BLOCK_SCOPE:
+if capa.rules.BASIC_BLOCK_SCOPE in rule.meta.scopes:
ostream.write(
" in function "
+ capa.render.verbose.format_address(frz.Address.from_capa(functions_by_bb[location.to_capa()]))

View File

@@ -129,11 +129,9 @@ def get_capa_results(args):
"error": f"unexpected error: {e}",
}
-meta = capa.main.collect_metadata([], path, format, os_, [], extractor)
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
-meta.analysis.feature_counts = counts["feature_counts"]
-meta.analysis.library_functions = counts["library_functions"]
+meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts)
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)

View File

@@ -170,10 +170,7 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
# collect metadata (used only to make rendering more complete)
-meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor)
-meta.analysis.feature_counts = counts["feature_counts"]
-meta.analysis.library_functions = counts["library_functions"]
+meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor, counts)
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
capa_output: Any = False

View File

@@ -89,7 +89,7 @@ def main():
continue
if rule.meta.is_subscope_rule:
continue
-if rule.meta.scope != capa.rules.Scope.FUNCTION:
+if capa.rules.Scope.FUNCTION not in rule.meta.scopes:
continue
ns = rule.meta.namespace

View File

@@ -94,6 +94,7 @@ def render_matches_by_function(doc: rd.ResultDocument):
- send HTTP request
- connect to HTTP server
"""
assert isinstance(doc.meta.analysis, rd.StaticAnalysis)
functions_by_bb: Dict[Address, Address] = {}
for finfo in doc.meta.analysis.layout.functions:
faddress = finfo.address
@@ -106,10 +107,10 @@ def render_matches_by_function(doc: rd.ResultDocument):
matches_by_function = collections.defaultdict(set)
for rule in rutils.capability_rules(doc):
-if rule.meta.scope == capa.rules.FUNCTION_SCOPE:
+if capa.rules.FUNCTION_SCOPE in rule.meta.scopes:
for addr, _ in rule.matches:
matches_by_function[addr].add(rule.meta.name)
-elif rule.meta.scope == capa.rules.BASIC_BLOCK_SCOPE:
+elif capa.rules.BASIC_BLOCK_SCOPE in rule.meta.scopes:
for addr, _ in rule.matches:
function = functions_by_bb[addr]
matches_by_function[function].add(rule.meta.name)
@@ -178,11 +179,9 @@ def main(argv=None):
capa.helpers.log_unsupported_runtime_error()
return -1
-meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor)
capabilities, counts = capa.main.find_capabilities(rules, extractor)
-meta.analysis.feature_counts = counts["feature_counts"]
-meta.analysis.library_functions = counts["library_functions"]
+meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts)
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
if capa.main.has_file_limitation(rules, capabilities):