mirror of https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
library-detection: cleanup script
@@ -108,6 +108,7 @@ repos:
           - "--check-untyped-defs"
           - "--ignore-missing-imports"
           - "--config-file=.github/mypy/mypy.ini"
+          - "--enable-incomplete-feature=NewGenericSyntax"
           - "capa/"
           - "scripts/"
           - "tests/"
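The added --enable-incomplete-feature=NewGenericSyntax flag lets mypy check the PEP 695 generic syntax that this commit introduces in create_index[T] below. A standalone sketch of that syntax, for illustration only:

    # PEP 695 (Python 3.12+): the type parameter T is declared inline on the function.
    def first[T](items: list[T]) -> T:
        return items[0]

    assert first([3, 1, 2]) == 3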
@@ -135,9 +135,6 @@ def main(argv=None):
 
     logger.debug("idalib: waiting for analysis...")
 
-    # TODO: add more signature (files)
-    # TOOD: apply more signatures
-
     ida_auto.auto_wait()
     logger.debug("idalib: opened database.")
 
@@ -154,7 +151,6 @@
 
     rich.print(table)
 
-    # TODO can we include which signature matched per function?
     for index in range(0, ida_funcs.get_idasgn_qty()):
         signame, optlibs, nmatches = ida_funcs.get_idasgn_desc_with_matches(index)
         rich.print(signame, optlibs, nmatches)
@@ -7,19 +7,16 @@
 # See the License for the specific language governing permissions and limitations under the License.
-import io
 import sys
 import json
-import time
 import logging
 import argparse
 import tempfile
 import contextlib
-import collections
 from enum import Enum
-from typing import List, Literal, Optional
+from typing import List, Optional
 from pathlib import Path
 
 import rich
-from pydantic import Field, BaseModel
+from pydantic import BaseModel
 from rich.text import Text
 from rich.console import Console
 
@@ -39,7 +36,6 @@ import idaapi
 import idapro
 import ida_auto
 import idautils
-import ida_funcs
 
 logger = logging.getLogger(__name__)
 
@@ -56,20 +52,22 @@ class Method(str, Enum):
 
 
 class FunctionClassification(BaseModel):
-    va: int  # rva? va?
-    classification: Literal[Classification.USER, Classification.LIBRARY, Classification.UNKNOWN]
-    method: Literal[Method.FLIRT, Method.STRINGS]
-    # if is library
+    va: int
+    classification: Classification
+    method: Method
+
+    # if is library, these can optionally be provided.
     library_name: Optional[str] = None
     library_version: Optional[str] = None
 
 
 class Layout(BaseModel):
-    functions: List[int] = list()
+    functions: List[int]
 
 
 class FunctionIdResults(BaseModel):
-    function_classifications: List[FunctionClassification] = list()
+    function_classifications: List[FunctionClassification]
 
     # layout: Layout
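With the Literal annotations and the list() defaults gone, the models are plain pydantic: classification and method validate against the full enums, and function_classifications is required at construction time. A minimal construction/serialization sketch (the enum members come from the diff; the va value is illustrative):

    results = FunctionIdResults(
        function_classifications=[
            FunctionClassification(va=0x401000, classification=Classification.LIBRARY, method=Method.FLIRT),
        ]
    )
    print(results.model_dump_json(indent=2))  # pydantic v2 serialization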
@@ -81,7 +79,8 @@ def ida_session(input_path: Path, use_temp_dir=True):
         t = input_path
 
     logger.debug("using %s", str(t))
-    # stderr=True is used here to redirect the spinner banner to stderr, so that users can redirect capa's output.
+    # stderr=True is used here to redirect the spinner banner to stderr,
+    # so that users can redirect capa's output.
     console = Console(stderr=True, quiet=False)
 
     try:
@@ -92,9 +91,10 @@ def ida_session(input_path: Path, use_temp_dir=True):
         # so as not to screw up structured output.
-        with capa.helpers.stdout_redirector(io.BytesIO()):
-            idapro.enable_console_messages(False)
-            with console.status("analyzing program...", spinner="dots"):
-                if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
-                    raise RuntimeError("failed to analyze input file")
+        idapro.enable_console_messages(False)
+        with capa.main.timing("analyze program"):
+            with console.status("analyzing program...", spinner="dots"):
+                if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
+                    raise RuntimeError("failed to analyze input file")
+
         logger.debug("idalib: waiting for analysis...")
         ida_auto.auto_wait()
@@ -114,6 +114,7 @@ def main(argv=None):
     parser = argparse.ArgumentParser(description="Identify library functions using various strategies.")
     capa.main.install_common_args(parser, wanted={"input_file"})
     parser.add_argument("--store-idb", action="store_true", default=False, help="store IDA database file")
+    parser.add_argument("--min-string-length", type=int, default=8, help="minimum string length")
    args = parser.parse_args(args=argv)
 
     try:
@@ -121,96 +122,70 @@ def main(argv=None):
     except capa.main.ShouldExitError as e:
         return e.status_code
 
-    N = 8
-    time0 = time.time()
-
-    results = FunctionIdResults()
+    dbs = capa.analysis.strings.get_default_databases()
+    capa.analysis.strings.prune_databases(dbs, n=args.min_string_length)
 
+    function_classifications: List[FunctionClassification] = []
     with ida_session(args.input_file, use_temp_dir=not args.store_idb):
-        # TODO: add more signature (files)
-        # TOOD: apply more signatures
-
-        for fid in capa.analysis.flirt.get_flirt_matches(lib_only=False):
-            results.function_classifications.append(
-                FunctionClassification(
-                    va=fid.address,
-                    classification=Classification.LIBRARY,
-                    method=Method.FLIRT,
-                    # note: we cannot currently include which signature matched per function via the IDA API
-                )
-            )
-
-        min, sec = divmod(time.time() - time0, 60)
-        logger.debug("FLIRT-based library identification ran for ~ %02d:%02dm", min, sec)
+        with capa.main.timing("FLIRT-based library identification"):
+            # TODO: add more signature (files)
+            # TOOD: apply more signatures
+            for flirt_match in capa.analysis.flirt.get_flirt_matches():
+                function_classifications.append(
+                    FunctionClassification(
+                        va=flirt_match.address,
+                        classification=Classification.LIBRARY,
+                        method=Method.FLIRT,
+                        # note: we cannot currently include which signature matched per function via the IDA API
+                    )
+                )
 
-        dbs = capa.analysis.strings.get_default_databases()
-        capa.analysis.strings.prune_databases(dbs, n=N)
+        with capa.main.timing("string-based library identification"):
+            for string_match in capa.analysis.strings.get_string_matches(dbs):
+                function_classifications.append(
+                    FunctionClassification(
+                        va=string_match.va,
+                        classification=Classification.LIBRARY,
+                        method=Method.STRINGS,
+                        library_name=string_match.metadata.library_name,
+                        library_version=string_match.metadata.library_version,
+                    )
+                )
 
-        console = rich.get_console()
-        for function, strings in sorted(capa.analysis.strings.get_function_strings().items()):
-            matched_strings = set()
-            for string in strings:
-                for db in dbs:
-                    if string in db.metadata_by_string:
-                        matched_strings.add(string)
+        table = rich.table.Table()
+        table.add_column("FVA")
+        table.add_column("CLASSIFICATION")
+        table.add_column("METHOD")
+        table.add_column("FNAME")
+        table.add_column("EXTRA INFO")
 
-            if matched_strings:
-                name = ida_funcs.get_func_name(function)
-                console.print(f" [b]{name}[/]@{function:08x}:")
+        classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
+        for va in idautils.Functions(start=0, end=None):
+            name = idaapi.get_func_name(va)
+            if name.startswith("sub_"):
+                name = Text(name, style="grey37")
 
-                for string in matched_strings:
-                    for db in dbs:
-                        if metadata := db.metadata_by_string.get(string):
-                            location = Text(
-                                f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
-                                style="grey37",
-                            )
-                            console.print(" - ", location, ": ", string.rstrip())
-
-                            results.function_classifications.append(
-                                FunctionClassification(
-                                    va=function,
-                                    classification=Classification.LIBRARY,
-                                    method=Method.STRINGS,
-                                    library_name=metadata.library_name,
-                                    library_version=metadata.library_version,
-                                )
-                            )
-
-        # TODO: ensure there aren't conflicts among the matches
-
-        # RENDER
-        table = rich.table.Table()
-        table.add_column("FVA")
-        # table.add_column("FNAME")
-        table.add_column("CLASSIFICATION")
-        table.add_column("METHOD")
-        table.add_column("EXTRA INFO")
-
-        idx = collections.defaultdict(list)
-        for r in sorted(results.function_classifications, key=lambda d: d.va):
-            # idx[r.va].append(r)
-            table.add_row(
-                *[
-                    hex(r.va),
-                    # bug? idaapi.get_func_name(r.va),
-                    r.classification,
-                    r.method,
-                    f"{r.library_name}@{r.library_version}" if r.library_name else "",
-                ]
-            )
-
-        # bug in IDA (no-op) when calling generator again?
-        # for va in idautils.Functions(start=0, end=None):
-        #     if va in idx:
-        #         for d in idx[va]:
-        #             table.add_row([hex(va), ida_funcs.get_func_name(va), d.classification, d.method])
-        #     else:
-        #         table.add_row([hex(va)])
-
-        rich.print(table)
+            if classifications := classifications_by_va.get(va):
+                classification = {c.classification for c in classifications}
+                method = {c.method for c in classifications}
+                extra = {f"{c.library_name}@{c.library_version}" for c in classifications if c.library_name}
+
+                table.add_row(
+                    hex(va),
+                    ", ".join(classification),
+                    ", ".join(method),
+                    name,
+                    ", ".join(extra),
+                )
+            else:
+                table.add_row(
+                    hex(va),
+                    Text("unknown", style="grey37"),
+                    "",
+                    name,
+                )
+
+    rich.print(table)
 
 
 if __name__ == "__main__":
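Note that the render loop joins sets of enum members directly (", ".join(classification)); this works because the enums subclass str, as class Method(str, Enum) above shows. A standalone sketch of that behavior (the member values here are hypothetical, only the names appear in the diff):

    from enum import Enum

    class Classification(str, Enum):  # mirrors the script's str-Enum pattern; values assumed
        LIBRARY = "library"
        UNKNOWN = "unknown"

    # str.join accepts the members because each one *is* a str:
    print(", ".join({Classification.LIBRARY, Classification.UNKNOWN}))  # "library, unknown" (set order varies)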
@@ -13,7 +13,7 @@ further requirements:
 import gzip
 import logging
 import collections
-from typing import Dict
+from typing import Any, Dict, Mapping
 from pathlib import Path
 from dataclasses import dataclass
 
@@ -133,8 +133,9 @@ def prune_databases(dbs: list[LibraryStringDatabase], n=8):
 
     try:
         from nltk.corpus import words as nltk_words
 
         nltk_words.words()
-    except ImportError, LookupError:
+    except (ImportError, LookupError):
         # one-time download of dataset.
+        # this probably doesn't work well for embedded use.
         import nltk
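The except fix here is a correctness fix, not just style: `except ImportError, LookupError:` is Python 2 syntax and a SyntaxError under Python 3, where catching multiple exception types requires a parenthesized tuple:

    try:
        import nonexistent_module  # raises ImportError (module name illustrative)
    except (ImportError, LookupError):  # Python 3: multiple types must be a tuple
        print("caught")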
@@ -207,3 +208,59 @@ def get_function_strings():
             strings_by_function[ea].add(string)
 
     return strings_by_function
+
+
+@dataclass
+class LibraryStringClassification:
+    va: int
+    string: str
+    library_name: str
+    metadata: LibraryString
+
+
+def create_index[T](s: list[T], k: str) -> Mapping[Any, list[T]]:
+    """create an index of the elements in `s` using the key `k`"""
+    s_by_k = collections.defaultdict(list)
+    for v in s:
+        p = getattr(v, k)
+        s_by_k[p].append(v)
+    return s_by_k
+
+
+def get_string_matches(dbs: list[LibraryStringDatabase]) -> list[LibraryStringClassification]:
+    matches: list[LibraryStringClassification] = []
+
+    for function, strings in sorted(get_function_strings().items()):
+        for string in strings:
+            for db in dbs:
+                if metadata := db.metadata_by_string.get(string):
+                    matches.append(
+                        LibraryStringClassification(
+                            va=function,
+                            string=string,
+                            library_name=metadata.library_name,
+                            metadata=metadata,
+                        )
+                    )
+
+    # if a library has only a few (<= 5) matched strings, ignore that library
+    matches_by_library = create_index(matches, "library_name")
+    for library_name, library_matches in matches_by_library.items():
+        if len(library_matches) > 5:
+            continue
+
+        logger.info("pruning library %s: only %d matched strings", library_name, len(library_matches))
+        matches = [m for m in matches if m.library_name != library_name]
+
+    # if there are conflicts within a single function, don't label it
+    matches_by_function = create_index(matches, "va")
+    for va, function_matches in matches_by_function.items():
+        library_names = {m.library_name for m in function_matches}
+        if len(library_names) == 1:
+            continue
+
+        logger.info("conflicting matches: 0x%x: %s", va, sorted(library_names))
+        # this is potentially slow (O(n**2)) but hopefully fast enough in practice.
+        matches = [m for m in matches if m.va != va]
+
+    return matches
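For orientation, a minimal usage sketch of create_index with a hypothetical record type (it groups objects by any attribute name, returning a defaultdict of lists):

    from dataclasses import dataclass

    @dataclass
    class Hit:  # hypothetical record, for illustration only
        va: int
        library_name: str

    hits = [Hit(0x1000, "zlib"), Hit(0x1000, "zlib"), Hit(0x2000, "openssl")]
    by_va = create_index(hits, "va")
    assert len(by_va[0x1000]) == 2  # entries grouped by the named attribute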
@@ -71,8 +71,6 @@ def main():
         # return early and don't do IDA analysis
         return
 
-    # TODO: ensure there are XXX matches for each library, or ignore those entries
-
     open_ida(input_path)
 
     import idaapi
@@ -121,8 +119,6 @@ def main():
                     )
                     console.print(" - ", location, ": ", string.rstrip())
 
-    # TODO: ensure there aren't conflicts among the matches
-
     console.print()
 
     console.print(
@@ -77,6 +77,8 @@ dependencies = [
     "protobuf>=5",
     "msgspec>=0.18.6",
     "xmltodict>=0.13.0",
+    # for library detection (in development)
+    "nltk>=3",
 
     # ---------------------------------------
     # Dependencies that we develop
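The new nltk dependency backs the English-word pruning in prune_databases. A sketch of the corpus access it relies on (the download call is an assumption from context; the hunk above truncates before it):

    import nltk
    nltk.download("words")  # one-time dataset fetch, assumed from the surrounding code
    from nltk.corpus import words as nltk_words

    english = set(nltk_words.words())
    print("zlib" in english)  # likely False: library identifiers usually aren't dictionary words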