refactor main to for ease of integration (#1948)

* main: split main into a bunch of "main routines"

[wip] since there are a few references to BinExport2
that are in progress elsewhre. Next commit will remove them.

* main: remove references to wip BinExport2 code

* changelog

* main: rename first position argument "input_file"

closes #1946

* main: linters

* main: move rule-related routines to capa.rules

ref #1821

* main: extract routines to capa.loader module

closes #1821

* add loader module

* loader: learn to load freeze format

* freeze: use new cli arg handling

* Update capa/loader.py

Co-authored-by: Moritz <mr-tz@users.noreply.github.com>

* main: remove duplicate documentation

* main: add doc about where some functions live

* scripts: migrate to new main wrapper helper functions

* scripts: port to main routines

* main: better handle auto-detection of backend

* scripts: migrate bulk-process to main wrappers

* scripts: migrate scripts to main wrappers

* main: rename *_from_args to *_from_cli

* changelog

* cache-ruleset: remove duplication

* main: fix tag handling

* cache-ruleset: fix cli args

* cache-ruleset: fix special rule cli handling

* scripts: fix type bytes

* main: remove old TODO message

* loader: fix references to binja extractor

---------

Co-authored-by: Moritz <mr-tz@users.noreply.github.com>
This commit is contained in:
Willi Ballenthin
2024-01-29 13:59:05 +01:00
committed by GitHub
parent d2e1a47192
commit c3301d3b3f
26 changed files with 1321 additions and 1168 deletions

View File

@@ -6,6 +6,9 @@
### Breaking Changes
- main: introduce wrapping routines within main for working with CLI args #1813 @williballenthin
- move functions from `capa.main` to new `capa.loader` namespace #1821 @williballenthin
### New Rules (0)
-

View File

@@ -458,18 +458,22 @@ FORMAT_AUTO = "auto"
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
STATIC_FORMATS = {
FORMAT_SC32,
FORMAT_SC64,
FORMAT_PE,
FORMAT_ELF,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_RESULT,
}
DYNAMIC_FORMATS = {
FORMAT_CAPE,
FORMAT_FREEZE,
FORMAT_RESULT,
}
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
FORMAT_UNKNOWN = "unknown"

View File

@@ -45,7 +45,7 @@ MATCH_RESULT = b'{"meta":'
MATCH_JSON_OBJECT = b'{"'
def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
def extract_file_strings(buf: bytes, **kwargs) -> Iterator[Tuple[String, Address]]:
"""
extract ASCII and UTF-16 LE strings from file
"""
@@ -56,7 +56,7 @@ def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
yield String(s.s), FileOffsetAddress(s.offset)
def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
def extract_format(buf: bytes) -> Iterator[Tuple[Feature, Address]]:
if buf.startswith(MATCH_PE):
yield Format(FORMAT_PE), NO_ADDRESS
elif buf.startswith(MATCH_ELF):

View File

@@ -21,6 +21,7 @@ from pydantic import Field, BaseModel, ConfigDict
# https://github.com/mandiant/capa/issues/1699
from typing_extensions import TypeAlias
import capa.loader
import capa.helpers
import capa.version
import capa.features.file
@@ -681,14 +682,18 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="save capa features to a file")
capa.main.install_common_args(parser, {"sample", "format", "backend", "os", "signatures"})
capa.main.install_common_args(parser, {"input_file", "format", "backend", "os", "signatures"})
parser.add_argument("output", type=str, help="Path to output file")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
sigpaths = capa.main.get_signatures(args.signatures)
extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False)
try:
capa.main.handle_common_args(args)
capa.main.ensure_input_exists_from_cli(args)
input_format = capa.main.get_input_format_from_cli(args)
backend = capa.main.get_backend_from_cli(args, input_format)
extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
except capa.main.ShouldExitError as e:
return e.status_code
Path(args.output).write_bytes(dump(extractor))

View File

@@ -69,7 +69,7 @@ def run_headless():
rules_path = pathlib.Path(args.rules)
logger.debug("rule path: %s", rules_path)
rules = capa.main.get_rules([rules_path])
rules = capa.rules.get_rules([rules_path])
meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
@@ -78,7 +78,7 @@ def run_headless():
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True):
logger.info("capa encountered warnings during analysis")
@@ -119,7 +119,7 @@ def run_ui():
rules_path: pathlib.Path = pathlib.Path(rules_dir)
logger.info("running capa using rules from %s", str(rules_path))
rules = capa.main.get_rules([rules_path])
rules = capa.rules.get_rules([rules_path])
meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
@@ -128,7 +128,7 @@ def run_ui():
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False):
logger.info("capa encountered warnings during analysis")

View File

@@ -5,6 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import json
import inspect
import logging
@@ -16,12 +17,22 @@ from pathlib import Path
import tqdm
from capa.exceptions import UnsupportedFormatError
from capa.features.common import FORMAT_PE, FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, FORMAT_DOTNET, FORMAT_UNKNOWN, Format
from capa.features.common import (
FORMAT_PE,
FORMAT_CAPE,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_UNKNOWN,
Format,
)
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
EXTENSIONS_DYNAMIC = ("json", "json_")
EXTENSIONS_ELF = "elf_"
EXTENSIONS_FREEZE = "frz"
logger = logging.getLogger("capa")
@@ -81,6 +92,8 @@ def get_format_from_extension(sample: Path) -> str:
format_ = FORMAT_SC64
elif sample.name.endswith(EXTENSIONS_DYNAMIC):
format_ = get_format_from_report(sample)
elif sample.name.endswith(EXTENSIONS_FREEZE):
format_ = FORMAT_FREEZE
return format_
@@ -201,3 +214,16 @@ def log_unsupported_runtime_error():
" If you're seeing this message on the command line, please ensure you're running a supported Python version."
)
logger.error("-" * 80)
def is_running_standalone() -> bool:
"""
are we running from a PyInstaller'd executable?
if so, then we'll be able to access `sys._MEIPASS` for the packaged resources.
"""
# typically we only expect capa.main to be packaged via PyInstaller.
# therefore, this *should* be in capa.main; however,
# the Binary Ninja extractor uses this to resolve the BN API code,
# so we keep this in a common area.
# generally, other library code should not use this function.
return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")

View File

