mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
some more changes
This commit is contained in:
@@ -157,7 +157,7 @@ def collect_metadata(rules: List[Path]):
|
||||
arch=arch,
|
||||
os=os,
|
||||
extractor="ida",
|
||||
rules=tuple(rules),
|
||||
rules=tuple(str(r.resolve().absolute()) for r in rules),
|
||||
base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()),
|
||||
layout=rdoc.Layout(
|
||||
functions=tuple()
|
||||
|
||||
@@ -577,10 +577,10 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
def ensure_capa_settings_rule_path(self):
|
||||
try:
|
||||
path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")
|
||||
path: Path = Path(settings.user.get(CAPA_SETTINGS_RULE_PATH, ""))
|
||||
|
||||
# resolve rules directory - check self and settings first, then ask user
|
||||
if not os.path.exists(path):
|
||||
if not path.exists():
|
||||
# configure rules selection messagebox
|
||||
rules_message = QtWidgets.QMessageBox()
|
||||
rules_message.setIcon(QtWidgets.QMessageBox.Information)
|
||||
@@ -598,15 +598,15 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
if pressed == QtWidgets.QMessageBox.Cancel:
|
||||
raise UserCancelledError()
|
||||
|
||||
path = self.ask_user_directory()
|
||||
path = Path(self.ask_user_directory())
|
||||
if not path:
|
||||
raise UserCancelledError()
|
||||
|
||||
if not os.path.exists(path):
|
||||
if not path.exists():
|
||||
logger.error("rule path %s does not exist or cannot be accessed" % path)
|
||||
return False
|
||||
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH] = path
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH] = str(path)
|
||||
except UserCancelledError as e:
|
||||
capa.ida.helpers.inform_user_ida_ui("Analysis requires capa rules")
|
||||
logger.warning(
|
||||
|
||||
17
capa/main.py
17
capa/main.py
@@ -13,7 +13,6 @@ import sys
|
||||
import time
|
||||
import hashlib
|
||||
import logging
|
||||
import os.path
|
||||
import argparse
|
||||
import datetime
|
||||
import textwrap
|
||||
@@ -544,7 +543,7 @@ def get_extractor(
|
||||
# We need to first find the binja API installation path and add it into sys.path
|
||||
if is_running_standalone():
|
||||
bn_api = find_binja_path()
|
||||
if os.path.exists(bn_api):
|
||||
if Path(bn_api).exists():
|
||||
sys.path.append(bn_api)
|
||||
|
||||
try:
|
||||
@@ -628,11 +627,17 @@ def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]:
|
||||
logger.debug("reading rules from directory %s", rule_path)
|
||||
for root, _, files in os.walk(rule_path):
|
||||
if ".git" in root:
|
||||
# Skip certain directories within the walk
|
||||
# the .github directory contains CI config in capa-rules
|
||||
# this includes some .yml files
|
||||
# these are not rules
|
||||
# additionally, .git has files that are not .yml and generate the warning
|
||||
# skip those too
|
||||
continue
|
||||
for file in files:
|
||||
if not file.endswith(".yml"):
|
||||
if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
|
||||
# expect to see .git* files, readme.md, format.md, and maybe a .git directory
|
||||
# other things may be rules, but are mis-named.
|
||||
logger.warning("skipping non-.yml file: %s", file)
|
||||
continue
|
||||
rule_file_paths.append(Path(root) / file)
|
||||
@@ -740,7 +745,7 @@ def collect_metadata(
|
||||
sha1.update(buf)
|
||||
sha256.update(buf)
|
||||
|
||||
rules_path = [r.resolve().absolute() for r in rules_path]
|
||||
rules = tuple(str(r.resolve().absolute()) for r in rules_path)
|
||||
format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_
|
||||
arch = get_arch(sample_path)
|
||||
os_ = get_os(sample_path) if os_ == OS_AUTO else os_
|
||||
@@ -753,14 +758,14 @@ def collect_metadata(
|
||||
md5=md5.hexdigest(),
|
||||
sha1=sha1.hexdigest(),
|
||||
sha256=sha256.hexdigest(),
|
||||
path=os.path.normpath(sample_path),
|
||||
path=str(Path(sample_path).resolve()),
|
||||
),
|
||||
analysis=rdoc.Analysis(
|
||||
format=format_,
|
||||
arch=arch,
|
||||
os=os_,
|
||||
extractor=extractor.__class__.__name__,
|
||||
rules=tuple(rules_path),
|
||||
rules=rules,
|
||||
base_address=frz.Address.from_capa(extractor.get_base_address()),
|
||||
layout=rdoc.Layout(
|
||||
functions=tuple(),
|
||||
|
||||
@@ -138,7 +138,7 @@ def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
|
||||
os=meta.analysis.os,
|
||||
extractor=meta.analysis.extractor,
|
||||
# TODO convert analysis.rule type to Path in capa_pb2.Metadata
|
||||
rules=list(str(rule) for rule in meta.analysis.rules),
|
||||
rules=list(meta.analysis.rules),
|
||||
base_address=addr_to_pb2(meta.analysis.base_address),
|
||||
layout=capa_pb2.Layout(
|
||||
functions=[
|
||||
@@ -501,7 +501,7 @@ def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata:
|
||||
arch=meta.analysis.arch,
|
||||
os=meta.analysis.os,
|
||||
extractor=meta.analysis.extractor,
|
||||
rules=tuple(Path(r) for r in meta.analysis.rules),
|
||||
rules=tuple(meta.analysis.rules),
|
||||
base_address=addr_from_pb2(meta.analysis.base_address),
|
||||
layout=rd.Layout(
|
||||
functions=tuple(
|
||||
|
||||
@@ -74,7 +74,7 @@ class Analysis(Model):
|
||||
arch: str
|
||||
os: str
|
||||
extractor: str
|
||||
rules: Tuple[Path, ...]
|
||||
rules: Tuple[str, ...]
|
||||
base_address: frz.Address
|
||||
layout: Layout
|
||||
feature_counts: FeatureCounts
|
||||
|
||||
@@ -91,7 +91,7 @@ def render_meta(ostream, doc: rd.ResultDocument):
|
||||
("arch", doc.meta.analysis.arch),
|
||||
("extractor", doc.meta.analysis.extractor),
|
||||
("base address", format_address(doc.meta.analysis.base_address)),
|
||||
("rules", "\n".join(tuple(str(rule) for rule in doc.meta.analysis.rules))),
|
||||
("rules", "\n".join(doc.meta.analysis.rules)),
|
||||
("function count", len(doc.meta.analysis.feature_counts.functions)),
|
||||
("library function count", len(doc.meta.analysis.library_functions)),
|
||||
(
|
||||
|
||||
@@ -59,10 +59,10 @@ import os
|
||||
import sys
|
||||
import json
|
||||
import logging
|
||||
import os.path
|
||||
import argparse
|
||||
import multiprocessing
|
||||
import multiprocessing.pool
|
||||
from pathlib import Path
|
||||
|
||||
import capa
|
||||
import capa.main
|
||||
@@ -171,7 +171,7 @@ def main(argv=None):
|
||||
samples = []
|
||||
for base, directories, files in os.walk(args.input):
|
||||
for file in files:
|
||||
samples.append(os.path.join(base, file))
|
||||
samples.append(str(Path(base) / file))
|
||||
|
||||
def pmap(f, args, parallelism=multiprocessing.cpu_count()):
|
||||
"""apply the given function f to the given args using subprocesses"""
|
||||
|
||||
@@ -61,7 +61,7 @@ def main(argv=None):
|
||||
id = capa.rules.cache.compute_cache_identifier(content)
|
||||
path = capa.rules.cache.get_cache_path(args.cache, id)
|
||||
|
||||
assert os.path.exists(path)
|
||||
assert path.exists()
|
||||
logger.info("cached to: %s", path)
|
||||
|
||||
|
||||
|
||||
@@ -197,7 +197,6 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
import os.path
|
||||
import argparse
|
||||
|
||||
RULES_PATH = capa.main.get_default_root() / "rules"
|
||||
@@ -209,6 +208,7 @@ if __name__ == "__main__":
|
||||
"--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.rules != RULES_PATH:
|
||||
args.rules = Path(args.rules)
|
||||
print(capa_details(args.rules, args.file, args.output))
|
||||
sys.exit(0)
|
||||
|
||||
@@ -24,6 +24,7 @@ Derived from: https://github.com/mandiant/capa/blob/master/scripts/import-to-ida
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import binaryninja
|
||||
import binaryninja.interaction
|
||||
@@ -45,22 +46,23 @@ def append_func_cmt(bv, va, cmt):
|
||||
|
||||
|
||||
def load_analysis(bv):
|
||||
shortname = os.path.splitext(os.path.basename(bv.file.filename))[0]
|
||||
dirname = os.path.dirname(bv.file.filename)
|
||||
shortname = Path(bv.file.filename).resolve().stem
|
||||
dirname = Path(bv.file.filename).resolve().parent
|
||||
binaryninja.log_info(f"dirname: {dirname}\nshortname: {shortname}\n")
|
||||
if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK):
|
||||
path = os.path.join(dirname, shortname + ".js")
|
||||
elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK):
|
||||
path = os.path.join(dirname, shortname + ".json")
|
||||
js_path = path = dirname / (shortname + ".js")
|
||||
json_path = dirname / (shortname + ".json")
|
||||
if os.access(str(js_path), os.R_OK):
|
||||
path = js_path
|
||||
elif os.access(str(json_path), os.R_OK):
|
||||
path = json_path
|
||||
else:
|
||||
path = binaryninja.interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")
|
||||
if not path or not os.access(path, os.R_OK):
|
||||
if not path or not os.access(str(path), os.R_OK):
|
||||
binaryninja.log_error("Invalid filename.")
|
||||
return 0
|
||||
binaryninja.log_info(f"Using capa file {path}")
|
||||
|
||||
with open(path, "rb") as f:
|
||||
doc = json.loads(f.read().decode("utf-8"))
|
||||
doc = json.loads(path.read_bytes().decode("utf-8"))
|
||||
|
||||
if "meta" not in doc or "rules" not in doc:
|
||||
binaryninja.log_error("doesn't appear to be a capa report")
|
||||
|
||||
@@ -114,7 +114,7 @@ class FilenameDoesntMatchRuleName(Lint):
|
||||
expected = expected.replace(".", "")
|
||||
expected = expected + ".yml"
|
||||
|
||||
found = os.path.basename(rule.meta["capa/path"])
|
||||
found = Path(rule.meta["capa/path"]).name
|
||||
|
||||
self.recommendation = self.recommendation_template.format(expected, found)
|
||||
|
||||
@@ -249,7 +249,8 @@ class InvalidAttckOrMbcTechnique(Lint):
|
||||
super().__init__()
|
||||
|
||||
try:
|
||||
with open(f"{os.path.dirname(__file__)}/linter-data.json", "rb") as fd:
|
||||
data_path = Path(__file__).resolve().parent / "linter-data.json"
|
||||
with data_path.open("rb") as fd:
|
||||
self.data = json.load(fd)
|
||||
self.enabled_frameworks = self.data.keys()
|
||||
except BaseException:
|
||||
@@ -295,7 +296,7 @@ DEFAULT_SIGNATURES = capa.main.get_default_signatures()
|
||||
|
||||
|
||||
def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]:
|
||||
nice_path = os.path.abspath(str(path))
|
||||
nice_path = path.resolve().absolute().as_posix()
|
||||
if path in ctx.capabilities_by_sample:
|
||||
logger.debug("found cached results: %s: %d capabilities", nice_path, len(ctx.capabilities_by_sample[path]))
|
||||
return ctx.capabilities_by_sample[path]
|
||||
@@ -883,43 +884,31 @@ def lint(ctx: Context):
|
||||
return ret
|
||||
|
||||
|
||||
def collect_samples(path) -> Dict[str, Path]:
|
||||
def collect_samples(path: Path) -> Dict[str, Path]:
|
||||
"""
|
||||
recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename.
|
||||
Recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename.
|
||||
"""
|
||||
samples = {}
|
||||
for root, dirs, files in os.walk(path):
|
||||
for name in files:
|
||||
if name.endswith(".viv"):
|
||||
continue
|
||||
if name.endswith(".idb"):
|
||||
continue
|
||||
if name.endswith(".i64"):
|
||||
continue
|
||||
if name.endswith(".frz"):
|
||||
continue
|
||||
if name.endswith(".fnames"):
|
||||
continue
|
||||
for path in path.rglob("*"):
|
||||
if path.suffix in [".viv", ".idb", ".i64", ".frz", ".fnames"]:
|
||||
continue
|
||||
|
||||
path = pathlib.Path(os.path.join(root, name))
|
||||
try:
|
||||
buf = path.read_bytes()
|
||||
except IOError:
|
||||
continue
|
||||
|
||||
try:
|
||||
with path.open("rb") as f:
|
||||
buf = f.read()
|
||||
except IOError:
|
||||
continue
|
||||
sha256 = hashlib.sha256()
|
||||
sha256.update(buf)
|
||||
|
||||
sha256 = hashlib.sha256()
|
||||
sha256.update(buf)
|
||||
md5 = hashlib.md5()
|
||||
md5.update(buf)
|
||||
|
||||
md5 = hashlib.md5()
|
||||
md5.update(buf)
|
||||
|
||||
samples[sha256.hexdigest().lower()] = path
|
||||
samples[sha256.hexdigest().upper()] = path
|
||||
samples[md5.hexdigest().lower()] = path
|
||||
samples[md5.hexdigest().upper()] = path
|
||||
samples[name] = path
|
||||
samples[sha256.hexdigest().lower()] = path
|
||||
samples[sha256.hexdigest().upper()] = path
|
||||
samples[md5.hexdigest().lower()] = path
|
||||
samples[md5.hexdigest().upper()] = path
|
||||
samples[path.name] = path
|
||||
|
||||
return samples
|
||||
|
||||
@@ -928,7 +917,7 @@ def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
|
||||
samples_path = os.path.join(os.path.dirname(__file__), "..", "tests", "data")
|
||||
samples_path = str(Path(__file__).resolve().parent.parent / "tests" / "data")
|
||||
|
||||
parser = argparse.ArgumentParser(description="Lint capa rules.")
|
||||
capa.main.install_common_args(parser, wanted={"tag"})
|
||||
@@ -964,11 +953,12 @@ def main(argv=None):
|
||||
return -1
|
||||
|
||||
logger.info("collecting potentially referenced samples")
|
||||
if not os.path.exists(args.samples):
|
||||
logger.error("samples path %s does not exist", args.samples)
|
||||
samplePath = Path(args.samples)
|
||||
if not samplePath.exists():
|
||||
logger.error("samples path %s does not exist", samplePath)
|
||||
return -1
|
||||
|
||||
samples = collect_samples(args.samples)
|
||||
samples = collect_samples(samplePath)
|
||||
|
||||
ctx = Context(samples=samples, rules=rules, is_thorough=args.thorough)
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ import logging
|
||||
import argparse
|
||||
from sys import argv
|
||||
from typing import Dict, List
|
||||
from os.path import dirname
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from stix2 import Filter, MemoryStore, AttackPattern # type: ignore
|
||||
@@ -187,7 +187,7 @@ if __name__ == "__main__":
|
||||
"--output",
|
||||
"-o",
|
||||
type=str,
|
||||
default=f"{dirname(__file__)}/linter-data.json",
|
||||
default=str(Path(__file__).resolve().parent / "linter-data.json"),
|
||||
help="Path to output file (lint.py will be looking for linter-data.json)",
|
||||
)
|
||||
main(parser.parse_args(args=argv[1:]))
|
||||
|
||||
@@ -144,7 +144,7 @@ def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
|
||||
assert meta.analysis.arch == dst.analysis.arch
|
||||
assert meta.analysis.os == dst.analysis.os
|
||||
assert meta.analysis.extractor == dst.analysis.extractor
|
||||
assert list(str(r) for r in meta.analysis.rules) == dst.analysis.rules
|
||||
assert list(meta.analysis.rules) == dst.analysis.rules
|
||||
assert capa.render.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address
|
||||
|
||||
assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions)
|
||||
|
||||
Reference in New Issue
Block a user