main: when --signatures provided, override default set

closes #630
This commit is contained in:
William Ballenthin
2021-06-15 11:43:38 -06:00
parent 9484fadd0f
commit 6f1f928434
5 changed files with 127 additions and 16 deletions

View File

@@ -52,6 +52,7 @@ See the License for the specific language governing permissions and limitations
import json
import zlib
import logging
import os.path
import capa.features.file
import capa.features.insn
@@ -254,7 +255,25 @@ def main(argv=None):
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
extractor = capa.main.get_extractor(args.sample, args.format, args.backend, sigpaths=args.signatures)
if args.signatures == capa.main.SIGNATURES_PATH_DEFAULT_STRING:
logger.debug("-" * 80)
logger.debug(" Using default embedded signatures.")
logger.debug(
" To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/ /path/to/mal.exe`."
)
logger.debug("-" * 80)
sigs_path = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "sigs"))
else:
sigs_path = args.signatures
logger.debug("using signatures path: %s", sigs_path)
try:
sig_paths = capa.main.get_signatures(sigs_path)
except (IOError) as e:
logger.error("%s", str(e))
return -1
extractor = capa.main.get_extractor(args.sample, args.format, args.backend, sigpaths=sig_paths)
with open(args.output, "wb") as f:
f.write(dump(extractor))

View File

@@ -40,6 +40,7 @@ import capa.features.extractors.pefile
from capa.helpers import get_file_taste
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
SUPPORTED_FILE_MAGIC = set([b"MZ"])
BACKEND_VIV = "vivisect"
BACKEND_SMDA = "smda"
@@ -503,6 +504,25 @@ def get_rules(rule_path, disable_progress=False):
return rules
def get_signatures(sigs_path):
    """
    collect the file system paths of the signature files found at the given path.

    args:
      sigs_path (str): file system path to a single signature file,
        or to a directory that is searched recursively for signature files.

    returns:
      List[str]: file system paths to the signature files.
        a path to a regular file is returned as-is, whatever its extension.
        for a directory, only files ending in `.pat`, `.pat.gz`, or `.sig` are returned,
        ordered deterministically: the files of a directory sorted by name,
        followed by the contents of its subdirectories, visited in sorted order.
        a path that exists but is neither a file nor a directory yields an empty list.

    raises:
      IOError: when the given path does not exist or cannot be accessed.
    """
    if not os.path.exists(sigs_path):
        raise IOError("signatures path %s does not exist or cannot be accessed" % sigs_path)

    if os.path.isfile(sigs_path):
        return [sigs_path]

    paths = []
    if os.path.isdir(sigs_path):
        logger.debug("reading signatures from directory %s", sigs_path)
        for root, dirs, files in os.walk(sigs_path):
            # os.walk yields entries in arbitrary, file system dependent order.
            # sorting `dirs` in place fixes the order in which subdirectories are visited,
            # and sorting `files` fixes the order within each directory,
            # so that the same signature set is always returned in the same order.
            dirs.sort()
            for filename in sorted(files):
                if not filename.endswith((".pat", ".pat.gz", ".sig")):
                    continue

                sig_path = os.path.join(root, filename)
                logger.debug("found signature: %s", sig_path)
                paths.append(sig_path)

    return paths
def collect_metadata(argv, sample_path, rules_path, format, extractor):
md5 = hashlib.md5()
sha1 = hashlib.sha1()
@@ -634,12 +654,9 @@ def install_common_args(parser, wanted=None):
if "signatures" in wanted:
parser.add_argument(
"--signature",
action="append",
dest="signatures",
type=str,
# with action=append, users can specify further signatures but not override what's found in $capa/sigs/.
# seems reasonable for now. this is an easy way to register the default signature set.
default=get_default_signatures(),
default=SIGNATURES_PATH_DEFAULT_STRING,
help="use the given signatures to identify library functions, file system paths to .sig/.pat files.",
)
@@ -755,7 +772,7 @@ def main(argv=None):
logger.debug("default rule path (PyInstaller method): %s", rules_path)
else:
logger.debug("detected running from source")
rules_path = os.path.join(os.path.dirname(__file__), "..", "rules")
rules_path = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "rules"))
logger.debug("default rule path (source method): %s", rules_path)
if not os.path.exists(rules_path):
@@ -807,6 +824,32 @@ def main(argv=None):
logger.debug("file limitation short circuit, won't analyze fully.")
return -1
if args.signatures == SIGNATURES_PATH_DEFAULT_STRING:
logger.debug("-" * 80)
logger.debug(" Using default embedded signatures.")
logger.debug(
" To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/ /path/to/mal.exe`."
)
logger.debug("-" * 80)
if hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS"):
logger.debug("detected running under PyInstaller")
sigs_path = os.path.join(sys._MEIPASS, "sigs")
logger.debug("default signatures path (PyInstaller method): %s", sigs_path)
else:
logger.debug("detected running from source")
sigs_path = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "sigs"))
logger.debug("default signatures path (source method): %s", sigs_path)
else:
sigs_path = args.signatures
logger.debug("using signatures path: %s", sigs_path)
try:
sig_paths = get_signatures(sigs_path)
except (IOError) as e:
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
format = "freeze"
with open(args.sample, "rb") as f:
@@ -814,7 +857,7 @@ def main(argv=None):
else:
format = args.format
try:
extractor = get_extractor(args.sample, format, args.backend, args.signatures, disable_progress=args.quiet)
extractor = get_extractor(args.sample, format, args.backend, sig_paths, disable_progress=args.quiet)
except UnsupportedFormatError:
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE file.")