@@ -636,7 +636,7 @@ class CapaExplorerForm(idaapi.PluginForm):
if ida_kernwin.user_cancelled():
raise UserCancelledError("user cancelled")
return capa.main.get_rules([rule_path], on_load_rule=on_load_rule)
return capa.rules.get_rules([rule_path], on_load_rule=on_load_rule)
except UserCancelledError:
logger.info("User cancelled analysis.")
return None
@@ -775,7 +775,7 @@ class CapaExplorerForm(idaapi.PluginForm):
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(ruleset, self.feature_extractor, capabilities)
meta.analysis.layout = capa.loader.compute_layout(ruleset, self.feature_extractor, capabilities)
except UserCancelledError:
logger.info("User cancelled analysis.")
return False

544
capa/loader.py Normal file
View File

@@ -0,0 +1,544 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import json
import logging
import datetime
from typing import Set, Dict, List, Optional
from pathlib import Path
import halo
from typing_extensions import assert_never
import capa.perf
import capa.rules
import capa.engine
import capa.helpers
import capa.version
import capa.render.json
import capa.rules.cache
import capa.render.default
import capa.render.verbose
import capa.features.common
import capa.features.freeze as frz
import capa.render.vverbose
import capa.features.extractors
import capa.render.result_document
import capa.render.result_document as rdoc
import capa.features.extractors.common
import capa.features.extractors.pefile
import capa.features.extractors.elffile
import capa.features.extractors.dotnetfile
import capa.features.extractors.base_extractor
import capa.features.extractors.cape.extractor
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError
from capa.features.common import (
OS_AUTO,
FORMAT_PE,
FORMAT_ELF,
FORMAT_AUTO,
FORMAT_CAPE,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_DOTNET,
)
from capa.features.address import Address
from capa.features.extractors.base_extractor import (
SampleHashes,
FeatureExtractor,
StaticFeatureExtractor,
DynamicFeatureExtractor,
)
logger = logging.getLogger(__name__)
BACKEND_VIV = "vivisect"
BACKEND_DOTNET = "dotnet"
BACKEND_BINJA = "binja"
BACKEND_PEFILE = "pefile"
BACKEND_CAPE = "cape"
BACKEND_FREEZE = "freeze"
def is_supported_format(sample: Path) -> bool:
"""
Return if this is a supported file based on magic header values
"""
taste = sample.open("rb").read(0x100)
return len(list(capa.features.extractors.common.extract_format(taste))) == 1
def is_supported_arch(sample: Path) -> bool:
buf = sample.read_bytes()
return len(list(capa.features.extractors.common.extract_arch(buf))) == 1
def get_arch(sample: Path) -> str:
buf = sample.read_bytes()
for feature, _ in capa.features.extractors.common.extract_arch(buf):
assert isinstance(feature.value, str)
return feature.value
return "unknown"
def is_supported_os(sample: Path) -> bool:
buf = sample.read_bytes()
return len(list(capa.features.extractors.common.extract_os(buf))) == 1
def get_os(sample: Path) -> str:
buf = sample.read_bytes()
for feature, _ in capa.features.extractors.common.extract_os(buf):
assert isinstance(feature.value, str)
return feature.value
return "unknown"
def get_meta_str(vw):
"""
Return workspace meta information string
"""
meta = []
for k in ["Format", "Platform", "Architecture"]:
if k in vw.metadata:
meta.append(f"{k.lower()}: {vw.metadata[k]}")
return f"{', '.join(meta)}, number of functions: {len(vw.getFunctions())}"
def get_workspace(path: Path, input_format: str, sigpaths: List[Path]):
"""
load the program at the given path into a vivisect workspace using the given format.
also apply the given FLIRT signatures.
supported formats:
- pe
- elf
- shellcode 32-bit
- shellcode 64-bit
- auto
this creates and analyzes the workspace; however, it does *not* save the workspace.
this is the responsibility of the caller.
"""
# lazy import enables us to not require viv if user wants another backend.
import viv_utils
import viv_utils.flirt
logger.debug("generating vivisect workspace for: %s", path)
if input_format == FORMAT_AUTO:
if not is_supported_format(path):
raise UnsupportedFormatError()
# don't analyze, so that we can add our Flirt function analyzer first.
vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
elif input_format in {FORMAT_PE, FORMAT_ELF}:
vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
elif input_format == FORMAT_SC32:
# these are not analyzed nor saved.
vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False)
elif input_format == FORMAT_SC64:
vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="amd64", analyze=False)
else:
raise ValueError("unexpected format: " + input_format)
viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths])
vw.analyze()
logger.debug("%s", get_meta_str(vw))
return vw
def get_extractor(
input_path: Path,
input_format: str,
os_: str,
backend: str,
sigpaths: List[Path],
should_save_workspace=False,
disable_progress=False,
sample_path: Optional[Path] = None,
) -> FeatureExtractor:
"""
raises:
UnsupportedFormatError
UnsupportedArchError
UnsupportedOSError
"""
if backend == BACKEND_CAPE:
import capa.features.extractors.cape.extractor
report = json.loads(input_path.read_text(encoding="utf-8"))
return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)
elif backend == BACKEND_DOTNET:
import capa.features.extractors.dnfile.extractor
if input_format not in (FORMAT_PE, FORMAT_DOTNET):
raise UnsupportedFormatError()
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path)
elif backend == BACKEND_BINJA:
import capa.helpers
from capa.features.extractors.binja.find_binja_api import find_binja_path
# When we are running as a standalone executable, we cannot directly import binaryninja
# We need to fist find the binja API installation path and add it into sys.path
if capa.helpers.is_running_standalone():
bn_api = find_binja_path()
if bn_api.exists():
sys.path.append(str(bn_api))
try:
import binaryninja
from binaryninja import BinaryView
except ImportError:
raise RuntimeError(
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
+ "https://docs.binary.ninja/dev/batch.html#install-the-api)."
)
import capa.features.extractors.binja.extractor
if input_format not in (FORMAT_SC32, FORMAT_SC64):
if not is_supported_format(input_path):
raise UnsupportedFormatError()
if not is_supported_arch(input_path):
raise UnsupportedArchError()
if os_ == OS_AUTO and not is_supported_os(input_path):
raise UnsupportedOSError()
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
bv: BinaryView = binaryninja.load(str(input_path))
if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {input_path}")
return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)
elif backend == BACKEND_PEFILE:
import capa.features.extractors.pefile
return capa.features.extractors.pefile.PefileFeatureExtractor(input_path)
elif backend == BACKEND_VIV:
import capa.features.extractors.viv.extractor
if input_format not in (FORMAT_SC32, FORMAT_SC64):
if not is_supported_format(input_path):
raise UnsupportedFormatError()
if not is_supported_arch(input_path):
raise UnsupportedArchError()
if os_ == OS_AUTO and not is_supported_os(input_path):
raise UnsupportedOSError()
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
vw = get_workspace(input_path, input_format, sigpaths)
if should_save_workspace:
logger.debug("saving workspace")
try:
vw.saveWorkspace()
except IOError:
# see #168 for discussion around how to handle non-writable directories
logger.info("source directory is not writable, won't save intermediate workspace")
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_)
elif backend == BACKEND_FREEZE:
return frz.load(input_path.read_bytes())
else:
raise ValueError("unexpected backend: " + backend)
def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtractor]:
file_extractors: List[FeatureExtractor] = []
if input_format == FORMAT_PE:
file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file))
elif input_format == FORMAT_DOTNET:
file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file))
file_extractors.append(capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(input_file))
elif input_format == FORMAT_ELF:
file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(input_file))
elif input_format == FORMAT_CAPE:
report = json.loads(input_file.read_text(encoding="utf-8"))
file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report))
return file_extractors
def get_signatures(sigs_path: Path) -> List[Path]:
if not sigs_path.exists():
raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed")
paths: List[Path] = []
if sigs_path.is_file():
paths.append(sigs_path)
elif sigs_path.is_dir():
logger.debug("reading signatures from directory %s", sigs_path.resolve())
for file in sigs_path.rglob("*"):
if file.is_file() and file.suffix.lower() in (".pat", ".pat.gz", ".sig"):
paths.append(file)
# Convert paths to their absolute and normalized forms
paths = [path.resolve().absolute() for path in paths]
# load signatures in deterministic order: the alphabetic sorting of filename.
# this means that `0_sigs.pat` loads before `1_sigs.pat`.
paths = sorted(paths, key=lambda path: path.name)
for path in paths:
logger.debug("found signature file: %s", path)
return paths
def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts):
if isinstance(extractor, StaticFeatureExtractor):
return rdoc.StaticAnalysis(
format=format_,
arch=arch,
os=os_,
extractor=extractor.__class__.__name__,
rules=tuple(rules_path),
base_address=frz.Address.from_capa(extractor.get_base_address()),
layout=rdoc.StaticLayout(
functions=(),
# this is updated after capabilities have been collected.
# will look like:
#
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
),
feature_counts=counts["feature_counts"],
library_functions=counts["library_functions"],
)
elif isinstance(extractor, DynamicFeatureExtractor):
return rdoc.DynamicAnalysis(
format=format_,
arch=arch,
os=os_,
extractor=extractor.__class__.__name__,
rules=tuple(rules_path),
layout=rdoc.DynamicLayout(
processes=(),
),
feature_counts=counts["feature_counts"],
)
else:
raise ValueError("invalid extractor type")
def collect_metadata(
argv: List[str],
input_path: Path,
input_format: str,
os_: str,
rules_path: List[Path],
extractor: FeatureExtractor,
counts: dict,
) -> rdoc.Metadata:
# if it's a binary sample we hash it, if it's a report
# we fetch the hashes from the report
sample_hashes: SampleHashes = extractor.get_sample_hashes()
md5, sha1, sha256 = sample_hashes.md5, sample_hashes.sha1, sample_hashes.sha256
global_feats = list(extractor.extract_global_features())
extractor_format = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Format)]
extractor_arch = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Arch)]
extractor_os = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.OS)]
input_format = (
str(extractor_format[0]) if extractor_format else "unknown" if input_format == FORMAT_AUTO else input_format
)
arch = str(extractor_arch[0]) if extractor_arch else "unknown"
os_ = str(extractor_os[0]) if extractor_os else "unknown" if os_ == OS_AUTO else os_
if isinstance(extractor, StaticFeatureExtractor):
meta_class: type = rdoc.StaticMetadata
elif isinstance(extractor, DynamicFeatureExtractor):
meta_class = rdoc.DynamicMetadata
else:
assert_never(extractor)
rules = tuple(r.resolve().absolute().as_posix() for r in rules_path)
return meta_class(
timestamp=datetime.datetime.now(),
version=capa.version.__version__,
argv=tuple(argv) if argv else None,
sample=rdoc.Sample(
md5=md5,
sha1=sha1,
sha256=sha256,
path=input_path.resolve().as_posix(),
),
analysis=get_sample_analysis(
input_format,
arch,
os_,
extractor,
rules,
counts,
),
)
def compute_dynamic_layout(
rules: RuleSet, extractor: DynamicFeatureExtractor, capabilities: MatchResults
) -> rdoc.DynamicLayout:
"""
compute a metadata structure that links threads
to the processes in which they're found.
only collect the threads at which some rule matched.
otherwise, we may pollute the json document with
a large amount of un-referenced data.
"""
assert isinstance(extractor, DynamicFeatureExtractor)
matched_calls: Set[Address] = set()
def result_rec(result: capa.features.common.Result):
for loc in result.locations:
if isinstance(loc, capa.features.address.DynamicCallAddress):
matched_calls.add(loc)
for child in result.children:
result_rec(child)
for matches in capabilities.values():
for _, result in matches:
result_rec(result)
names_by_process: Dict[Address, str] = {}
names_by_call: Dict[Address, str] = {}
matched_processes: Set[Address] = set()
matched_threads: Set[Address] = set()
threads_by_process: Dict[Address, List[Address]] = {}
calls_by_thread: Dict[Address, List[Address]] = {}
for p in extractor.get_processes():
threads_by_process[p.address] = []
for t in extractor.get_threads(p):
calls_by_thread[t.address] = []
for c in extractor.get_calls(p, t):
if c.address in matched_calls:
names_by_call[c.address] = extractor.get_call_name(p, t, c)
calls_by_thread[t.address].append(c.address)
if calls_by_thread[t.address]:
matched_threads.add(t.address)
threads_by_process[p.address].append(t.address)
if threads_by_process[p.address]:
matched_processes.add(p.address)
names_by_process[p.address] = extractor.get_process_name(p)
layout = rdoc.DynamicLayout(
processes=tuple(
rdoc.ProcessLayout(
address=frz.Address.from_capa(p),
name=names_by_process[p],
matched_threads=tuple(
rdoc.ThreadLayout(
address=frz.Address.from_capa(t),
matched_calls=tuple(
rdoc.CallLayout(
address=frz.Address.from_capa(c),
name=names_by_call[c],
)
for c in calls_by_thread[t]
if c in matched_calls
),
)
for t in threads
if t in matched_threads
) # this object is open to extension in the future,
# such as with the function name, etc.
)
for p, threads in threads_by_process.items()
if p in matched_processes
)
)
return layout
def compute_static_layout(rules: RuleSet, extractor: StaticFeatureExtractor, capabilities) -> rdoc.StaticLayout:
"""
compute a metadata structure that links basic blocks
to the functions in which they're found.
only collect the basic blocks at which some rule matched.
otherwise, we may pollute the json document with
a large amount of un-referenced data.
"""
functions_by_bb: Dict[Address, Address] = {}
bbs_by_function: Dict[Address, List[Address]] = {}
for f in extractor.get_functions():
bbs_by_function[f.address] = []
for bb in extractor.get_basic_blocks(f):
functions_by_bb[bb.address] = f.address
bbs_by_function[f.address].append(bb.address)
matched_bbs = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if capa.rules.Scope.BASIC_BLOCK in rule.scopes:
for addr, _ in matches:
assert addr in functions_by_bb
matched_bbs.add(addr)
layout = rdoc.StaticLayout(
functions=tuple(
rdoc.FunctionLayout(
address=frz.Address.from_capa(f),
matched_basic_blocks=tuple(
rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs
) # this object is open to extension in the future,
# such as with the function name, etc.
)
for f, bbs in bbs_by_function.items()
if len([bb for bb in bbs if bb in matched_bbs]) > 0
)
)
return layout
def compute_layout(rules: RuleSet, extractor, capabilities) -> rdoc.Layout:
if isinstance(extractor, StaticFeatureExtractor):
return compute_static_layout(rules, extractor, capabilities)
elif isinstance(extractor, DynamicFeatureExtractor):
return compute_dynamic_layout(rules, extractor, capabilities)
else:
raise ValueError("extractor must be either a static or dynamic extracotr")

