mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 23:59:48 -08:00
Compare commits
18 Commits
wb/library
...
library-de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c58a616c1 | ||
|
|
1b72c81df1 | ||
|
|
deed98b87e | ||
|
|
2db0cc457f | ||
|
|
3cad8d12af | ||
|
|
5be96d7ddc | ||
|
|
a3b6aef67f | ||
|
|
077fa2e7e1 | ||
|
|
c3b8e7c638 | ||
|
|
4346922b9a | ||
|
|
d652192af1 | ||
|
|
d83750c901 | ||
|
|
8394b81841 | ||
|
|
febda7d0e2 | ||
|
|
f9abb5e83f | ||
|
|
f69602d085 | ||
|
|
ad187fc3bd | ||
|
|
637926e0b6 |
@@ -108,6 +108,7 @@ repos:
|
|||||||
- "--check-untyped-defs"
|
- "--check-untyped-defs"
|
||||||
- "--ignore-missing-imports"
|
- "--ignore-missing-imports"
|
||||||
- "--config-file=.github/mypy/mypy.ini"
|
- "--config-file=.github/mypy/mypy.ini"
|
||||||
|
- "--enable-incomplete-feature=NewGenericSyntax"
|
||||||
- "capa/"
|
- "capa/"
|
||||||
- "scripts/"
|
- "scripts/"
|
||||||
- "tests/"
|
- "tests/"
|
||||||
|
|||||||
38
capa/analysis/flirt.py
Normal file
38
capa/analysis/flirt.py
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
import capa.features.extractors.ida.idalib as idalib
|
||||||
|
|
||||||
|
if not idalib.has_idalib():
|
||||||
|
raise RuntimeError("cannot find IDA idalib module.")
|
||||||
|
|
||||||
|
if not idalib.load_idalib():
|
||||||
|
raise RuntimeError("failed to load IDA idalib module.")
|
||||||
|
|
||||||
|
import idaapi
|
||||||
|
import idautils
|
||||||
|
|
||||||
|
|
||||||
|
class FunctionId(BaseModel):
|
||||||
|
va: int
|
||||||
|
is_library: bool
|
||||||
|
name: str
|
||||||
|
|
||||||
|
|
||||||
|
def get_flirt_matches(lib_only=True):
|
||||||
|
for fva in idautils.Functions():
|
||||||
|
f = idaapi.get_func(fva)
|
||||||
|
is_lib = bool(f.flags & idaapi.FUNC_LIB)
|
||||||
|
fname = idaapi.get_func_name(fva)
|
||||||
|
|
||||||
|
if lib_only and not is_lib:
|
||||||
|
continue
|
||||||
|
|
||||||
|
yield FunctionId(va=fva, is_library=is_lib, name=fname)
|
||||||
@@ -1,193 +1,335 @@
|
|||||||
"""
|
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||||
further requirements:
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
- nltk
|
# you may not use this file except in compliance with the License.
|
||||||
"""
|
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
import io
|
||||||
import sys
|
import sys
|
||||||
import logging
|
import logging
|
||||||
|
import argparse
|
||||||
|
import tempfile
|
||||||
|
import contextlib
|
||||||
import collections
|
import collections
|
||||||
|
from enum import Enum
|
||||||
|
from typing import List, Iterable, Optional
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import rich
|
import rich
|
||||||
|
from pydantic import BaseModel
|
||||||
from rich.text import Text
|
from rich.text import Text
|
||||||
|
from rich.console import Console
|
||||||
|
|
||||||
|
import capa.main
|
||||||
|
import capa.helpers
|
||||||
|
import capa.analysis.flirt
|
||||||
import capa.analysis.strings
|
import capa.analysis.strings
|
||||||
import capa.features.extractors.strings
|
import capa.features.extractors.ida.idalib as idalib
|
||||||
from capa.analysis.strings import LibraryStringDatabase
|
|
||||||
|
if not idalib.has_idalib():
|
||||||
|
raise RuntimeError("cannot find IDA idalib module.")
|
||||||
|
|
||||||
|
if not idalib.load_idalib():
|
||||||
|
raise RuntimeError("failed to load IDA idalib module.")
|
||||||
|
|
||||||
|
import idaapi
|
||||||
|
import idapro
|
||||||
|
import ida_auto
|
||||||
|
import idautils
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def extract_strings(buf, n=4):
|
class Classification(str, Enum):
|
||||||
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
|
USER = "user"
|
||||||
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
|
LIBRARY = "library"
|
||||||
|
UNKNOWN = "unknown"
|
||||||
|
|
||||||
|
|
||||||
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
|
class Method(str, Enum):
|
||||||
"""remove less trustyworthy database entries.
|
FLIRT = "flirt"
|
||||||
|
STRINGS = "strings"
|
||||||
|
THUNK = "thunk"
|
||||||
|
ENTRYPOINT = "entrypoint"
|
||||||
|
CALLGRAPH = "callgraph"
|
||||||
|
|
||||||
such as:
|
|
||||||
- those found in multiple databases
|
|
||||||
- those that are English words
|
|
||||||
- those that are too short
|
|
||||||
- Windows API and DLL names
|
|
||||||
"""
|
|
||||||
|
|
||||||
# TODO: consider applying these filters directly to the persisted databases, not at load time.
|
class FunctionClassification(BaseModel):
|
||||||
|
va: int
|
||||||
|
classification: Classification
|
||||||
|
# name per the disassembler/analysis tool
|
||||||
|
# may be combined with the recovered/suspected name TODO below
|
||||||
|
name: str
|
||||||
|
|
||||||
|
# if is library, this must be provided
|
||||||
|
method: Optional[Method]
|
||||||
|
|
||||||
|
# TODO if is library, recovered/suspected name?
|
||||||
|
|
||||||
|
# if is library, these can optionally be provided.
|
||||||
|
library_name: Optional[str] = None
|
||||||
|
library_version: Optional[str] = None
|
||||||
|
|
||||||
|
# additional note on the classification, TODO removeme if not useful beyond dev/debug
|
||||||
|
note: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
class BinaryLayout(BaseModel):
|
||||||
|
va: int
|
||||||
|
# size of the function chunks in bytes
|
||||||
|
size: int
|
||||||
|
|
||||||
|
|
||||||
|
class FunctionIdResults(BaseModel):
|
||||||
|
function_classifications: List[FunctionClassification]
|
||||||
|
layout: List[BinaryLayout]
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def ida_session(input_path: Path, use_temp_dir=True):
|
||||||
|
if use_temp_dir:
|
||||||
|
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
|
||||||
|
else:
|
||||||
|
t = input_path
|
||||||
|
|
||||||
|
logger.debug("using %s", str(t))
|
||||||
|
# stderr=True is used here to redirect the spinner banner to stderr,
|
||||||
|
# so that users can redirect capa's output.
|
||||||
|
console = Console(stderr=True, quiet=False)
|
||||||
|
|
||||||
winapi = capa.analysis.strings.WindowsApiStringDatabase.from_defaults()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from nltk.corpus import words as nltk_words
|
if use_temp_dir:
|
||||||
except ImportError:
|
t.write_bytes(input_path.read_bytes())
|
||||||
# one-time download of dataset.
|
|
||||||
# this probably doesn't work well for embedded use.
|
|
||||||
import nltk
|
|
||||||
nltk.download("words")
|
|
||||||
from nltk.corpus import words as nltk_words
|
|
||||||
words = set(nltk_words.words())
|
|
||||||
|
|
||||||
counter = collections.Counter()
|
# idalib writes to stdout (ugh), so we have to capture that
|
||||||
to_remove = set()
|
# so as not to screw up structured output.
|
||||||
for db in dbs:
|
with capa.helpers.stdout_redirector(io.BytesIO()):
|
||||||
for string in db.metadata_by_string.keys():
|
idapro.enable_console_messages(False)
|
||||||
counter[string] += 1
|
with capa.main.timing("analyze program"):
|
||||||
|
with console.status("analyzing program...", spinner="dots"):
|
||||||
|
if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
|
||||||
|
raise RuntimeError("failed to analyze input file")
|
||||||
|
|
||||||
if string in words:
|
logger.debug("idalib: waiting for analysis...")
|
||||||
to_remove.add(string)
|
ida_auto.auto_wait()
|
||||||
|
logger.debug("idalib: opened database.")
|
||||||
|
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
idapro.close_database()
|
||||||
|
if use_temp_dir:
|
||||||
|
t.unlink()
|
||||||
|
|
||||||
|
|
||||||
|
def get_library_called_functions(
|
||||||
|
function_classifications: list[FunctionClassification],
|
||||||
|
) -> Iterable[FunctionClassification]:
|
||||||
|
MAX_PASSES = 10
|
||||||
|
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
|
||||||
|
for n in range(MAX_PASSES):
|
||||||
|
found_new_lib_func = False
|
||||||
|
|
||||||
|
for fva in idautils.Functions():
|
||||||
|
if classifications_by_va.get(fva):
|
||||||
|
# already classified
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if len(string) < n:
|
for ref in idautils.CodeRefsTo(fva, True):
|
||||||
to_remove.add(string)
|
f: idaapi.func_t = idaapi.get_func(ref)
|
||||||
continue
|
if not f:
|
||||||
|
# no function associated with reference location
|
||||||
|
continue
|
||||||
|
|
||||||
if string in winapi.api_names:
|
ref_fva = f.start_ea
|
||||||
to_remove.add(string)
|
fname = idaapi.get_func_name(ref_fva)
|
||||||
continue
|
if fname in ("___tmainCRTStartup",):
|
||||||
|
# ignore library functions, where we know that they call user-code
|
||||||
|
# TODO(mr): extend this list
|
||||||
|
continue
|
||||||
|
|
||||||
if string in winapi.dll_names:
|
if classifications := classifications_by_va.get(ref_fva):
|
||||||
to_remove.add(string)
|
for c in classifications:
|
||||||
continue
|
if c.classification == Classification.LIBRARY:
|
||||||
|
fc = FunctionClassification(
|
||||||
|
va=fva,
|
||||||
|
name=idaapi.get_func_name(fva),
|
||||||
|
classification=Classification.LIBRARY,
|
||||||
|
method=Method.CALLGRAPH,
|
||||||
|
note=f"called by 0x{ref_fva:x} ({c.method.value}{f', {c.library_name}@{c.library_version})' if c.library_name else ')'}",
|
||||||
|
)
|
||||||
|
classifications_by_va[fva].append(fc)
|
||||||
|
yield fc
|
||||||
|
found_new_lib_func = True
|
||||||
|
break
|
||||||
|
|
||||||
for string, count in counter.most_common():
|
if not found_new_lib_func:
|
||||||
if count <= 1:
|
logger.debug("no update in pass %d, done here", n)
|
||||||
break
|
return
|
||||||
|
|
||||||
# remove strings that are seen in more than one database
|
|
||||||
to_remove.add(string)
|
|
||||||
|
|
||||||
for db in dbs:
|
|
||||||
for string in to_remove:
|
|
||||||
if string in db.metadata_by_string:
|
|
||||||
del db.metadata_by_string[string]
|
|
||||||
|
|
||||||
|
|
||||||
def open_ida(input_path: Path):
|
def is_thunk_function(fva):
|
||||||
import tempfile
|
f = idaapi.get_func(fva)
|
||||||
|
return bool(f.flags & idaapi.FUNC_THUNK)
|
||||||
import idapro
|
|
||||||
|
|
||||||
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
|
|
||||||
t.write_bytes(input_path.read_bytes())
|
|
||||||
# resource leak: we should delete this upon exit
|
|
||||||
|
|
||||||
idapro.enable_console_messages(False)
|
|
||||||
idapro.open_database(str(t.absolute()), run_auto_analysis=True)
|
|
||||||
|
|
||||||
import ida_auto
|
|
||||||
ida_auto.auto_wait()
|
|
||||||
|
|
||||||
|
|
||||||
|
def get_function_size(fva):
|
||||||
|
f = idaapi.get_func(fva)
|
||||||
|
assert f.start_ea == fva
|
||||||
|
return sum([end_ea - start_ea for (start_ea, end_ea) in idautils.Chunks(fva)])
|
||||||
|
|
||||||
def main():
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
|
|
||||||
# use n=8 to ignore common words
|
def main(argv=None):
|
||||||
N = 8
|
if argv is None:
|
||||||
|
argv = sys.argv[1:]
|
||||||
|
|
||||||
input_path = Path(sys.argv[1])
|
parser = argparse.ArgumentParser(description="Identify library functions using various strategies.")
|
||||||
input_buf = input_path.read_bytes()
|
capa.main.install_common_args(parser, wanted={"input_file"})
|
||||||
|
parser.add_argument("--store-idb", action="store_true", default=False, help="store IDA database file")
|
||||||
|
parser.add_argument("--min-string-length", type=int, default=8, help="minimum string length")
|
||||||
|
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
|
||||||
|
args = parser.parse_args(args=argv)
|
||||||
|
|
||||||
|
try:
|
||||||
|
capa.main.handle_common_args(args)
|
||||||
|
except capa.main.ShouldExitError as e:
|
||||||
|
return e.status_code
|
||||||
|
|
||||||
dbs = capa.analysis.strings.get_default_databases()
|
dbs = capa.analysis.strings.get_default_databases()
|
||||||
prune_databases(dbs, n=N)
|
capa.analysis.strings.prune_databases(dbs, n=args.min_string_length)
|
||||||
|
|
||||||
strings_by_library = collections.defaultdict(set)
|
function_classifications: List[FunctionClassification] = []
|
||||||
for string in extract_strings(input_path.read_bytes(), n=N):
|
with ida_session(args.input_file, use_temp_dir=not args.store_idb):
|
||||||
for db in dbs:
|
with capa.main.timing("FLIRT-based library identification"):
|
||||||
if (metadata := db.metadata_by_string.get(string.s)):
|
# TODO: add more signature (files)
|
||||||
strings_by_library[metadata.library_name].add(string.s)
|
# TOOD: apply more signatures
|
||||||
|
for flirt_match in capa.analysis.flirt.get_flirt_matches():
|
||||||
|
function_classifications.append(
|
||||||
|
FunctionClassification(
|
||||||
|
va=flirt_match.va,
|
||||||
|
name=flirt_match.name,
|
||||||
|
classification=Classification.LIBRARY,
|
||||||
|
method=Method.FLIRT,
|
||||||
|
# note: we cannot currently include which signature matched per function via the IDA API
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
console = rich.get_console()
|
# thunks
|
||||||
console.print(f"found libraries:", style="bold")
|
for fva in idautils.Functions():
|
||||||
for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
|
if is_thunk_function(fva):
|
||||||
console.print(f" - [b]{library}[/] ({len(strings)} strings)")
|
function_classifications.append(
|
||||||
|
FunctionClassification(
|
||||||
|
va=fva,
|
||||||
|
name=idaapi.get_func_name(fva),
|
||||||
|
classification=Classification.LIBRARY,
|
||||||
|
method=Method.THUNK,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
for string in sorted(strings)[:10]:
|
with capa.main.timing("string-based library identification"):
|
||||||
console.print(f" - {string}", markup=False, style="grey37")
|
for string_match in capa.analysis.strings.get_string_matches(dbs):
|
||||||
|
function_classifications.append(
|
||||||
|
FunctionClassification(
|
||||||
|
va=string_match.va,
|
||||||
|
name=idaapi.get_func_name(string_match.va),
|
||||||
|
classification=Classification.LIBRARY,
|
||||||
|
method=Method.STRINGS,
|
||||||
|
library_name=string_match.metadata.library_name,
|
||||||
|
library_version=string_match.metadata.library_version,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
if len(strings) > 10:
|
for va in idautils.Functions():
|
||||||
console.print(" ...", style="grey37")
|
name = idaapi.get_func_name(va)
|
||||||
|
if name not in {
|
||||||
|
"WinMain",
|
||||||
|
"_main",
|
||||||
|
"main",
|
||||||
|
}:
|
||||||
|
continue
|
||||||
|
|
||||||
if not strings_by_library:
|
function_classifications.append(
|
||||||
console.print(" (none)", style="grey37")
|
FunctionClassification(
|
||||||
# since we're not going to find any strings
|
va=va,
|
||||||
# return early and don't do IDA analysis
|
name=name,
|
||||||
return
|
classification=Classification.USER,
|
||||||
|
method=Method.ENTRYPOINT,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: ensure there are XXX matches for each library, or ignore those entries
|
with capa.main.timing("call graph based library identification"):
|
||||||
|
for fc in get_library_called_functions(function_classifications):
|
||||||
|
function_classifications.append(fc)
|
||||||
|
|
||||||
open_ida(input_path)
|
doc = FunctionIdResults(function_classifications=[], layout=[])
|
||||||
|
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
|
||||||
|
for va in idautils.Functions():
|
||||||
|
if classifications := classifications_by_va.get(va):
|
||||||
|
doc.function_classifications.extend(classifications)
|
||||||
|
else:
|
||||||
|
doc.function_classifications.append(
|
||||||
|
FunctionClassification(
|
||||||
|
va=va,
|
||||||
|
name=idaapi.get_func_name(va),
|
||||||
|
classification=Classification.UNKNOWN,
|
||||||
|
method=None,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
doc.layout.append(
|
||||||
|
BinaryLayout(
|
||||||
|
va=va,
|
||||||
|
size=get_function_size(va),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
import idaapi
|
if args.json:
|
||||||
import idautils
|
print(doc.model_dump_json()) # noqa: T201 print found
|
||||||
import ida_funcs
|
|
||||||
import capa.features.extractors.ida.helpers as ida_helpers
|
|
||||||
|
|
||||||
strings_by_function = collections.defaultdict(set)
|
else:
|
||||||
for ea in idautils.Functions():
|
table = rich.table.Table(
|
||||||
f = idaapi.get_func(ea)
|
"FVA",
|
||||||
|
"CLASSIFICATION",
|
||||||
|
"METHOD",
|
||||||
|
"FNAME",
|
||||||
|
"EXTRA",
|
||||||
|
"SIZE"
|
||||||
|
)
|
||||||
|
|
||||||
# ignore library functions and thunk functions as identified by IDA
|
classifications_by_va = capa.analysis.strings.create_index(doc.function_classifications, "va", sorted_=True)
|
||||||
if f.flags & idaapi.FUNC_THUNK:
|
size_by_va = {layout.va: layout.size for layout in doc.layout}
|
||||||
continue
|
size_by_classification = collections.defaultdict(int)
|
||||||
if f.flags & idaapi.FUNC_LIB:
|
for va, classifications in classifications_by_va.items():
|
||||||
continue
|
# TODO count of classifications if multiple?
|
||||||
|
name = ", ".join({c.name for c in classifications})
|
||||||
|
if "sub_" in name:
|
||||||
|
name = Text(name, style="grey53")
|
||||||
|
|
||||||
for bb in ida_helpers.get_function_blocks(f):
|
classification = {c.classification for c in classifications}
|
||||||
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
method = {c.method for c in classifications if c.method}
|
||||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
extra = {f"{c.library_name}@{c.library_version}" for c in classifications if c.library_name}
|
||||||
if ref == insn.ea:
|
note = {f"{c.note}" for c in classifications if c.note}
|
||||||
continue
|
|
||||||
|
|
||||||
string = capa.features.extractors.ida.helpers.find_string_at(ref)
|
table.add_row(
|
||||||
if not string:
|
hex(va),
|
||||||
continue
|
", ".join(classification) if classification != {"unknown"} else Text("unknown", style="grey53"),
|
||||||
|
", ".join(method),
|
||||||
|
name,
|
||||||
|
f"{', '.join(extra)} {', '.join(note)}",
|
||||||
|
f"{size_by_va[va]}",
|
||||||
|
)
|
||||||
|
|
||||||
for db in dbs:
|
size_by_classification["-".join(classification)] += size_by_va[va]
|
||||||
if (metadata := db.metadata_by_string.get(string)):
|
|
||||||
strings_by_function[ea].add(string)
|
|
||||||
|
|
||||||
# ensure there are at least XXX functions renamed, or ignore those entries
|
rich.print(table)
|
||||||
|
|
||||||
console.print("functions:", style="bold")
|
stats_table = rich.table.Table(
|
||||||
for function, strings in sorted(strings_by_function.items()):
|
"ID", rich.table.Column("SIZE", justify="right"), rich.table.Column("%", justify="right")
|
||||||
if strings:
|
)
|
||||||
name = ida_funcs.get_func_name(function)
|
size_all = sum(size_by_classification.values())
|
||||||
|
for k, s in size_by_classification.items():
|
||||||
console.print(f" [b]{name}[/]@{function:08x}:")
|
stats_table.add_row(k, f"{s:d}", f"{100 * s / size_all:.2f}")
|
||||||
|
rich.print(stats_table)
|
||||||
for string in strings:
|
|
||||||
for db in dbs:
|
|
||||||
if (metadata := db.metadata_by_string.get(string)):
|
|
||||||
location = Text(f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}", style="grey37")
|
|
||||||
console.print(" - ", location, ": ", string.rstrip())
|
|
||||||
|
|
||||||
# TODO: ensure there aren't conflicts among the matches
|
|
||||||
|
|
||||||
console.print()
|
|
||||||
|
|
||||||
console.print(f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
sys.exit(main())
|
||||||
|
|||||||
2
capa/analysis/requirements.txt
Normal file
2
capa/analysis/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
# temporary extra file to track dependencies of the analysis directory
|
||||||
|
nltk==3.9.1
|
||||||
@@ -1,10 +1,28 @@
|
|||||||
|
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
further requirements:
|
||||||
|
- nltk
|
||||||
|
"""
|
||||||
import gzip
|
import gzip
|
||||||
import pathlib
|
import logging
|
||||||
from typing import Dict, Sequence
|
import collections
|
||||||
|
from typing import Any, Dict, Mapping
|
||||||
|
from pathlib import Path
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
|
|
||||||
|
import capa.features.extractors.strings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class LibraryString(msgspec.Struct):
|
class LibraryString(msgspec.Struct):
|
||||||
string: str
|
string: str
|
||||||
@@ -23,7 +41,7 @@ class LibraryStringDatabase:
|
|||||||
return len(self.metadata_by_string)
|
return len(self.metadata_by_string)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_file(cls, path: pathlib.Path) -> "LibraryStringDatabase":
|
def from_file(cls, path: Path) -> "LibraryStringDatabase":
|
||||||
metadata_by_string: Dict[str, LibraryString] = {}
|
metadata_by_string: Dict[str, LibraryString] = {}
|
||||||
decoder = msgspec.json.Decoder(type=LibraryString)
|
decoder = msgspec.json.Decoder(type=LibraryString)
|
||||||
for line in gzip.decompress(path.read_bytes()).split(b"\n"):
|
for line in gzip.decompress(path.read_bytes()).split(b"\n"):
|
||||||
@@ -55,12 +73,12 @@ DEFAULT_FILENAMES = (
|
|||||||
"zlib.jsonl.gz",
|
"zlib.jsonl.gz",
|
||||||
)
|
)
|
||||||
|
|
||||||
DEFAULT_PATHS = tuple(
|
DEFAULT_PATHS = tuple(Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES) + (
|
||||||
pathlib.Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES
|
Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",
|
||||||
) + (pathlib.Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_default_databases() -> Sequence[LibraryStringDatabase]:
|
def get_default_databases() -> list[LibraryStringDatabase]:
|
||||||
return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS]
|
return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS]
|
||||||
|
|
||||||
|
|
||||||
@@ -73,9 +91,9 @@ class WindowsApiStringDatabase:
|
|||||||
return len(self.dll_names) + len(self.api_names)
|
return len(self.dll_names) + len(self.api_names)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dir(cls, path: pathlib.Path) -> "WindowsApiStringDatabase":
|
def from_dir(cls, path: Path) -> "WindowsApiStringDatabase":
|
||||||
dll_names: Set[str] = set()
|
dll_names: set[str] = set()
|
||||||
api_names: Set[str] = set()
|
api_names: set[str] = set()
|
||||||
|
|
||||||
for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
|
for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
|
||||||
if not line:
|
if not line:
|
||||||
@@ -91,5 +109,161 @@ class WindowsApiStringDatabase:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_defaults(cls) -> "WindowsApiStringDatabase":
|
def from_defaults(cls) -> "WindowsApiStringDatabase":
|
||||||
return cls.from_dir(pathlib.Path(__file__).parent / "data" / "winapi")
|
return cls.from_dir(Path(__file__).parent / "data" / "winapi")
|
||||||
|
|
||||||
|
|
||||||
|
def extract_strings(buf, n=4):
|
||||||
|
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
|
||||||
|
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
|
||||||
|
|
||||||
|
|
||||||
|
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
|
||||||
|
"""remove less trustyworthy database entries.
|
||||||
|
|
||||||
|
such as:
|
||||||
|
- those found in multiple databases
|
||||||
|
- those that are English words
|
||||||
|
- those that are too short
|
||||||
|
- Windows API and DLL names
|
||||||
|
"""
|
||||||
|
|
||||||
|
# TODO: consider applying these filters directly to the persisted databases, not at load time.
|
||||||
|
|
||||||
|
winapi = WindowsApiStringDatabase.from_defaults()
|
||||||
|
|
||||||
|
try:
|
||||||
|
from nltk.corpus import words as nltk_words
|
||||||
|
|
||||||
|
nltk_words.words()
|
||||||
|
except (ImportError, LookupError):
|
||||||
|
# one-time download of dataset.
|
||||||
|
# this probably doesn't work well for embedded use.
|
||||||
|
import nltk
|
||||||
|
|
||||||
|
nltk.download("words")
|
||||||
|
from nltk.corpus import words as nltk_words
|
||||||
|
words = set(nltk_words.words())
|
||||||
|
|
||||||
|
counter: collections.Counter[str] = collections.Counter()
|
||||||
|
to_remove = set()
|
||||||
|
for db in dbs:
|
||||||
|
for string in db.metadata_by_string.keys():
|
||||||
|
counter[string] += 1
|
||||||
|
|
||||||
|
if string in words:
|
||||||
|
to_remove.add(string)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if len(string) < n:
|
||||||
|
to_remove.add(string)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if string in winapi.api_names:
|
||||||
|
to_remove.add(string)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if string in winapi.dll_names:
|
||||||
|
to_remove.add(string)
|
||||||
|
continue
|
||||||
|
|
||||||
|
for string, count in counter.most_common():
|
||||||
|
if count <= 1:
|
||||||
|
break
|
||||||
|
|
||||||
|
# remove strings that are seen in more than one database
|
||||||
|
to_remove.add(string)
|
||||||
|
|
||||||
|
for db in dbs:
|
||||||
|
for string in to_remove:
|
||||||
|
if string in db.metadata_by_string:
|
||||||
|
del db.metadata_by_string[string]
|
||||||
|
|
||||||
|
|
||||||
|
def get_function_strings():
|
||||||
|
import idaapi
|
||||||
|
import idautils
|
||||||
|
|
||||||
|
import capa.features.extractors.ida.helpers as ida_helpers
|
||||||
|
|
||||||
|
strings_by_function = collections.defaultdict(set)
|
||||||
|
for ea in idautils.Functions():
|
||||||
|
f = idaapi.get_func(ea)
|
||||||
|
|
||||||
|
# ignore library functions and thunk functions as identified by IDA
|
||||||
|
if f.flags & idaapi.FUNC_THUNK:
|
||||||
|
continue
|
||||||
|
if f.flags & idaapi.FUNC_LIB:
|
||||||
|
continue
|
||||||
|
|
||||||
|
for bb in ida_helpers.get_function_blocks(f):
|
||||||
|
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||||
|
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||||
|
if ref == insn.ea:
|
||||||
|
continue
|
||||||
|
|
||||||
|
string = capa.features.extractors.ida.helpers.find_string_at(ref)
|
||||||
|
if not string:
|
||||||
|
continue
|
||||||
|
|
||||||
|
strings_by_function[ea].add(string)
|
||||||
|
|
||||||
|
return strings_by_function
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class LibraryStringClassification:
|
||||||
|
va: int
|
||||||
|
string: str
|
||||||
|
library_name: str
|
||||||
|
metadata: LibraryString
|
||||||
|
|
||||||
|
|
||||||
|
def create_index(s: list, k: str, sorted_: bool = False) -> Mapping[Any, list]:
|
||||||
|
"""create an index of the elements in `s` using the key `k`, optionally sorted by `k`"""
|
||||||
|
if sorted_:
|
||||||
|
s = sorted(s, key=lambda x: getattr(x, k))
|
||||||
|
|
||||||
|
s_by_k = collections.defaultdict(list)
|
||||||
|
for v in s:
|
||||||
|
p = getattr(v, k)
|
||||||
|
s_by_k[p].append(v)
|
||||||
|
return s_by_k
|
||||||
|
|
||||||
|
|
||||||
|
def get_string_matches(dbs: list[LibraryStringDatabase]) -> list[LibraryStringClassification]:
|
||||||
|
matches: list[LibraryStringClassification] = []
|
||||||
|
|
||||||
|
for function, strings in sorted(get_function_strings().items()):
|
||||||
|
for string in strings:
|
||||||
|
for db in dbs:
|
||||||
|
if metadata := db.metadata_by_string.get(string):
|
||||||
|
matches.append(
|
||||||
|
LibraryStringClassification(
|
||||||
|
va=function,
|
||||||
|
string=string,
|
||||||
|
library_name=metadata.library_name,
|
||||||
|
metadata=metadata,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# if there are less than N strings per library, ignore that library
|
||||||
|
matches_by_library = create_index(matches, "library_name")
|
||||||
|
for library_name, library_matches in matches_by_library.items():
|
||||||
|
if len(library_matches) > 5:
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.info("pruning library %s: only %d matched string", library_name, len(library_matches))
|
||||||
|
matches = [m for m in matches if m.library_name != library_name]
|
||||||
|
|
||||||
|
# if there are conflicts within a single function, don't label it
|
||||||
|
matches_by_function = create_index(matches, "va")
|
||||||
|
for va, function_matches in matches_by_function.items():
|
||||||
|
library_names = {m.library_name for m in function_matches}
|
||||||
|
if len(library_names) == 1:
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.info("conflicting matches: 0x%x: %s", va, sorted(library_names))
|
||||||
|
# this is potentially slow (O(n**2)) but hopefully fast enough in practice.
|
||||||
|
matches = [m for m in matches if m.va != va]
|
||||||
|
|
||||||
|
return matches
|
||||||
|
|||||||
130
capa/analysis/strings/__main__.py
Normal file
130
capa/analysis/strings/__main__.py
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
import sys
|
||||||
|
import logging
|
||||||
|
import collections
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import rich
|
||||||
|
from rich.text import Text
|
||||||
|
|
||||||
|
import capa.analysis.strings
|
||||||
|
import capa.features.extractors.strings
|
||||||
|
import capa.features.extractors.ida.helpers as ida_helpers
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def open_ida(input_path: Path):
    """
    Open the given file in IDA (via idalib) and wait for auto-analysis to finish.

    The input is copied into a scratch temp directory first, so that IDA writes
    its database files next to the copy rather than next to the user's file.
    The scratch directory is removed at interpreter exit.

    Args:
        input_path: path to the binary to analyze.
    """
    import atexit
    import shutil
    import tempfile

    import idapro

    scratch_dir = Path(tempfile.mkdtemp(prefix="ida-"))
    t = scratch_dir / input_path.name
    t.write_bytes(input_path.read_bytes())
    # fix for the earlier resource leak: remove the scratch directory
    # (copied input + IDA database files) when the process exits.
    atexit.register(shutil.rmtree, scratch_dir, ignore_errors=True)

    idapro.enable_console_messages(False)
    idapro.open_database(str(t.absolute()), run_auto_analysis=True)

    # imported here, after the database is open — presumably ida_auto requires
    # an active idalib session; confirm against idalib docs.
    import ida_auto

    ida_auto.auto_wait()
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """
    Report likely statically-linked library code in the file named by sys.argv[1].

    Pass 1: scan the raw file bytes for strings present in the open-source
    string databases and print the libraries they belong to (with samples).
    Pass 2 (only when pass 1 matched anything): analyze the file with IDA and
    attribute matched strings to the functions that reference them.
    """
    logging.basicConfig(level=logging.DEBUG)

    # use n=8 to ignore common words
    N = 8

    input_path = Path(sys.argv[1])

    dbs = capa.analysis.strings.get_default_databases()
    capa.analysis.strings.prune_databases(dbs, n=N)

    # library name -> set of database strings found anywhere in the raw bytes
    strings_by_library = collections.defaultdict(set)
    for string in capa.analysis.strings.extract_strings(input_path.read_bytes(), n=N):
        for db in dbs:
            if metadata := db.metadata_by_string.get(string.s):
                strings_by_library[metadata.library_name].add(string.s)

    console = rich.get_console()
    console.print("found libraries:", style="bold")
    # most-matched libraries first
    for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
        console.print(f" - [b]{library}[/] ({len(strings)} strings)")

        # show only a small sample of matched strings per library
        for string in sorted(strings)[:10]:
            console.print(f" - {string}", markup=False, style="grey37")

        if len(strings) > 10:
            console.print(" ...", style="grey37")

    if not strings_by_library:
        console.print(" (none)", style="grey37")
        # since we're not going to find any strings
        # return early and don't do IDA analysis
        return

    open_ida(input_path)

    # imported after open_ida() — presumably these modules require an open
    # idalib database; confirm against idalib documentation.
    import idaapi
    import idautils
    import ida_funcs

    # function start address -> set of matched strings referenced by that function
    strings_by_function = collections.defaultdict(set)
    for ea in idautils.Functions():
        f = idaapi.get_func(ea)

        # ignore library functions and thunk functions as identified by IDA
        if f.flags & idaapi.FUNC_THUNK:
            continue
        if f.flags & idaapi.FUNC_LIB:
            continue

        for bb in ida_helpers.get_function_blocks(f):
            for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
                ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
                # ref == insn.ea appears to mean "no data reference" — skip
                if ref == insn.ea:
                    continue

                string = capa.features.extractors.ida.helpers.find_string_at(ref)
                if not string:
                    continue

                for db in dbs:
                    if metadata := db.metadata_by_string.get(string):
                        strings_by_function[ea].add(string)

    # ensure there are at least XXX functions renamed, or ignore those entries

    console.print("functions:", style="bold")
    for function, strings in sorted(strings_by_function.items()):
        if strings:
            name = ida_funcs.get_func_name(function)

            console.print(f" [b]{name}[/]@{function:08x}:")

            # show every database entry (library@version::function) that
            # contains this string; a string may occur in multiple databases.
            for string in strings:
                for db in dbs:
                    if metadata := db.metadata_by_string.get(string):
                        location = Text(
                            f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
                            style="grey37",
                        )
                        console.print(" - ", location, ": ", string.rstrip())

            console.print()

    console.print(
        f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions"
    )
|
||||||
|
|
||||||
|
|
||||||
|
# script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
|
||||||
@@ -1,52 +0,0 @@
|
|||||||
"""
|
|
||||||
convert from a jh CSV file to a .jsonl.gz OpenSourceString database.
|
|
||||||
|
|
||||||
the jh file looks like:
|
|
||||||
|
|
||||||
# triplet,compiler,library,version,profile,path,function,type,value
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0x00000100
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0xfffffff8
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0xfffffffe
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,api,BZ2_bzCompressInit
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,api,handle_compress
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0x0000fa90
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffff8
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffff9
|
|
||||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffffd
|
|
||||||
|
|
||||||
jh is found here: https://github.com/williballenthin/lancelot/blob/master/bin/src/bin/jh.rs
|
|
||||||
"""
|
|
||||||
import sys
|
|
||||||
import json
|
|
||||||
import pathlib
|
|
||||||
|
|
||||||
import msgspec
|
|
||||||
|
|
||||||
from capa.analysis.strings import LibraryString
|
|
||||||
|
|
||||||
p = pathlib.Path(sys.argv[1])
|
|
||||||
for line in p.read_text().split("\n"):
|
|
||||||
if not line:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if line.startswith("#"):
|
|
||||||
continue
|
|
||||||
|
|
||||||
triplet, compiler, library, version, profile, path, function, rest = line.split(",", 7)
|
|
||||||
type, _, value = rest.partition(",")
|
|
||||||
if type != "string":
|
|
||||||
continue
|
|
||||||
|
|
||||||
if value.startswith('"'):
|
|
||||||
value = json.loads(value)
|
|
||||||
|
|
||||||
s = LibraryString(
|
|
||||||
string=value,
|
|
||||||
library_name=library,
|
|
||||||
library_version=version,
|
|
||||||
file_path=path,
|
|
||||||
function_name=function,
|
|
||||||
)
|
|
||||||
|
|
||||||
sys.stdout.buffer.write(msgspec.json.encode(s))
|
|
||||||
sys.stdout.buffer.write(b"\n")
|
|
||||||
@@ -77,6 +77,8 @@ dependencies = [
|
|||||||
"protobuf>=5",
|
"protobuf>=5",
|
||||||
"msgspec>=0.18.6",
|
"msgspec>=0.18.6",
|
||||||
"xmltodict>=0.13.0",
|
"xmltodict>=0.13.0",
|
||||||
|
# for library detection (in development)
|
||||||
|
"nltk>=3",
|
||||||
|
|
||||||
# ---------------------------------------
|
# ---------------------------------------
|
||||||
# Dependencies that we develop
|
# Dependencies that we develop
|
||||||
|
|||||||
Reference in New Issue
Block a user