mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
Merge branch 'mandiant:master' into fix-shadowed-variable
This commit is contained in:
@@ -4,10 +4,10 @@
|
||||
|
||||
### New Features
|
||||
- add protobuf format for result documents #1219 @williballenthin @mr-tz
|
||||
|
||||
- extractor: add Binary Ninja feature extractor @xusheng6
|
||||
- new cli flag `--os` to override auto-detected operating system for a sample @captainGeech42
|
||||
- change highlight colour from "blue" to "cyan" so it is easier to notice #1384 @ggold7046
|
||||
- add new format to parse JSON output back into capa #1396 @ooprathamm
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
|
||||
@@ -450,6 +450,7 @@ FORMAT_AUTO = "auto"
|
||||
FORMAT_SC32 = "sc32"
|
||||
FORMAT_SC64 = "sc64"
|
||||
FORMAT_FREEZE = "freeze"
|
||||
FORMAT_RESULT = "result"
|
||||
FORMAT_UNKNOWN = "unknown"
|
||||
|
||||
|
||||
|
||||
@@ -103,7 +103,7 @@ def extract_file_import_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address
|
||||
|
||||
ordinal = sym.ordinal
|
||||
if ordinal != 0 and (lib_name != ""):
|
||||
ordinal_name = "#%d" % (ordinal)
|
||||
ordinal_name = f"#{ordinal}"
|
||||
for name in capa.features.extractors.helpers.generate_symbols(lib_name, ordinal_name):
|
||||
yield Import(name), addr
|
||||
|
||||
@@ -147,7 +147,7 @@ def extract_file_format(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
|
||||
# no file type to return when processing a binary file, but we want to continue processing
|
||||
return
|
||||
else:
|
||||
raise NotImplementedError("unexpected file format: %d" % view_type)
|
||||
raise NotImplementedError(f"unexpected file format: {view_type}")
|
||||
|
||||
|
||||
def extract_features(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
|
||||
|
||||
@@ -26,7 +26,7 @@ if spec is not None:
|
||||
|
||||
|
||||
def find_binja_path() -> str:
    """
    Run the module-level `code` snippet in a child `python` process and decode its output.

    The child's stdout is read as ASCII, stripped, interpreted as a hex string,
    and the resulting bytes are decoded as UTF-8.
    (presumably the snippet prints the hex-encoded Binary Ninja install path;
    `code` is defined outside this view - TODO confirm)

    Returns:
      str: the decoded text printed by the child process.

    Raises:
      subprocess.CalledProcessError: if the child process exits with a non-zero status.
      ValueError: if the output is not valid hex.
    """
    # pass the snippet directly as an argv element: no shell is involved and
    # no string formatting is needed, and the interpreter is spawned only once.
    raw_output = subprocess.check_output(["python", "-c", code]).decode("ascii").strip()
    return bytes.fromhex(raw_output).decode("utf8")
|
||||
|
||||
|
||||
|
||||
@@ -12,11 +12,14 @@ import capa.features.extractors.pefile
|
||||
import capa.features.extractors.strings
|
||||
from capa.features.common import (
|
||||
OS,
|
||||
OS_ANY,
|
||||
OS_AUTO,
|
||||
ARCH_ANY,
|
||||
FORMAT_PE,
|
||||
FORMAT_ELF,
|
||||
OS_WINDOWS,
|
||||
FORMAT_FREEZE,
|
||||
FORMAT_RESULT,
|
||||
Arch,
|
||||
Format,
|
||||
String,
|
||||
@@ -27,6 +30,11 @@ from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# leading byte sequences used to recognize file formats
|
||||
MATCH_PE = b"MZ"
|
||||
MATCH_ELF = b"\x7fELF"
|
||||
MATCH_RESULT = b'{"meta":'
|
||||
|
||||
|
||||
def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
|
||||
"""
|
||||
@@ -40,12 +48,14 @@ def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
|
||||
|
||||
|
||||
def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
|
||||
if buf.startswith(b"MZ"):
|
||||
if buf.startswith(MATCH_PE):
|
||||
yield Format(FORMAT_PE), NO_ADDRESS
|
||||
elif buf.startswith(b"\x7fELF"):
|
||||
elif buf.startswith(MATCH_ELF):
|
||||
yield Format(FORMAT_ELF), NO_ADDRESS
|
||||
elif is_freeze(buf):
|
||||
yield Format(FORMAT_FREEZE), NO_ADDRESS
|
||||
elif buf.startswith(MATCH_RESULT):
|
||||
yield Format(FORMAT_RESULT), NO_ADDRESS
|
||||
else:
|
||||
# we likely end up here:
|
||||
# 1. handling a file format (e.g. macho)
|
||||
@@ -56,10 +66,13 @@ def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
|
||||
|
||||
|
||||
def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]:
|
||||
if buf.startswith(b"MZ"):
|
||||
if buf.startswith(MATCH_PE):
|
||||
yield from capa.features.extractors.pefile.extract_file_arch(pe=pefile.PE(data=buf))
|
||||
|
||||
elif buf.startswith(MATCH_RESULT):
|
||||
yield Arch(ARCH_ANY), NO_ADDRESS
|
||||
|
||||
elif buf.startswith(b"\x7fELF"):
|
||||
elif buf.startswith(MATCH_ELF):
|
||||
with contextlib.closing(io.BytesIO(buf)) as f:
|
||||
arch = capa.features.extractors.elf.detect_elf_arch(f)
|
||||
|
||||
@@ -88,9 +101,11 @@ def extract_os(buf, os=OS_AUTO) -> Iterator[Tuple[Feature, Address]]:
|
||||
if os != OS_AUTO:
|
||||
yield OS(os), NO_ADDRESS
|
||||
|
||||
if buf.startswith(b"MZ"):
|
||||
if buf.startswith(MATCH_PE):
|
||||
yield OS(OS_WINDOWS), NO_ADDRESS
|
||||
elif buf.startswith(b"\x7fELF"):
|
||||
elif buf.startswith(MATCH_RESULT):
|
||||
yield OS(OS_ANY), NO_ADDRESS
|
||||
elif buf.startswith(MATCH_ELF):
|
||||
with contextlib.closing(io.BytesIO(buf)) as f:
|
||||
os = capa.features.extractors.elf.detect_elf_os(f)
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ class Number(Feature):
|
||||
elif isinstance(self.value, float):
|
||||
return str(self.value)
|
||||
else:
|
||||
raise ValueError("invalid value type %s" % (type(self.value)))
|
||||
raise ValueError(f"invalid value type {type(self.value)}")
|
||||
|
||||
|
||||
# max recognized structure size (and therefore, offset size)
|
||||
|
||||
32
capa/main.py
32
capa/main.py
@@ -69,6 +69,7 @@ from capa.features.common import (
|
||||
FORMAT_SC64,
|
||||
FORMAT_DOTNET,
|
||||
FORMAT_FREEZE,
|
||||
FORMAT_RESULT,
|
||||
)
|
||||
from capa.features.address import NO_ADDRESS, Address
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
|
||||
@@ -547,7 +548,7 @@ def get_extractor(
|
||||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
|
||||
bv: BinaryView = BinaryViewType.get_view_of_file(path)
|
||||
if bv is None:
|
||||
raise RuntimeError("Binary Ninja cannot open file %s" % (path))
|
||||
raise RuntimeError(f"Binary Ninja cannot open file {path}")
|
||||
|
||||
return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)
|
||||
|
||||
@@ -912,12 +913,12 @@ def install_common_args(parser, wanted=None):
|
||||
(OS_MACOS,),
|
||||
(OS_WINDOWS,),
|
||||
]
|
||||
os_help = ", ".join(["%s (%s)" % (o[0], o[1]) if len(o) == 2 else o[0] for o in oses])
|
||||
os_help = ", ".join([f"{o[0]} ({o[1]})" if len(o) == 2 else o[0] for o in oses])
|
||||
parser.add_argument(
|
||||
"--os",
|
||||
choices=[o[0] for o in oses],
|
||||
default=OS_AUTO,
|
||||
help="select sample OS: %s" % os_help,
|
||||
help=f"select sample OS: {os_help}",
|
||||
)
|
||||
|
||||
if "rules" in wanted:
|
||||
@@ -1180,8 +1181,10 @@ def main(argv=None):
|
||||
if not (args.verbose or args.vverbose or args.json):
|
||||
logger.debug("file limitation short circuit, won't analyze fully.")
|
||||
return E_FILE_LIMITATION
|
||||
|
||||
if format_ == FORMAT_FREEZE:
|
||||
if format_ == FORMAT_RESULT:
|
||||
result_doc = capa.render.result_document.ResultDocument.parse_file(args.sample)
|
||||
meta, capabilities = result_doc.to_capa()
|
||||
elif format_ == FORMAT_FREEZE:
|
||||
with open(args.sample, "rb") as f:
|
||||
extractor = capa.features.freeze.load(f.read())
|
||||
else:
|
||||
@@ -1217,17 +1220,18 @@ def main(argv=None):
|
||||
log_unsupported_os_error()
|
||||
return E_INVALID_FILE_OS
|
||||
|
||||
meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
|
||||
if format_ != FORMAT_RESULT:
|
||||
meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
|
||||
|
||||
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
|
||||
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
|
||||
|
||||
if has_file_limitation(rules, capabilities):
|
||||
# bail if capa encountered file limitation e.g. a packed binary
|
||||
# do show the output in verbose mode, though.
|
||||
if not (args.verbose or args.vverbose or args.json):
|
||||
return E_FILE_LIMITATION
|
||||
if has_file_limitation(rules, capabilities):
|
||||
# bail if capa encountered file limitation e.g. a packed binary
|
||||
# do show the output in verbose mode, though.
|
||||
if not (args.verbose or args.vverbose or args.json):
|
||||
return E_FILE_LIMITATION
|
||||
|
||||
if args.json:
|
||||
print(capa.render.json.render(meta, rules, capabilities))
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import datetime
|
||||
from typing import Any, Dict, Tuple, Union, Optional
|
||||
from typing import Any, Dict, List, Tuple, Union, Optional
|
||||
|
||||
from pydantic import Field, BaseModel
|
||||
|
||||
@@ -125,6 +125,41 @@ class Metadata(FrozenModel):
|
||||
),
|
||||
)
|
||||
|
||||
def to_capa(self) -> Dict[str, Any]:
    """
    Convert this metadata model into a plain nested dictionary.

    The timestamp is rendered with `isoformat()`, and every address-like member
    (base address, function, basic block, and library function addresses) is
    converted through its own `to_capa()` method.

    Returns:
      Dict[str, Any]: dictionary with the top-level keys
        "timestamp", "version", "sample", and "analysis".
    """
    doc: Dict[str, Any] = {
        "timestamp": self.timestamp.isoformat(),
        "version": self.version,
    }

    sample = self.sample
    doc["sample"] = {
        "md5": sample.md5,
        "sha1": sample.sha1,
        "sha256": sample.sha256,
        "path": sample.path,
    }

    analysis = self.analysis
    analysis_doc: Dict[str, Any] = {
        "format": analysis.format,
        "arch": analysis.arch,
        "os": analysis.os,
        "extractor": analysis.extractor,
        "rules": analysis.rules,
        "base_address": analysis.base_address.to_capa(),
    }

    # function address -> addresses of the basic blocks matched within it
    functions_layout = {}
    for function in analysis.layout.functions:
        function_key = function.address.to_capa()
        functions_layout[function_key] = {
            "matched_basic_blocks": [block.address.to_capa() for block in function.matched_basic_blocks]
        }
    analysis_doc["layout"] = {"functions": functions_layout}

    counts = analysis.feature_counts
    analysis_doc["feature_counts"] = {
        "file": counts.file,
        # function address -> number of features found in that function
        "functions": dict((entry.address.to_capa(), entry.count) for entry in counts.functions),
    }

    # library function address -> library function name
    analysis_doc["library_functions"] = dict(
        (library.address.to_capa(), library.name) for library in analysis.library_functions
    )

    doc["analysis"] = analysis_doc
    return doc
|
||||
|
||||
|
||||
class CompoundStatementType:
|
||||
AND = "and"
|
||||
@@ -543,3 +578,38 @@ class ResultDocument(FrozenModel):
|
||||
)
|
||||
|
||||
return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
|
||||
|
||||
def to_capa(self) -> Tuple[Dict, Dict]:
    """
    Convert this result document into a `(meta, capabilities)` pair.

    `meta` comes from `self.meta.to_capa()`. `capabilities` maps each rule name
    that has at least one match to a list of `(address, Result)` tuples.
    Each `Result` is built with an empty `children` list, so only the top-level
    node of each match is carried over.

    Returns:
      Tuple[Dict, Dict]: the metadata dictionary and the capabilities dictionary.

    Raises:
      ValueError: if a match node is neither a StatementNode nor a FeatureNode.
    """
    capa_meta = self.meta.to_capa()
    matches_by_rule: Dict[str, List[Tuple[frz.Address, capa.features.common.Result]]] = {}

    for name, rule_match in self.rules.items():
        # rebuild the Rule from the YAML source stored alongside its matches
        parsed_rule = capa.rules.Rule.from_yaml(rule_match.source)

        for address, match in rule_match.matches:
            node = match.node

            if isinstance(node, StatementNode):
                # compound statements are taken from the re-parsed rule;
                # other statements are converted directly from the node.
                statement = (
                    parsed_rule.statement
                    if isinstance(node.statement, CompoundStatement)
                    else statement_from_capa(node.statement)
                )
            elif isinstance(node, FeatureNode):
                statement = node.feature.to_capa()
                # string and regex features additionally carry the captured matches
                if isinstance(statement, (capa.features.common.String, capa.features.common.Regex)):
                    statement.matches = match.captures
            else:
                raise ValueError("Invalid node type")

            converted_locations = [frz.Address.to_capa(location) for location in match.locations]
            result = capa.features.common.Result(
                statement=statement,
                success=match.success,
                locations=converted_locations,
                children=[],
            )

            # NOTE(review): locations are converted with to_capa() while the match
            # address goes through from_capa() - confirm this asymmetry is intended.
            entry = (frz.Address.from_capa(address), result)
            matches_by_rule.setdefault(name, []).append(entry)

    return capa_meta, matches_by_rule
|
||||
|
||||
2
setup.py
2
setup.py
@@ -28,7 +28,7 @@ requirements = [
|
||||
"dnfile==0.13.0",
|
||||
"dncil==1.0.2",
|
||||
"pydantic==1.10.7",
|
||||
"protobuf==4.21.12",
|
||||
"protobuf==4.22.1",
|
||||
]
|
||||
|
||||
# this sets __version__
|
||||
|
||||
@@ -241,6 +241,8 @@ def get_data_path_by_name(name):
|
||||
return os.path.join(CD, "data", "kernel32-64.dll_")
|
||||
elif name == "pma01-01":
|
||||
return os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll_")
|
||||
elif name == "pma01-01-rd":
|
||||
return os.path.join(CD, "data", "rd", "Practical Malware Analysis Lab 01-01.dll_.json")
|
||||
elif name == "pma12-04":
|
||||
return os.path.join(CD, "data", "Practical Malware Analysis Lab 12-04.exe_")
|
||||
elif name == "pma16-01":
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
import copy
|
||||
|
||||
import pytest
|
||||
import fixtures
|
||||
from fixtures import *
|
||||
|
||||
import capa
|
||||
@@ -268,3 +269,14 @@ def assert_round_trip(rd: rdoc.ResultDocument):
|
||||
def test_round_trip(request, rd_file):
|
||||
rd: rdoc.ResultDocument = request.getfixturevalue(rd_file)
|
||||
assert_round_trip(rd)
|
||||
|
||||
|
||||
def test_json_to_rdoc():
    """Parsing the "pma01-01-rd" JSON fixture yields a ResultDocument instance."""
    rd_path = fixtures.get_data_path_by_name("pma01-01-rd")
    parsed = rdoc.ResultDocument.parse_file(rd_path)
    assert isinstance(parsed, rdoc.ResultDocument)
|
||||
|
||||
|
||||
def test_rdoc_to_capa():
    """Converting the "pma01-01-rd" fixture with `to_capa()` yields a 2-tuple."""
    path = fixtures.get_data_path_by_name("pma01-01-rd")

    # parse and convert once; both assertions inspect the same result
    converted = rdoc.ResultDocument.parse_file(path).to_capa()

    # check the type first so a wrong return type fails with the clearer message
    assert isinstance(converted, tuple)
    assert len(converted) == 2
|
||||
|
||||
@@ -81,4 +81,4 @@ def test_proto_conversion(tmpdir):
|
||||
p = run_program(get_script_path("proto-to-results.py"), [pb])
|
||||
assert p.returncode == 0
|
||||
|
||||
assert p.stdout.startswith(b'{\n "meta": ')
|
||||
assert p.stdout.startswith(b'{\n "meta": ') or p.stdout.startswith(b'{\r\n "meta": ')
|
||||
|
||||
Reference in New Issue
Block a user