File diff suppressed because it is too large Load Diff

View File

@@ -7,6 +7,7 @@
# See the License for the specific language governing permissions and limitations under the License.
import io
import os
import re
import uuid
import codecs
@@ -25,7 +26,7 @@ except ImportError:
# https://github.com/python/mypy/issues/1153
from backports.functools_lru_cache import lru_cache # type: ignore
from typing import Any, Set, Dict, List, Tuple, Union, Iterator, Optional
from typing import Any, Set, Dict, List, Tuple, Union, Callable, Iterator, Optional
from dataclasses import asdict, dataclass
import yaml
@@ -1691,3 +1692,105 @@ class RuleSet:
matches.update(hard_matches)
return (features3, matches)
def is_nursery_rule_path(path: Path) -> bool:
"""
The nursery is a spot for rules that have not yet been fully polished.
For example, they may not have references to public example of a technique.
Yet, we still want to capture and report on their matches.
The nursery is currently a subdirectory of the rules directory with that name.
When nursery rules are loaded, their metadata section should be updated with:
`nursery=True`.
"""
return "nursery" in path.parts
def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]:
"""
collect all rule file paths, including those in subdirectories.
"""
rule_file_paths = []
for rule_path in rule_paths:
if not rule_path.exists():
raise IOError(f"rule path {rule_path} does not exist or cannot be accessed")
if rule_path.is_file():
rule_file_paths.append(rule_path)
elif rule_path.is_dir():
logger.debug("reading rules from directory %s", rule_path)
for root, _, files in os.walk(rule_path):
if ".git" in root:
# the .github directory contains CI config in capa-rules
# this includes some .yml files
# these are not rules
# additionally, .git has files that are not .yml and generate the warning
# skip those too
continue
for file in files:
if not file.endswith(".yml"):
if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
# expect to see .git* files, readme.md, format.md, and maybe a .git directory
# other things maybe are rules, but are mis-named.
logger.warning("skipping non-.yml file: %s", file)
continue
rule_file_paths.append(Path(root) / file)
return rule_file_paths
# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+
RulePath = Path
def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None:
return
def get_rules(
rule_paths: List[RulePath],
cache_dir=None,
on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
) -> RuleSet:
"""
args:
rule_paths: list of paths to rules files or directories containing rules files
cache_dir: directory to use for caching rules, or will use the default detected cache directory if None
on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation
"""
if cache_dir is None:
cache_dir = capa.rules.cache.get_default_cache_directory()
# rule_paths may contain directory paths,
# so search for file paths recursively.
rule_file_paths = collect_rule_file_paths(rule_paths)
# this list is parallel to `rule_file_paths`:
# rule_file_paths[i] corresponds to rule_contents[i].
rule_contents = [file_path.read_bytes() for file_path in rule_file_paths]
ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None:
return ruleset
rules: List[Rule] = []
total_rule_count = len(rule_file_paths)
for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)):
on_load_rule(path, i, total_rule_count)
try:
rule = capa.rules.Rule.from_yaml(content.decode("utf-8"))
except capa.rules.InvalidRule:
raise
else:
rule.meta["capa/path"] = path.as_posix()
rule.meta["capa/nursery"] = is_nursery_rule_path(path)
rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes)
ruleset = capa.rules.RuleSet(rules)
capa.rules.cache.cache_ruleset(cache_dir, ruleset)
return ruleset