View File

@@ -78,6 +78,7 @@ def get_capa_results(args):
args is a tuple, containing:
rules (capa.rules.RuleSet): the rules to match
signatures (List[str]): list of file system paths to signature files
format (str): the name of the sample file format
path (str): the file system path to the sample to process
@@ -94,10 +95,10 @@ def get_capa_results(args):
meta (dict): the meta analysis results
capabilities (dict): the matched capabilities and their result objects
"""
rules, format, path = args
rules, sigpaths, format, path = args
logger.info("computing capa results for: %s", path)
try:
extractor = capa.main.get_extractor(path, format, capa.main.BACKEND_VIV, args.signatures, disable_progress=True)
extractor = capa.main.get_extractor(path, format, capa.main.BACKEND_VIV, sigpaths, disable_progress=True)
except capa.main.UnsupportedFormatError:
# i'm not 100% sure if multiprocessing will reliably raise exceptions across process boundaries.
# so instead, return an object with explicit success/failure status.
@@ -166,6 +167,19 @@ def main(argv=None):
logger.error("%s", str(e))
return -1
if args.signatures == capa.main.SIGNATURES_PATH_DEFAULT_STRING:
logger.debug("using default embedded signatures.")
sigs_path = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "sigs"))
else:
sigs_path = args.signatures
logger.debug("using signatures path: %s", sigs_path)
try:
sig_paths = capa.main.get_signatures(sigs_path)
except (IOError) as e:
logger.error("%s", str(e))
return -1
samples = []
for (base, directories, files) in os.walk(args.input):
for file in files:
@@ -197,7 +211,7 @@ def main(argv=None):
results = {}
for result in mapper(
get_capa_results, [(rules, "pe", sample) for sample in samples], parallelism=args.parallelism
get_capa_results, [(rules, sig_paths, "pe", sample) for sample in samples], parallelism=args.parallelism
):
if result["status"] == "error":
logger.warning(result["error"])

View File

@@ -121,7 +121,7 @@ def main(argv=None):
logger.error("%s", str(e))
return -1
if args.rules == "(embedded rules)":
if args.rules == capa.main.RULES_PATH_DEFAULT_STRING:
logger.info("-" * 80)
logger.info(" Using default embedded rules.")
logger.info(" To provide your own rules, use the form `capa.exe -r ./path/to/rules/ /path/to/mal.exe`.")
@@ -130,7 +130,7 @@ def main(argv=None):
logger.info("-" * 80)
logger.debug("detected running from source")
args.rules = os.path.join(os.path.dirname(__file__), "..", "rules")
args.rules = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "rules"))
logger.debug("default rule path (source method): %s", args.rules)
else:
logger.info("using rules path: %s", args.rules)
@@ -146,6 +146,24 @@ def main(argv=None):
logger.error("%s", str(e))
return -1
if args.signatures == capa.main.SIGNATURES_PATH_DEFAULT_STRING:
logger.debug("-" * 80)
logger.debug(" Using default embedded signatures.")
logger.debug(
" To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/ /path/to/mal.exe`."
)
logger.debug("-" * 80)
sigs_path = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "sigs"))
else:
sigs_path = args.signatures
logger.debug("using signatures path: %s", sigs_path)
try:
sig_paths = capa.main.get_signatures(sigs_path)
except (IOError) as e:
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
format = "freeze"
with open(args.sample, "rb") as f:
@@ -154,7 +172,7 @@ def main(argv=None):
format = args.format
try:
extractor = capa.main.get_extractor(args.sample, args.format, args.backend, args.signatures)
extractor = capa.main.get_extractor(args.sample, args.format, args.backend, sig_paths)
except capa.main.UnsupportedFormatError:
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE file.")

View File

@@ -66,6 +66,7 @@ Example::
"""
import sys
import logging
import os.path
import argparse
import capa.main
@@ -95,14 +96,30 @@ def main(argv=None):
logger.error("%s", str(e))
return -1
if args.signatures == capa.main.SIGNATURES_PATH_DEFAULT_STRING:
logger.debug("-" * 80)
logger.debug(" Using default embedded signatures.")
logger.debug(
" To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/ /path/to/mal.exe`."
)
logger.debug("-" * 80)
sigs_path = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "sigs"))
else:
sigs_path = args.signatures
logger.debug("using signatures path: %s", sigs_path)
try:
sig_paths = capa.main.get_signatures(sigs_path)
except (IOError) as e:
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
try:
extractor = capa.main.get_extractor(
args.sample, args.format, capa.main.BACKEND_VIV, sigpaths=args.signatures
)
extractor = capa.main.get_extractor(args.sample, args.format, capa.main.BACKEND_VIV, sigpaths=sig_paths)
except capa.main.UnsupportedFormatError:
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE file.")