mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
merge from master, sorry
This commit is contained in:
@@ -8,11 +8,7 @@
|
||||
|
||||
import types
|
||||
|
||||
import file
|
||||
import insn
|
||||
import function
|
||||
import viv_utils
|
||||
import basicblock
|
||||
|
||||
import capa.features.extractors
|
||||
import capa.features.extractors.viv.file
|
||||
@@ -42,7 +38,7 @@ def add_va_int_cast(o):
|
||||
this bit of skullduggery lets use cast viv-utils objects as ints.
|
||||
the correct way of doing this is to update viv-utils (or subclass the objects here).
|
||||
"""
|
||||
setattr(o, "__int__", types.MethodType(get_va, o, type(o)))
|
||||
setattr(o, "__int__", types.MethodType(get_va, o))
|
||||
return o
|
||||
|
||||
|
||||
|
||||
@@ -125,11 +125,16 @@ def get_printable_len(oper):
|
||||
|
||||
|
||||
def is_printable_ascii(chars):
|
||||
return all(ord(c) < 127 and c in string.printable for c in chars)
|
||||
try:
|
||||
chars_str = chars.decode("ascii")
|
||||
except UnicodeDecodeError:
|
||||
return False
|
||||
else:
|
||||
return all(c in string.printable for c in chars_str)
|
||||
|
||||
|
||||
def is_printable_utf16le(chars):
|
||||
if all(c == "\x00" for c in chars[1::2]):
|
||||
if all(c == b"\x00" for c in chars[1::2]):
|
||||
return is_printable_ascii(chars[::2])
|
||||
|
||||
|
||||
|
||||
@@ -239,7 +239,7 @@ def read_bytes(vw, va):
|
||||
"""
|
||||
segm = vw.getSegment(va)
|
||||
if not segm:
|
||||
raise envi.SegmentationViolation()
|
||||
raise envi.SegmentationViolation(va)
|
||||
|
||||
segm_end = segm[0] + segm[1]
|
||||
try:
|
||||
@@ -499,6 +499,10 @@ def extract_insn_cross_section_cflow(f, bb, insn):
|
||||
inspect the instruction for a CALL or JMP that crosses section boundaries.
|
||||
"""
|
||||
for va, flags in insn.getBranches():
|
||||
if va is None:
|
||||
# va may be none for dynamic branches that haven't been resolved, such as `jmp eax`.
|
||||
continue
|
||||
|
||||
if flags & envi.BR_FALL:
|
||||
continue
|
||||
|
||||
|
||||
@@ -264,6 +264,15 @@ def main(argv=None):
|
||||
parser.add_argument(
|
||||
"-f", "--format", choices=[f[0] for f in formats], default="auto", help="Select sample format, %s" % format_help
|
||||
)
|
||||
if sys.version_info >= (3, 0):
|
||||
parser.add_argument(
|
||||
"-b",
|
||||
"--backend",
|
||||
type=str,
|
||||
help="select the backend to use",
|
||||
choices=(capa.main.BACKEND_VIV, capa.main.BACKEND_SMDA),
|
||||
default=capa.main.BACKEND_VIV,
|
||||
)
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
if args.quiet:
|
||||
@@ -276,7 +285,8 @@ def main(argv=None):
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
extractor = capa.main.get_extractor(args.sample, args.format)
|
||||
backend = args.backend if sys.version_info > (3, 0) else capa.main.BACKEND_VIV
|
||||
extractor = capa.main.get_extractor(args.sample, args.format, backend)
|
||||
with open(args.output, "wb") as f:
|
||||
f.write(dump(extractor))
|
||||
|
||||
|
||||
62
capa/main.py
62
capa/main.py
@@ -32,7 +32,9 @@ import capa.features.extractors
|
||||
from capa.helpers import oint, get_file_taste
|
||||
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
SUPPORTED_FILE_MAGIC = set(["MZ"])
|
||||
SUPPORTED_FILE_MAGIC = set([b"MZ"])
|
||||
BACKEND_VIV = "vivisect"
|
||||
BACKEND_SMDA = "smda"
|
||||
|
||||
|
||||
logger = logging.getLogger("capa")
|
||||
@@ -280,6 +282,8 @@ def get_workspace(path, format, should_save=True):
|
||||
vw = get_shellcode_vw(path, arch="i386", should_save=should_save)
|
||||
elif format == "sc64":
|
||||
vw = get_shellcode_vw(path, arch="amd64", should_save=should_save)
|
||||
else:
|
||||
raise ValueError("unexpected format: " + format)
|
||||
logger.debug("%s", get_meta_str(vw))
|
||||
return vw
|
||||
|
||||
@@ -303,29 +307,43 @@ class UnsupportedRuntimeError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def get_extractor_py3(path, format, disable_progress=False):
|
||||
from smda.SmdaConfig import SmdaConfig
|
||||
from smda.Disassembler import Disassembler
|
||||
def get_extractor_py3(path, format, backend, disable_progress=False):
|
||||
if backend == "smda":
|
||||
from smda.SmdaConfig import SmdaConfig
|
||||
from smda.Disassembler import Disassembler
|
||||
|
||||
import capa.features.extractors.smda
|
||||
import capa.features.extractors.smda
|
||||
|
||||
smda_report = None
|
||||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
|
||||
config = SmdaConfig()
|
||||
config.STORE_BUFFER = True
|
||||
smda_disasm = Disassembler(config)
|
||||
smda_report = smda_disasm.disassembleFile(path)
|
||||
smda_report = None
|
||||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
|
||||
config = SmdaConfig()
|
||||
config.STORE_BUFFER = True
|
||||
smda_disasm = Disassembler(config)
|
||||
smda_report = smda_disasm.disassembleFile(path)
|
||||
|
||||
return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path)
|
||||
return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path)
|
||||
else:
|
||||
import capa.features.extractors.viv
|
||||
|
||||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
|
||||
vw = get_workspace(path, format, should_save=False)
|
||||
|
||||
try:
|
||||
vw.saveWorkspace()
|
||||
except IOError:
|
||||
# see #168 for discussion around how to handle non-writable directories
|
||||
logger.info("source directory is not writable, won't save intermediate workspace")
|
||||
|
||||
return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
|
||||
|
||||
|
||||
def get_extractor(path, format, disable_progress=False):
|
||||
def get_extractor(path, format, backend, disable_progress=False):
|
||||
"""
|
||||
raises:
|
||||
UnsupportedFormatError:
|
||||
"""
|
||||
if sys.version_info >= (3, 0):
|
||||
return get_extractor_py3(path, format, disable_progress=disable_progress)
|
||||
return get_extractor_py3(path, format, backend, disable_progress=disable_progress)
|
||||
else:
|
||||
return get_extractor_py2(path, format, disable_progress=disable_progress)
|
||||
|
||||
@@ -442,6 +460,7 @@ def install_common_args(parser, wanted=None):
|
||||
wanted (Set[str]): collection of arguments to opt-into, including:
|
||||
- "sample": required positional argument to input file.
|
||||
- "format": flag to override file format.
|
||||
- "backend": flag to override analysis backend under py3.
|
||||
- "rules": flag to override path to capa rules.
|
||||
- "tag": flag to override/specify which rules to match.
|
||||
"""
|
||||
@@ -509,6 +528,16 @@ def install_common_args(parser, wanted=None):
|
||||
"-f", "--format", choices=[f[0] for f in formats], default="auto", help="select sample format, %s" % format_help
|
||||
)
|
||||
|
||||
if "backend" in wanted and sys.version_info >= (3, 0):
|
||||
parser.add_argument(
|
||||
"-b",
|
||||
"--backend",
|
||||
type=str,
|
||||
help="select the backend to use",
|
||||
choices=(BACKEND_VIV, BACKEND_SMDA),
|
||||
default=BACKEND_VIV,
|
||||
)
|
||||
|
||||
if "rules" in wanted:
|
||||
parser.add_argument(
|
||||
"-r",
|
||||
@@ -600,7 +629,7 @@ def main(argv=None):
|
||||
parser = argparse.ArgumentParser(
|
||||
description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
|
||||
)
|
||||
install_common_args(parser, {"sample", "format", "rules", "tag"})
|
||||
install_common_args(parser, {"sample", "format", "backend", "rules", "tag"})
|
||||
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
|
||||
args = parser.parse_args(args=argv)
|
||||
handle_common_args(args)
|
||||
@@ -669,7 +698,8 @@ def main(argv=None):
|
||||
else:
|
||||
format = args.format
|
||||
try:
|
||||
extractor = get_extractor(args.sample, args.format, disable_progress=args.quiet)
|
||||
backend = args.backend if sys.version_info > (3, 0) else capa.BACKEND_VIV
|
||||
extractor = get_extractor(args.sample, args.format, backend, disable_progress=args.quiet)
|
||||
except UnsupportedFormatError:
|
||||
logger.error("-" * 80)
|
||||
logger.error(" Input file does not appear to be a PE file.")
|
||||
|
||||
2
rules
2
rules
Submodule rules updated: 037a96d1b8...74f372149f
@@ -96,7 +96,7 @@ def get_capa_results(args):
|
||||
rules, format, path = args
|
||||
logger.info("computing capa results for: %s", path)
|
||||
try:
|
||||
extractor = capa.main.get_extractor(path, format, disable_progress=True)
|
||||
extractor = capa.main.get_extractor(path, format, capa.main.BACKEND_VIV, disable_progress=True)
|
||||
except capa.main.UnsupportedFormatError:
|
||||
# i'm 100% sure if multiprocessing will reliably raise exceptions across process boundaries.
|
||||
# so instead, return an object with explicit success/failure status.
|
||||
|
||||
@@ -192,7 +192,7 @@ def render_dictionary(doc):
|
||||
def capa_details(file_path, output_format="dictionary"):
|
||||
|
||||
# extract features and find capabilities
|
||||
extractor = capa.main.get_extractor(file_path, "auto", disable_progress=True)
|
||||
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, disable_progress=True)
|
||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||
|
||||
# collect metadata (used only to make rendering more complete)
|
||||
|
||||
@@ -201,7 +201,7 @@ class DoesntMatchExample(Lint):
|
||||
continue
|
||||
|
||||
try:
|
||||
extractor = capa.main.get_extractor(path, "auto", disable_progress=True)
|
||||
extractor = capa.main.get_extractor(path, "auto", capa.main.BACKEND_VIV, disable_progress=True)
|
||||
capabilities, meta = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True)
|
||||
except Exception as e:
|
||||
logger.error("failed to extract capabilities: %s %s %s", rule.name, path, e)
|
||||
|
||||
@@ -100,7 +100,7 @@ def main(argv=None):
|
||||
extractor = capa.features.freeze.load(f.read())
|
||||
else:
|
||||
try:
|
||||
extractor = capa.main.get_extractor(args.sample, args.format)
|
||||
extractor = capa.main.get_extractor(args.sample, args.format, capa.main.BACKEND_VIV)
|
||||
except capa.main.UnsupportedFormatError:
|
||||
logger.error("-" * 80)
|
||||
logger.error(" Input file does not appear to be a PE file.")
|
||||
|
||||
69
scripts/vivisect-py2-vs-py3.sh
Executable file
69
scripts/vivisect-py2-vs-py3.sh
Executable file
@@ -0,0 +1,69 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
int() {
|
||||
int=$(bc <<< "scale=0; ($1 + 0.5)/1")
|
||||
}
|
||||
|
||||
export TIMEFORMAT='%3R'
|
||||
threshold_time=90
|
||||
threshold_py3_time=60 # Do not warn if it doesn't take at least 1 minute to run
|
||||
rm tests/data/*.viv 2>/dev/null
|
||||
mkdir results
|
||||
for file in tests/data/*
|
||||
do
|
||||
file=$(printf %q "$file") # Handle names with white spaces
|
||||
file_name=$(basename $file)
|
||||
echo $file_name
|
||||
|
||||
rm "$file.viv" 2>/dev/null
|
||||
py3_time=$(sh -c "time python3 scripts/show-features.py $file >> results/p3-$file_name.out 2>/dev/null" 2>&1)
|
||||
rm "$file.viv" 2>/dev/null
|
||||
py2_time=$(sh -c "time python2 scripts/show-features.py $file >> results/p2-$file_name.out 2>/dev/null" 2>&1)
|
||||
|
||||
int $py3_time
|
||||
if (($int > $threshold_py3_time))
|
||||
then
|
||||
percentage=$(bc <<< "scale=3; $py2_time/$py3_time*100 + 0.5")
|
||||
int $percentage
|
||||
if (($int < $threshold_py3_time))
|
||||
then
|
||||
echo -n " SLOWER ($percentage): "
|
||||
fi
|
||||
fi
|
||||
echo " PY2($py2_time) PY3($py3_time)"
|
||||
done
|
||||
|
||||
threshold_features=98
|
||||
counter=0
|
||||
average=0
|
||||
results_for() {
|
||||
py3=$(cat "results/p3-$file_name.out" | grep "$1" | wc -l)
|
||||
py2=$(cat "results/p2-$file_name.out" | grep "$1" | wc -l)
|
||||
if (($py2 > 0))
|
||||
then
|
||||
percentage=$(bc <<< "scale=2; 100*$py3/$py2")
|
||||
average=$(bc <<< "scale=2; $percentage + $average")
|
||||
count=$(($count + 1))
|
||||
int $percentage
|
||||
if (($int < $threshold_features))
|
||||
then
|
||||
echo -e "$1: py2($py2) py3($py3) $percentage% - $file_name"
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
rm tests/data/*.viv 2>/dev/null
|
||||
echo -e '\nRESULTS:'
|
||||
for file in tests/data/*
|
||||
do
|
||||
file_name=$(basename $file)
|
||||
if test -f "results/p2-$file_name.out"; then
|
||||
results_for 'insn'
|
||||
results_for 'file'
|
||||
results_for 'func'
|
||||
results_for 'bb'
|
||||
fi
|
||||
done
|
||||
|
||||
average=$(bc <<< "scale=2; $average/$count")
|
||||
echo "TOTAL: $average"
|
||||
2
setup.py
2
setup.py
@@ -27,6 +27,8 @@ if sys.version_info >= (3, 0):
|
||||
# py3
|
||||
requirements.append("halo")
|
||||
requirements.append("networkx")
|
||||
requirements.append("vivisect==1.0.0")
|
||||
requirements.append("viv-utils==0.3.19")
|
||||
requirements.append("smda==1.5.13")
|
||||
else:
|
||||
# py2
|
||||
|
||||
@@ -520,11 +520,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
|
||||
|
||||
|
||||
def get_extractor(path):
|
||||
if sys.version_info >= (3, 0):
|
||||
extractor = get_smda_extractor(path)
|
||||
else:
|
||||
extractor = get_viv_extractor(path)
|
||||
|
||||
extractor = get_viv_extractor(path)
|
||||
# overload the extractor so that the fixture exposes `extractor.path`
|
||||
setattr(extractor, "path", path)
|
||||
return extractor
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import sys
|
||||
import json
|
||||
import textwrap
|
||||
|
||||
import pytest
|
||||
@@ -365,3 +366,20 @@ def test_not_render_rules_also_matched(z9324d_extractor, capsys):
|
||||
assert "act as TCP client" in std.out
|
||||
assert "connect TCP socket" in std.out
|
||||
assert "create TCP socket" in std.out
|
||||
|
||||
|
||||
# It tests main works with different backends
|
||||
def test_backend_option(capsys):
|
||||
if sys.version_info > (3, 0):
|
||||
path = get_data_path_by_name("pma16-01")
|
||||
assert capa.main.main([path, "-j", "-b", capa.main.BACKEND_VIV]) == 0
|
||||
std = capsys.readouterr()
|
||||
std_json = json.loads(std.out)
|
||||
assert std_json["meta"]["analysis"]["extractor"] == "VivisectFeatureExtractor"
|
||||
assert len(std_json["rules"]) > 0
|
||||
|
||||
assert capa.main.main([path, "-j", "-b", capa.main.BACKEND_SMDA]) == 0
|
||||
std = capsys.readouterr()
|
||||
std_json = json.loads(std.out)
|
||||
assert std_json["meta"]["analysis"]["extractor"] == "SmdaFeatureExtractor"
|
||||
assert len(std_json["rules"]) > 0
|
||||
|
||||
@@ -16,8 +16,7 @@ from fixtures import *
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_viv_features(sample, scope, feature, expected):
|
||||
with xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2"):
|
||||
do_test_feature_presence(get_viv_extractor, sample, scope, feature, expected)
|
||||
do_test_feature_presence(get_viv_extractor, sample, scope, feature, expected)
|
||||
|
||||
|
||||
@parametrize(
|
||||
@@ -26,5 +25,4 @@ def test_viv_features(sample, scope, feature, expected):
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_viv_feature_counts(sample, scope, feature, expected):
|
||||
with xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2"):
|
||||
do_test_feature_count(get_viv_extractor, sample, scope, feature, expected)
|
||||
do_test_feature_count(get_viv_extractor, sample, scope, feature, expected)
|
||||
|
||||
Reference in New Issue
Block a user