View File

@@ -36,7 +36,7 @@ example:
usage:
usage: bulk-process.py [-h] [-r RULES] [-d] [-q] [-n PARALLELISM] [--no-mp]
input
input_directory
detect capabilities in programs.
@@ -62,7 +62,6 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import json
import logging
@@ -74,10 +73,10 @@ from pathlib import Path
import capa
import capa.main
import capa.rules
import capa.loader
import capa.render.json
import capa.capabilities.common
import capa.render.result_document as rd
from capa.features.common import OS_AUTO
logger = logging.getLogger("capa")
@@ -87,11 +86,8 @@ def get_capa_results(args):
run capa against the file at the given path, using the given rules.
args is a tuple, containing:
rules (capa.rules.RuleSet): the rules to match
signatures (List[str]): list of file system paths to signature files
format (str): the name of the sample file format
os (str): the name of the operating system
path (str): the file system path to the sample to process
rules, signatures, format, backend, os, input_file
as provided via the CLI arguments.
args is a tuple because i'm not quite sure how to unpack multiple arguments using `map`.
@@ -106,44 +102,58 @@ def get_capa_results(args):
meta (dict): the meta analysis results
capabilities (dict): the matched capabilities and their result objects
"""
rules, sigpaths, format, os_, path = args
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
logger.info("computing capa results for: %s", path)
rules, signatures, format_, backend, os_, input_file = args
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend", "input_file"})
argv = [
"--signatures",
signatures,
"--format",
format_,
"--backend",
backend,
"--os",
os_,
input_file,
]
if rules:
argv += ["--rules", rules]
args = parser.parse_args(args=argv)
try:
extractor = capa.main.get_extractor(
path, format, os_, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True
)
except capa.exceptions.UnsupportedFormatError:
# i'm 100% sure if multiprocessing will reliably raise exceptions across process boundaries.
capa.main.handle_common_args(args)
capa.main.ensure_input_exists_from_cli(args)
input_format = capa.main.get_input_format_from_cli(args)
rules = capa.main.get_rules_from_cli(args)
backend = capa.main.get_backend_from_cli(args, input_format)
sample_path = capa.main.get_sample_path_from_cli(args, backend)
if sample_path is None:
os_ = "unknown"
else:
os_ = capa.loader.get_os(sample_path)
extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
except capa.main.ShouldExitError as e:
# i'm not 100% sure if multiprocessing will reliably raise exceptions across process boundaries.
# so instead, return an object with explicit success/failure status.
#
# if success, then status=ok, and results found in property "ok"
# if error, then status=error, and human readable message in property "error"
return {
"path": path,
"status": "error",
"error": f"input file does not appear to be a PE file: {path}",
}
except capa.exceptions.UnsupportedRuntimeError:
return {
"path": path,
"status": "error",
"error": "unsupported runtime or Python interpreter",
}
return {"path": input_file, "status": "error", "error": str(e), "status_code": e.status_code}
except Exception as e:
return {
"path": path,
"path": input_file,
"status": "error",
"error": f"unexpected error: {e}",
}
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)
meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts)
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
meta = capa.loader.collect_metadata(argv, args.input_file, format_, os_, [], extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
return {"path": path, "status": "ok", "ok": doc.model_dump()}
return {"path": input_file, "status": "ok", "ok": doc.model_dump()}
def main(argv=None):
@@ -151,30 +161,16 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os"})
parser.add_argument("input", type=str, help="Path to directory of files to recursively analyze")
capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend"})
parser.add_argument("input_directory", type=str, help="Path to directory of files to recursively analyze")
parser.add_argument(
"-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor"
)
parser.add_argument("--no-mp", action="store_true", help="disable subprocesses")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
try:
rules = capa.main.get_rules(args.rules)
logger.info("successfully loaded %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
return -1
try:
sig_paths = capa.main.get_signatures(args.signatures)
except IOError as e:
logger.error("%s", str(e))
return -1
samples = []
for file in Path(args.input).rglob("*"):
for file in Path(args.input_directory).rglob("*"):
samples.append(file)
cpu_count = multiprocessing.cpu_count()
@@ -203,18 +199,22 @@ def main(argv=None):
logger.debug("using process mapper")
mapper = pmap
rules = args.rules
if rules == [capa.main.RULES_PATH_DEFAULT_STRING]:
rules = None
results = {}
for result in mapper(
get_capa_results,
[(rules, sig_paths, "pe", OS_AUTO, sample) for sample in samples],
[(rules, args.signatures, args.format, args.backend, args.os, str(sample)) for sample in samples],
parallelism=args.parallelism,
):
if result["status"] == "error":
logger.warning(result["error"])
elif result["status"] == "ok":
results[result["path"].as_posix()] = rd.ResultDocument.model_validate(result["ok"]).model_dump_json(
exclude_none=True
)
doc = rd.ResultDocument.model_validate(result["ok"]).model_dump_json(exclude_none=True)
results[result["path"]] = json.loads(doc)
else:
raise ValueError(f"unexpected status: {result['status']}")

View File

@@ -36,20 +36,27 @@ def main(argv=None):
parser = argparse.ArgumentParser(description="Cache ruleset.")
capa.main.install_common_args(parser)
parser.add_argument("rules", type=str, action="append", help="Path to rules")
parser.add_argument("rules", type=str, help="Path to rules directory")
parser.add_argument("cache", type=str, help="Path to cache directory")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
if args.debug:
logging.getLogger("capa").setLevel(logging.DEBUG)
# don't use capa.main.handle_common_args
# because it expects a different format for the --rules argument
if args.quiet:
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger("capa").setLevel(logging.ERROR)
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
try:
cache_dir = Path(args.cache)
cache_dir.mkdir(parents=True, exist_ok=True)
rules = capa.main.get_rules(args.rules, cache_dir)
rules = capa.rules.get_rules([Path(args.rules)], cache_dir)
logger.info("successfully loaded %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))

View File

@@ -723,36 +723,33 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Capa to YARA rule converter")
parser.add_argument("rules", type=str, help="Path to rules")
parser.add_argument("--private", "-p", action="store_true", help="Create private rules", default=False)
capa.main.install_common_args(parser, wanted={"tag"})
parser.add_argument("--private", "-p", action="store_true", help="Create private rules", default=False)
parser.add_argument("rules", type=str, help="Path to rules directory")
args = parser.parse_args(args=argv)
make_priv = args.private
if args.verbose:
level = logging.DEBUG
elif args.quiet:
level = logging.ERROR
# don't use capa.main.handle_common_args
# because it expects a different format for the --rules argument
if args.quiet:
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
level = logging.INFO
logging.basicConfig(level=level)
logging.getLogger("capa2yara").setLevel(level)
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
try:
rules = capa.main.get_rules([Path(args.rules)])
namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
logger.info("successfully loaded %d rules (including subscope rules which will be ignored)", len(rules))
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)
logger.debug("selected %d rules", len(rules))
for i, r in enumerate(rules.rules, 1):
logger.debug(" %d. %s", i, r)
rules = capa.rules.get_rules([Path(args.rules)])
logger.info("successfully loaded %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
return -1
namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
output_yar(
"// Rules from Mandiant's https://github.com/mandiant/capa-rules converted to YARA using https://github.com/mandiant/capa/blob/master/scripts/capa2yara.py by Arnim Rupp"
)
@@ -780,10 +777,10 @@ def main(argv=None):
cround += 1
logger.info("doing convert_rules(), round: %d", cround)
num_rules = len(converted_rules)
count_incomplete += convert_rules(rules, namespaces, cround, make_priv)
count_incomplete += convert_rules(rules, namespaces, cround, args.private)
# one last round to collect all unconverted rules
count_incomplete += convert_rules(rules, namespaces, 9000, make_priv)
count_incomplete += convert_rules(rules, namespaces, 9000, args.private)
stats = "\n// converted rules : " + str(len(converted_rules))
stats += "\n// among those are incomplete : " + str(count_incomplete)

View File

@@ -15,6 +15,7 @@ from pathlib import Path
import capa.main
import capa.rules
import capa.engine
import capa.loader
import capa.features
import capa.render.json
import capa.render.utils as rutils
@@ -168,19 +169,19 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]:
# ==== render dictionary helpers
def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"):
def capa_details(rules_path: Path, input_file: Path, output_format="dictionary"):
# load rules from disk
rules = capa.main.get_rules([rules_path])
rules = capa.rules.get_rules([rules_path])
# extract features and find capabilities
extractor = capa.main.get_extractor(
file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True
extractor = capa.loader.get_extractor(
input_file, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], should_save_workspace=False, disable_progress=True
)
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)
# collect metadata (used only to make rendering more complete)
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts)
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
capa_output: Any = False
@@ -206,7 +207,7 @@ if __name__ == "__main__":
RULES_PATH = capa.main.get_default_root() / "rules"
parser = argparse.ArgumentParser(description="Extract capabilities from a file")
parser.add_argument("file", help="file to extract capabilities from")
parser.add_argument("input_file", help="file to extract capabilities from")
parser.add_argument("--rules", help="path to rules directory", default=RULES_PATH)
parser.add_argument(
"--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary"
@@ -214,5 +215,5 @@ if __name__ == "__main__":
args = parser.parse_args()
if args.rules != RULES_PATH:
args.rules = Path(args.rules)
print(capa_details(args.rules, Path(args.file), args.output))
print(capa_details(args.rules, Path(args.input_file), args.output))
sys.exit(0)

View File

@@ -19,6 +19,7 @@ import logging
import argparse
from pathlib import Path
import capa.main
import capa.rules
logger = logging.getLogger("capafmt")
@@ -29,6 +30,7 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Capa rule formatter.")
capa.main.install_common_args(parser)
parser.add_argument("path", type=str, help="Path to rule to format")
parser.add_argument(
"-i",
@@ -37,8 +39,6 @@ def main(argv=None):
dest="in_place",
help="Format the rule in place, otherwise, write formatted rule to STDOUT",
)
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging")
parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
parser.add_argument(
"-c",
"--check",
@@ -47,15 +47,10 @@ def main(argv=None):
)
args = parser.parse_args(args=argv)
if args.verbose:
level = logging.DEBUG
elif args.quiet:
level = logging.ERROR
else:
level = logging.INFO
logging.basicConfig(level=level)
logging.getLogger("capafmt").setLevel(level)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
rule = capa.rules.Rule.from_yaml_file(args.path, use_ruamel=True)
reformatted_rule = rule.to_yaml()

View File

@@ -17,8 +17,8 @@ import logging
import argparse
import contextlib
from typing import BinaryIO
from pathlib import Path
import capa.main
import capa.helpers
import capa.features.extractors.elf
@@ -36,28 +36,16 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Detect the underlying OS for the given ELF file")
parser.add_argument("sample", type=str, help="path to ELF file")
logging_group = parser.add_argument_group("logging arguments")
logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
logging_group.add_argument(
"-q", "--quiet", action="store_true", help="disable all status output except fatal errors"
)
capa.main.install_common_args(parser, wanted={"input_file"})
args = parser.parse_args(args=argv)
if args.quiet:
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
try:
capa.main.handle_common_args(args)
capa.main.ensure_input_exists_from_cli(args)
except capa.main.ShouldExitError as e:
return e.status_code
f = Path(args.sample).open("rb")
f = args.input_file.open("rb")
with contextlib.closing(f):
try:

View File

@@ -48,7 +48,7 @@ def find_overlapping_rules(new_rule_path, rules_path):
overlapping_rules = []
# capa.rules.RuleSet stores all rules in given paths
ruleset = capa.main.get_rules(rules_path)
ruleset = capa.rules.get_rules(rules_path)
for rule_name, rule in ruleset.rules.items():
rule_features = rule.extract_all_features()

View File

@@ -39,6 +39,7 @@ import tqdm.contrib.logging
import capa.main
import capa.rules
import capa.engine
import capa.loader
import capa.helpers
import capa.features.insn
import capa.capabilities.common
@@ -363,8 +364,14 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]:
format_ = capa.helpers.get_auto_format(nice_path)
logger.debug("analyzing sample: %s", nice_path)
extractor = capa.main.get_extractor(
nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True
extractor = capa.loader.get_extractor(
nice_path,
format_,
OS_AUTO,
capa.main.BACKEND_VIV,
DEFAULT_SIGNATURES,
should_save_workspace=False,
disable_progress=True,
)
capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True)
@@ -990,7 +997,11 @@ def main(argv=None):
help="Enable thorough linting - takes more time, but does a better job",
)
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
if args.debug:
logging.getLogger("capa").setLevel(logging.DEBUG)
@@ -1002,16 +1013,9 @@ def main(argv=None):
time0 = time.time()
try:
rules = capa.main.get_rules(args.rules)
logger.info("successfully loaded %s rules", rules.source_rule_count)
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)
logger.debug("selected %s rules", len(rules))
for i, r in enumerate(rules.rules, 1):
logger.debug(" %d. %s", i, r)
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
return -1
rules = capa.main.get_rules_from_cli(args)
except capa.main.ShouldExitError as e:
return e.status_code
logger.info("collecting potentially referenced samples")
samples_path = Path(args.samples)

View File

@@ -62,6 +62,7 @@ import capa.engine
import capa.helpers
import capa.features
import capa.features.freeze
from capa.loader import BACKEND_VIV
logger = logging.getLogger("capa.match-function-id")
@@ -71,61 +72,53 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="FLIRT match each function")
parser.add_argument("sample", type=str, help="Path to sample to analyze")
capa.main.install_common_args(parser, wanted={"input_file", "signatures", "format"})
parser.add_argument(
"-F",
"--function",
type=lambda x: int(x, 0x10),
help="match a specific function by VA, rather than add functions",
)
parser.add_argument(
"--signature",
action="append",
dest="signatures",
type=str,
default=[],
help="use the given signatures to identify library functions, file system paths to .sig/.pat files.",
)
parser.add_argument("-d", "--debug", action="store_true", help="Enable debugging output on STDERR")
parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
args = parser.parse_args(args=argv)
if args.quiet:
logging.basicConfig(level=logging.ERROR)
logging.getLogger().setLevel(logging.ERROR)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
# disable vivisect-related logging, it's verbose and not relevant for capa users
capa.main.set_vivisect_log_level(logging.CRITICAL)
try:
capa.main.handle_common_args(args)
capa.main.ensure_input_exists_from_cli(args)
input_format = capa.main.get_input_format_from_cli(args)
sig_paths = capa.main.get_signatures_from_cli(args, input_format, BACKEND_VIV)
except capa.main.ShouldExitError as e:
return e.status_code
analyzers = []
for sigpath in args.signatures:
sigs = viv_utils.flirt.load_flirt_signature(sigpath)
for sigpath in sig_paths:
sigs = viv_utils.flirt.load_flirt_signature(str(sigpath))
with capa.main.timing("flirt: compiling sigs"):
matcher = flirt.compile(sigs)
analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, sigpath)
analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, str(sigpath))
logger.debug("registering viv function analyzer: %s", repr(analyzer))
analyzers.append(analyzer)
vw = viv_utils.getWorkspace(args.sample, analyze=True, should_save=False)
vw = viv_utils.getWorkspace(str(args.input_file), analyze=True, should_save=False)
functions = vw.getFunctions()
if args.function:
functions = [args.function]
seen = set()
for function in functions:
logger.debug("matching function: 0x%04x", function)
for analyzer in analyzers:
name = viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function)
viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function)
name = viv_utils.get_function_name(vw, function)
if name:
print(f"0x{function:04x}: {name}")
key = (function, name)
if key in seen:
continue
else:
print(f"0x{function:04x}: {name}")
seen.add(key)
return 0

View File

@@ -41,7 +41,6 @@ import timeit
import logging
import argparse
import subprocess
from pathlib import Path
import tqdm
import tabulate
@@ -50,6 +49,7 @@ import capa.main
import capa.perf
import capa.rules
import capa.engine
import capa.loader
import capa.helpers
import capa.features
import capa.features.common
@@ -74,42 +74,22 @@ def main(argv=None):
label += " (dirty)"
parser = argparse.ArgumentParser(description="Profile capa performance")
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "rules"})
capa.main.install_common_args(parser, wanted={"format", "os", "input_file", "signatures", "rules"})
parser.add_argument("--number", type=int, default=3, help="batch size of profile collection")
parser.add_argument("--repeat", type=int, default=30, help="batch count of profile collection")
parser.add_argument("--label", type=str, default=label, help="description of the profile collection")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
try:
taste = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1
try:
capa.main.handle_common_args(args)
capa.main.ensure_input_exists_from_cli(args)
input_format = capa.main.get_input_format_from_cli(args)
backend = capa.main.get_backend_from_cli(args, input_format)
with capa.main.timing("load rules"):
rules = capa.main.get_rules(args.rules)
except IOError as e:
logger.error("%s", str(e))
return -1
try:
sig_paths = capa.main.get_signatures(args.signatures)
except IOError as e:
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
else:
extractor = capa.main.get_extractor(
args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False
)
rules = capa.main.get_rules_from_cli(args)
extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
except capa.main.ShouldExitError as e:
return e.status_code
with tqdm.tqdm(total=args.number * args.repeat, leave=False) as pbar:

View File

@@ -33,6 +33,7 @@ import logging
import argparse
from pathlib import Path
import capa.main
import capa.render.proto
import capa.render.result_document
@@ -44,26 +45,14 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Convert a capa JSON result document into the protobuf format")
capa.main.install_common_args(parser)
parser.add_argument("json", type=str, help="path to JSON result document file, produced by `capa --json`")
logging_group = parser.add_argument_group("logging arguments")
logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
logging_group.add_argument(
"-q", "--quiet", action="store_true", help="disable all status output except fatal errors"
)
args = parser.parse_args(args=argv)
if args.quiet:
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
rd = capa.render.result_document.ResultDocument.from_file(Path(args.json))
pb = capa.render.proto.doc_to_pb2(rd)

View File

@@ -36,6 +36,7 @@ import logging
import argparse
from pathlib import Path
import capa.main
import capa.render.json
import capa.render.proto
import capa.render.proto.capa_pb2
@@ -49,28 +50,16 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Convert a capa protobuf result document into the JSON format")
capa.main.install_common_args(parser)
parser.add_argument(
"pb", type=str, help="path to protobuf result document file, produced by `proto-from-results.py`"
)
logging_group = parser.add_argument_group("logging arguments")
logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
logging_group.add_argument(
"-q", "--quiet", action="store_true", help="disable all status output except fatal errors"
)
args = parser.parse_args(args=argv)
if args.quiet:
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
pb = Path(args.pb).read_bytes()

View File

@@ -55,13 +55,11 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import logging
import argparse
import collections
from typing import Dict
from pathlib import Path
import colorama
@@ -76,10 +74,7 @@ import capa.render.verbose
import capa.features.freeze
import capa.capabilities.common
import capa.render.result_document as rd
from capa.helpers import get_file_taste
from capa.features.common import FORMAT_AUTO
from capa.features.freeze import Address
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor
logger = logging.getLogger("capa.show-capabilities-by-function")
@@ -142,67 +137,37 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"format", "os", "backend", "sample", "signatures", "rules", "tag"})
capa.main.install_common_args(
parser, wanted={"format", "os", "backend", "input_file", "signatures", "rules", "tag"}
)
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
try:
taste = get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1
try:
rules = capa.main.get_rules(args.rules)
logger.info("successfully loaded %s rules", len(rules))
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)
logger.info("selected %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
return -1
try:
sig_paths = capa.main.get_signatures(args.signatures)
except IOError as e:
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (args.format == FORMAT_AUTO and capa.features.freeze.is_freeze(taste)):
format_ = "freeze"
extractor: FeatureExtractor = capa.features.freeze.load(Path(args.sample).read_bytes())
else:
format_ = args.format
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
extractor = capa.main.get_extractor(
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
)
assert isinstance(extractor, StaticFeatureExtractor)
except capa.exceptions.UnsupportedFormatError:
capa.helpers.log_unsupported_format_error()
return -1
except capa.exceptions.UnsupportedRuntimeError:
capa.helpers.log_unsupported_runtime_error()
return -1
capa.main.handle_common_args(args)
capa.main.ensure_input_exists_from_cli(args)
input_format = capa.main.get_input_format_from_cli(args)
rules = capa.main.get_rules_from_cli(args)
backend = capa.main.get_backend_from_cli(args, input_format)
sample_path = capa.main.get_sample_path_from_cli(args, backend)
if sample_path is None:
os_ = "unknown"
else:
os_ = capa.loader.get_os(sample_path)
extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
except capa.main.ShouldExitError as e:
return e.status_code
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor)
meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts)
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
if capa.capabilities.common.has_file_limitation(rules, capabilities):
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose or args.json):
return -1
return capa.main.E_FILE_LIMITATION
# colorama will detect:
# - when on Windows console, and fixup coloring, and
# - when not an interactive session, and disable coloring
# renderers should use coloring and assume it will be stripped out if necessary.
colorama.init()
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
print(render_matches_by_function(doc))
colorama.deinit()

View File

@@ -64,16 +64,15 @@ Example::
insn: 0x10001027: mnemonic(shl)
...
"""
import os
import sys
import logging
import argparse
from typing import Tuple
from pathlib import Path
import capa.main
import capa.rules
import capa.engine
import capa.loader
import capa.helpers
import capa.features
import capa.exceptions
@@ -81,17 +80,9 @@ import capa.render.verbose as v
import capa.features.freeze
import capa.features.address
import capa.features.extractors.pefile
from capa.helpers import get_auto_format, log_unsupported_runtime_error
from capa.helpers import assert_never
from capa.features.insn import API, Number
from capa.features.common import (
FORMAT_AUTO,
FORMAT_CAPE,
FORMAT_FREEZE,
DYNAMIC_FORMATS,
String,
Feature,
is_global_feature,
)
from capa.features.common import String, Feature, is_global_feature
from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor, DynamicFeatureExtractor
logger = logging.getLogger("capa.show-features")
@@ -106,56 +97,33 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Show the features that capa extracts from the given sample")
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"})
capa.main.install_common_args(parser, wanted={"input_file", "format", "os", "signatures", "backend"})
parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
parser.add_argument("-P", "--process", type=str, help="Show features for specific process name")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
if args.function and args.backend == "pefile":
print("pefile backend does not support extracting function features")
return -1
try:
_ = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1
capa.main.handle_common_args(args)
capa.main.ensure_input_exists_from_cli(args)
try:
sig_paths = capa.main.get_signatures(args.signatures)
except IOError as e:
logger.error("%s", str(e))
return -1
format_ = args.format if args.format != FORMAT_AUTO else get_auto_format(args.sample)
if format_ == FORMAT_FREEZE:
# this should be moved above the previous if clause after implementing
# feature freeze for the dynamic analysis flavor
extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
else:
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
extractor = capa.main.get_extractor(
args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace
)
except capa.exceptions.UnsupportedFormatError as e:
if format_ == FORMAT_CAPE:
capa.helpers.log_unsupported_cape_report_error(str(e))
else:
capa.helpers.log_unsupported_format_error()
return -1
except capa.exceptions.UnsupportedRuntimeError:
log_unsupported_runtime_error()
if args.function and args.backend == "pefile":
print("pefile backend does not support extracting function features")
return -1
if format_ in DYNAMIC_FORMATS:
assert isinstance(extractor, DynamicFeatureExtractor)
input_format = capa.main.get_input_format_from_cli(args)
backend = capa.main.get_backend_from_cli(args, input_format)
extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
except capa.main.ShouldExitError as e:
return e.status_code
if isinstance(extractor, DynamicFeatureExtractor):
print_dynamic_analysis(extractor, args)
else:
assert isinstance(extractor, StaticFeatureExtractor)
elif isinstance(extractor, StaticFeatureExtractor):
print_static_analysis(extractor, args)
else:
assert_never(extractor)
return 0

View File

@@ -8,13 +8,11 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import typing
import logging
import argparse
from typing import Set, Tuple
from pathlib import Path
from collections import Counter
import tabulate
@@ -31,8 +29,7 @@ import capa.features.freeze
import capa.features.address
import capa.features.extractors.pefile
import capa.features.extractors.base_extractor
from capa.helpers import log_unsupported_runtime_error
from capa.features.common import Feature
from capa.features.common import FORMAT_FREEZE, Feature
from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor
logger = logging.getLogger("show-unused-features")
@@ -42,10 +39,9 @@ def format_address(addr: capa.features.address.Address) -> str:
return v.format_address(capa.features.freeze.Address.from_capa((addr)))
def get_rules_feature_set(rules_path) -> Set[Feature]:
ruleset = capa.main.get_rules(rules_path)
def get_rules_feature_set(rules: capa.rules.RuleSet) -> Set[Feature]:
rules_feature_set: Set[Feature] = set()
for _, rule in ruleset.rules.items():
for _, rule in rules.rules.items():
rules_feature_set.update(rule.extract_all_features())
return rules_feature_set
@@ -106,44 +102,23 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Show the features that capa doesn't have rules for yet")
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend", "rules"})
capa.main.install_common_args(parser, wanted={"format", "os", "input_file", "signatures", "backend", "rules"})
parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
if args.function and args.backend == "pefile":
print("pefile backend does not support extracting function features")
return -1
try:
taste = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1
try:
sig_paths = capa.main.get_signatures(args.signatures)
except IOError as e:
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
else:
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
extractor = capa.main.get_extractor(
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
)
except capa.exceptions.UnsupportedFormatError:
capa.helpers.log_unsupported_format_error()
return -1
except capa.exceptions.UnsupportedRuntimeError:
log_unsupported_runtime_error()
return -1
capa.main.handle_common_args(args)
capa.main.ensure_input_exists_from_cli(args)
rules = capa.main.get_rules_from_cli(args)
input_format = capa.main.get_input_format_from_cli(args)
backend = capa.main.get_backend_from_cli(args, input_format)
extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
except capa.main.ShouldExitError as e:
return e.status_code
assert isinstance(extractor, StaticFeatureExtractor), "only static analysis supported today"
@@ -159,7 +134,7 @@ def main(argv=None):
function_handles = tuple(extractor.get_functions())
if args.function:
if args.format == "freeze":
if input_format == FORMAT_FREEZE:
function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles))
else:
function_handles = tuple(filter(lambda fh: format_address(fh.address) == args.function, function_handles))
@@ -174,7 +149,7 @@ def main(argv=None):
feature_map.update(get_file_features(function_handles, extractor))
rules_feature_set = get_rules_feature_set(args.rules)
rules_feature_set = get_rules_feature_set(rules)
print_unused_features(feature_map, rules_feature_set)
return 0
@@ -206,7 +181,8 @@ def ida_main():
feature_map.update(get_file_features(function_handles, extractor))
rules_path = capa.main.get_default_root() / "rules"
rules_feature_set = get_rules_feature_set([rules_path])
rules = capa.rules.get_rules([rules_path])
rules_feature_set = get_rules_feature_set(rules)
print_unused_features(feature_map, rules_feature_set)

View File

@@ -106,11 +106,11 @@ def get_viv_extractor(path: Path):
]
if "raw32" in path.name:
vw = capa.main.get_workspace(path, "sc32", sigpaths=sigpaths)
vw = capa.loader.get_workspace(path, "sc32", sigpaths=sigpaths)
elif "raw64" in path.name:
vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths)
vw = capa.loader.get_workspace(path, "sc64", sigpaths=sigpaths)
else:
vw = capa.main.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths)
vw = capa.loader.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths)
vw.saveWorkspace()
extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO)
fixup_viv(path, extractor)