Compare commits

...

18 Commits

Author SHA1 Message Date
mr-tz
8c58a616c1 add pseudo layout with function chunks size and basic stats table 2024-10-23 09:11:10 +00:00
mr-tz
1b72c81df1 do not propagate library name on callgraph id 2024-10-23 07:40:37 +00:00
mr-tz
deed98b87e add call graph based id of functions called by library code 2024-10-22 13:00:01 +00:00
Willi Ballenthin
2db0cc457f detect user code via entry points (main function name) 2024-10-22 09:21:59 +00:00
Willi Ballenthin
3cad8d12af mute unknown lines 2024-10-22 09:21:40 +00:00
Willi Ballenthin
5be96d7ddc consider thunks library functions 2024-10-22 09:21:16 +00:00
mr-tz
a3b6aef67f render from doc 2024-10-21 12:43:47 +00:00
mr-tz
077fa2e7e1 simplify and include thunks 2024-10-21 11:50:25 +00:00
mr-tz
c3b8e7c638 remove Python 3.12 syntax 2024-10-21 11:49:45 +00:00
Willi Ballenthin
4346922b9a library-detection: add json output format 2024-10-21 10:42:30 +00:00
Willi Ballenthin
d652192af1 library-detection: cleanup script 2024-10-21 10:26:19 +00:00
Moritz
d83750c901 Add LookupError exception 2024-10-15 17:10:59 +02:00
mr-tz
8394b81841 init add result structure and render 2024-10-14 16:05:01 +00:00
mr-tz
febda7d0e2 add option to save idb 2024-10-14 06:15:06 +00:00
mr-tz
f9abb5e83f ease/document extra dependency 2024-10-14 05:53:03 +00:00
Willi Ballenthin
f69602d085 library detection: rough integration of algorithms 2024-10-11 15:58:37 +00:00
Willi Ballenthin
ad187fc3bd library detection: merge flirt and string branches 2024-10-11 13:43:10 +00:00
mr-tz
637926e0b6 initial commit of out-of-the box flirt-based library id 2024-10-11 12:36:42 +00:00
8 changed files with 640 additions and 203 deletions

View File

@@ -108,6 +108,7 @@ repos:
- "--check-untyped-defs" - "--check-untyped-defs"
- "--ignore-missing-imports" - "--ignore-missing-imports"
- "--config-file=.github/mypy/mypy.ini" - "--config-file=.github/mypy/mypy.ini"
- "--enable-incomplete-feature=NewGenericSyntax"
- "capa/" - "capa/"
- "scripts/" - "scripts/"
- "tests/" - "tests/"

38
capa/analysis/flirt.py Normal file
View File

@@ -0,0 +1,38 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from pydantic import BaseModel
import capa.features.extractors.ida.idalib as idalib
# idalib must be located and successfully loaded before the IDA modules
# (idaapi, idautils) below can be imported; fail fast with a clear error otherwise.
if not idalib.has_idalib():
    raise RuntimeError("cannot find IDA idalib module.")
if not idalib.load_idalib():
    raise RuntimeError("failed to load IDA idalib module.")
import idaapi
import idautils
class FunctionId(BaseModel):
    """Identification result for a single function in the open IDA database."""

    # virtual address of the function start
    va: int
    # True when IDA flags the function as library code (FUNC_LIB)
    is_library: bool
    # function name as reported by IDA
    name: str
def get_flirt_matches(lib_only=True):
    """Yield a FunctionId for each function in the open IDA database.

    When `lib_only` is True (the default), yield only functions that IDA's
    FLIRT analysis flagged as library code (FUNC_LIB).
    """
    for fva in idautils.Functions():
        func = idaapi.get_func(fva)
        flagged_as_library = bool(func.flags & idaapi.FUNC_LIB)

        if lib_only and not flagged_as_library:
            continue

        yield FunctionId(va=fva, is_library=flagged_as_library, name=idaapi.get_func_name(fva))

View File

@@ -1,193 +1,335 @@
""" # Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
further requirements: # Licensed under the Apache License, Version 2.0 (the "License");
- nltk # you may not use this file except in compliance with the License.
""" # You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import sys import sys
import logging import logging
import argparse
import tempfile
import contextlib
import collections import collections
from enum import Enum
from typing import List, Iterable, Optional
from pathlib import Path from pathlib import Path
import rich import rich
from pydantic import BaseModel
from rich.text import Text from rich.text import Text
from rich.console import Console
import capa.main
import capa.helpers
import capa.analysis.flirt
import capa.analysis.strings import capa.analysis.strings
import capa.features.extractors.strings import capa.features.extractors.ida.idalib as idalib
from capa.analysis.strings import LibraryStringDatabase
if not idalib.has_idalib():
raise RuntimeError("cannot find IDA idalib module.")
if not idalib.load_idalib():
raise RuntimeError("failed to load IDA idalib module.")
import idaapi
import idapro
import ida_auto
import idautils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def extract_strings(buf, n=4): class Classification(str, Enum):
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n) USER = "user"
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n) LIBRARY = "library"
UNKNOWN = "unknown"
def prune_databases(dbs: list[LibraryStringDatabase], n=8): class Method(str, Enum):
"""remove less trustyworthy database entries. FLIRT = "flirt"
STRINGS = "strings"
THUNK = "thunk"
ENTRYPOINT = "entrypoint"
CALLGRAPH = "callgraph"
such as:
- those found in multiple databases
- those that are English words
- those that are too short
- Windows API and DLL names
"""
# TODO: consider applying these filters directly to the persisted databases, not at load time. class FunctionClassification(BaseModel):
va: int
classification: Classification
# name per the disassembler/analysis tool
# may be combined with the recovered/suspected name TODO below
name: str
# if is library, this must be provided
method: Optional[Method]
# TODO if is library, recovered/suspected name?
# if is library, these can optionally be provided.
library_name: Optional[str] = None
library_version: Optional[str] = None
# additional note on the classification, TODO removeme if not useful beyond dev/debug
note: Optional[str] = None
class BinaryLayout(BaseModel):
va: int
# size of the function chunks in bytes
size: int
class FunctionIdResults(BaseModel):
function_classifications: List[FunctionClassification]
layout: List[BinaryLayout]
@contextlib.contextmanager
def ida_session(input_path: Path, use_temp_dir=True):
if use_temp_dir:
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
else:
t = input_path
logger.debug("using %s", str(t))
# stderr=True is used here to redirect the spinner banner to stderr,
# so that users can redirect capa's output.
console = Console(stderr=True, quiet=False)
winapi = capa.analysis.strings.WindowsApiStringDatabase.from_defaults()
try: try:
from nltk.corpus import words as nltk_words if use_temp_dir:
except ImportError: t.write_bytes(input_path.read_bytes())
# one-time download of dataset.
# this probably doesn't work well for embedded use.
import nltk
nltk.download("words")
from nltk.corpus import words as nltk_words
words = set(nltk_words.words())
counter = collections.Counter() # idalib writes to stdout (ugh), so we have to capture that
to_remove = set() # so as not to screw up structured output.
for db in dbs: with capa.helpers.stdout_redirector(io.BytesIO()):
for string in db.metadata_by_string.keys(): idapro.enable_console_messages(False)
counter[string] += 1 with capa.main.timing("analyze program"):
with console.status("analyzing program...", spinner="dots"):
if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
raise RuntimeError("failed to analyze input file")
if string in words: logger.debug("idalib: waiting for analysis...")
to_remove.add(string) ida_auto.auto_wait()
logger.debug("idalib: opened database.")
yield
finally:
idapro.close_database()
if use_temp_dir:
t.unlink()
def get_library_called_functions(
function_classifications: list[FunctionClassification],
) -> Iterable[FunctionClassification]:
MAX_PASSES = 10
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
for n in range(MAX_PASSES):
found_new_lib_func = False
for fva in idautils.Functions():
if classifications_by_va.get(fva):
# already classified
continue continue
if len(string) < n: for ref in idautils.CodeRefsTo(fva, True):
to_remove.add(string) f: idaapi.func_t = idaapi.get_func(ref)
continue if not f:
# no function associated with reference location
continue
if string in winapi.api_names: ref_fva = f.start_ea
to_remove.add(string) fname = idaapi.get_func_name(ref_fva)
continue if fname in ("___tmainCRTStartup",):
# ignore library functions, where we know that they call user-code
# TODO(mr): extend this list
continue
if string in winapi.dll_names: if classifications := classifications_by_va.get(ref_fva):
to_remove.add(string) for c in classifications:
continue if c.classification == Classification.LIBRARY:
fc = FunctionClassification(
va=fva,
name=idaapi.get_func_name(fva),
classification=Classification.LIBRARY,
method=Method.CALLGRAPH,
note=f"called by 0x{ref_fva:x} ({c.method.value}{f', {c.library_name}@{c.library_version})' if c.library_name else ')'}",
)
classifications_by_va[fva].append(fc)
yield fc
found_new_lib_func = True
break
for string, count in counter.most_common(): if not found_new_lib_func:
if count <= 1: logger.debug("no update in pass %d, done here", n)
break return
# remove strings that are seen in more than one database
to_remove.add(string)
for db in dbs:
for string in to_remove:
if string in db.metadata_by_string:
del db.metadata_by_string[string]
def open_ida(input_path: Path): def is_thunk_function(fva):
import tempfile f = idaapi.get_func(fva)
return bool(f.flags & idaapi.FUNC_THUNK)
import idapro
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
t.write_bytes(input_path.read_bytes())
# resource leak: we should delete this upon exit
idapro.enable_console_messages(False)
idapro.open_database(str(t.absolute()), run_auto_analysis=True)
import ida_auto
ida_auto.auto_wait()
def get_function_size(fva):
f = idaapi.get_func(fva)
assert f.start_ea == fva
return sum([end_ea - start_ea for (start_ea, end_ea) in idautils.Chunks(fva)])
def main():
logging.basicConfig(level=logging.DEBUG)
# use n=8 to ignore common words def main(argv=None):
N = 8 if argv is None:
argv = sys.argv[1:]
input_path = Path(sys.argv[1]) parser = argparse.ArgumentParser(description="Identify library functions using various strategies.")
input_buf = input_path.read_bytes() capa.main.install_common_args(parser, wanted={"input_file"})
parser.add_argument("--store-idb", action="store_true", default=False, help="store IDA database file")
parser.add_argument("--min-string-length", type=int, default=8, help="minimum string length")
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
dbs = capa.analysis.strings.get_default_databases() dbs = capa.analysis.strings.get_default_databases()
prune_databases(dbs, n=N) capa.analysis.strings.prune_databases(dbs, n=args.min_string_length)
strings_by_library = collections.defaultdict(set) function_classifications: List[FunctionClassification] = []
for string in extract_strings(input_path.read_bytes(), n=N): with ida_session(args.input_file, use_temp_dir=not args.store_idb):
for db in dbs: with capa.main.timing("FLIRT-based library identification"):
if (metadata := db.metadata_by_string.get(string.s)): # TODO: add more signature (files)
strings_by_library[metadata.library_name].add(string.s) # TOOD: apply more signatures
for flirt_match in capa.analysis.flirt.get_flirt_matches():
function_classifications.append(
FunctionClassification(
va=flirt_match.va,
name=flirt_match.name,
classification=Classification.LIBRARY,
method=Method.FLIRT,
# note: we cannot currently include which signature matched per function via the IDA API
)
)
console = rich.get_console() # thunks
console.print(f"found libraries:", style="bold") for fva in idautils.Functions():
for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True): if is_thunk_function(fva):
console.print(f" - [b]{library}[/] ({len(strings)} strings)") function_classifications.append(
FunctionClassification(
va=fva,
name=idaapi.get_func_name(fva),
classification=Classification.LIBRARY,
method=Method.THUNK,
)
)
for string in sorted(strings)[:10]: with capa.main.timing("string-based library identification"):
console.print(f" - {string}", markup=False, style="grey37") for string_match in capa.analysis.strings.get_string_matches(dbs):
function_classifications.append(
FunctionClassification(
va=string_match.va,
name=idaapi.get_func_name(string_match.va),
classification=Classification.LIBRARY,
method=Method.STRINGS,
library_name=string_match.metadata.library_name,
library_version=string_match.metadata.library_version,
)
)
if len(strings) > 10: for va in idautils.Functions():
console.print(" ...", style="grey37") name = idaapi.get_func_name(va)
if name not in {
"WinMain",
"_main",
"main",
}:
continue
if not strings_by_library: function_classifications.append(
console.print(" (none)", style="grey37") FunctionClassification(
# since we're not going to find any strings va=va,
# return early and don't do IDA analysis name=name,
return classification=Classification.USER,
method=Method.ENTRYPOINT,
)
)
# TODO: ensure there are XXX matches for each library, or ignore those entries with capa.main.timing("call graph based library identification"):
for fc in get_library_called_functions(function_classifications):
function_classifications.append(fc)
open_ida(input_path) doc = FunctionIdResults(function_classifications=[], layout=[])
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
for va in idautils.Functions():
if classifications := classifications_by_va.get(va):
doc.function_classifications.extend(classifications)
else:
doc.function_classifications.append(
FunctionClassification(
va=va,
name=idaapi.get_func_name(va),
classification=Classification.UNKNOWN,
method=None,
)
)
doc.layout.append(
BinaryLayout(
va=va,
size=get_function_size(va),
)
)
import idaapi if args.json:
import idautils print(doc.model_dump_json()) # noqa: T201 print found
import ida_funcs
import capa.features.extractors.ida.helpers as ida_helpers
strings_by_function = collections.defaultdict(set) else:
for ea in idautils.Functions(): table = rich.table.Table(
f = idaapi.get_func(ea) "FVA",
"CLASSIFICATION",
"METHOD",
"FNAME",
"EXTRA",
"SIZE"
)
# ignore library functions and thunk functions as identified by IDA classifications_by_va = capa.analysis.strings.create_index(doc.function_classifications, "va", sorted_=True)
if f.flags & idaapi.FUNC_THUNK: size_by_va = {layout.va: layout.size for layout in doc.layout}
continue size_by_classification = collections.defaultdict(int)
if f.flags & idaapi.FUNC_LIB: for va, classifications in classifications_by_va.items():
continue # TODO count of classifications if multiple?
name = ", ".join({c.name for c in classifications})
if "sub_" in name:
name = Text(name, style="grey53")
for bb in ida_helpers.get_function_blocks(f): classification = {c.classification for c in classifications}
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea): method = {c.method for c in classifications if c.method}
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn) extra = {f"{c.library_name}@{c.library_version}" for c in classifications if c.library_name}
if ref == insn.ea: note = {f"{c.note}" for c in classifications if c.note}
continue
string = capa.features.extractors.ida.helpers.find_string_at(ref) table.add_row(
if not string: hex(va),
continue ", ".join(classification) if classification != {"unknown"} else Text("unknown", style="grey53"),
", ".join(method),
name,
f"{', '.join(extra)} {', '.join(note)}",
f"{size_by_va[va]}",
)
for db in dbs: size_by_classification["-".join(classification)] += size_by_va[va]
if (metadata := db.metadata_by_string.get(string)):
strings_by_function[ea].add(string)
# ensure there are at least XXX functions renamed, or ignore those entries rich.print(table)
console.print("functions:", style="bold") stats_table = rich.table.Table(
for function, strings in sorted(strings_by_function.items()): "ID", rich.table.Column("SIZE", justify="right"), rich.table.Column("%", justify="right")
if strings: )
name = ida_funcs.get_func_name(function) size_all = sum(size_by_classification.values())
for k, s in size_by_classification.items():
console.print(f" [b]{name}[/]@{function:08x}:") stats_table.add_row(k, f"{s:d}", f"{100 * s / size_all:.2f}")
rich.print(stats_table)
for string in strings:
for db in dbs:
if (metadata := db.metadata_by_string.get(string)):
location = Text(f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}", style="grey37")
console.print(" - ", location, ": ", string.rstrip())
# TODO: ensure there aren't conflicts among the matches
console.print()
console.print(f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions")
if __name__ == "__main__": if __name__ == "__main__":
main() sys.exit(main())

View File

@@ -0,0 +1,2 @@
# temporary extra file to track dependencies of the analysis directory
nltk==3.9.1

View File

@@ -1,10 +1,28 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
further requirements:
- nltk
"""
import gzip import gzip
import pathlib import logging
from typing import Dict, Sequence import collections
from typing import Any, Dict, Mapping
from pathlib import Path
from dataclasses import dataclass from dataclasses import dataclass
import msgspec import msgspec
import capa.features.extractors.strings
logger = logging.getLogger(__name__)
class LibraryString(msgspec.Struct): class LibraryString(msgspec.Struct):
string: str string: str
@@ -23,7 +41,7 @@ class LibraryStringDatabase:
return len(self.metadata_by_string) return len(self.metadata_by_string)
@classmethod @classmethod
def from_file(cls, path: pathlib.Path) -> "LibraryStringDatabase": def from_file(cls, path: Path) -> "LibraryStringDatabase":
metadata_by_string: Dict[str, LibraryString] = {} metadata_by_string: Dict[str, LibraryString] = {}
decoder = msgspec.json.Decoder(type=LibraryString) decoder = msgspec.json.Decoder(type=LibraryString)
for line in gzip.decompress(path.read_bytes()).split(b"\n"): for line in gzip.decompress(path.read_bytes()).split(b"\n"):
@@ -55,12 +73,12 @@ DEFAULT_FILENAMES = (
"zlib.jsonl.gz", "zlib.jsonl.gz",
) )
DEFAULT_PATHS = tuple( DEFAULT_PATHS = tuple(Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES) + (
pathlib.Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",
) + (pathlib.Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",) )
def get_default_databases() -> Sequence[LibraryStringDatabase]: def get_default_databases() -> list[LibraryStringDatabase]:
return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS] return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS]
@@ -73,9 +91,9 @@ class WindowsApiStringDatabase:
return len(self.dll_names) + len(self.api_names) return len(self.dll_names) + len(self.api_names)
@classmethod @classmethod
def from_dir(cls, path: pathlib.Path) -> "WindowsApiStringDatabase": def from_dir(cls, path: Path) -> "WindowsApiStringDatabase":
dll_names: Set[str] = set() dll_names: set[str] = set()
api_names: Set[str] = set() api_names: set[str] = set()
for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines(): for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
if not line: if not line:
@@ -91,5 +109,161 @@ class WindowsApiStringDatabase:
@classmethod @classmethod
def from_defaults(cls) -> "WindowsApiStringDatabase": def from_defaults(cls) -> "WindowsApiStringDatabase":
return cls.from_dir(pathlib.Path(__file__).parent / "data" / "winapi") return cls.from_dir(Path(__file__).parent / "data" / "winapi")
def extract_strings(buf, n=4):
    """Yield all ASCII and unicode strings of at least `n` characters extracted from `buf`."""
    yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
    yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
    """remove less trustworthy database entries.

    such as:
    - those found in multiple databases
    - those that are English words
    - those that are too short
    - Windows API and DLL names

    mutates the given databases in place; returns None.
    requires the nltk "words" corpus (downloaded on first use).
    """
    # TODO: consider applying these filters directly to the persisted databases, not at load time.
    winapi = WindowsApiStringDatabase.from_defaults()

    try:
        from nltk.corpus import words as nltk_words

        # accessing the corpus raises LookupError when it hasn't been downloaded yet.
        nltk_words.words()
    except (ImportError, LookupError):
        # one-time download of dataset.
        # this probably doesn't work well for embedded use.
        import nltk

        nltk.download("words")
        from nltk.corpus import words as nltk_words

    words = set(nltk_words.words())

    counter: collections.Counter[str] = collections.Counter()
    to_remove = set()
    for db in dbs:
        for string in db.metadata_by_string.keys():
            # count occurrences across all databases so cross-database
            # duplicates can be pruned below.
            counter[string] += 1

            if string in words:
                to_remove.add(string)
                continue

            if len(string) < n:
                to_remove.add(string)
                continue

            if string in winapi.api_names:
                to_remove.add(string)
                continue

            if string in winapi.dll_names:
                to_remove.add(string)
                continue

    for string, count in counter.most_common():
        if count <= 1:
            # most_common is sorted descending, so no further duplicates follow.
            break
        # remove strings that are seen in more than one database
        to_remove.add(string)

    for db in dbs:
        for string in to_remove:
            if string in db.metadata_by_string:
                del db.metadata_by_string[string]
def get_function_strings():
    """Collect the strings referenced by each non-library, non-thunk function.

    Returns a defaultdict mapping: function start EA -> set of referenced strings.
    Requires an open IDA database (idalib session), hence the local imports.
    """
    import idaapi
    import idautils

    import capa.features.extractors.ida.helpers as ida_helpers

    strings_by_function = collections.defaultdict(set)
    for fva in idautils.Functions():
        func = idaapi.get_func(fva)

        # skip functions that IDA already recognizes as thunks or library code.
        if func.flags & (idaapi.FUNC_THUNK | idaapi.FUNC_LIB):
            continue

        for block in ida_helpers.get_function_blocks(func):
            for insn in ida_helpers.get_instructions_in_range(block.start_ea, block.end_ea):
                data_ref = ida_helpers.find_data_reference_from_insn(insn)
                if data_ref == insn.ea:
                    # no outgoing data reference from this instruction
                    continue

                if s := ida_helpers.find_string_at(data_ref):
                    strings_by_function[fva].add(s)

    return strings_by_function
@dataclass
class LibraryStringClassification:
    """A single string match tying a function to a library string database entry."""

    # function start virtual address
    va: int
    # the matched string
    string: str
    # name of the library the string belongs to
    library_name: str
    # full database record for the matched string
    metadata: LibraryString
def create_index(s: list, k: str, sorted_: bool = False) -> Mapping[Any, list]:
    """Group the elements of `s` into a mapping keyed by attribute `k`.

    When `sorted_` is True, elements are first ordered by `k`, so both the
    mapping's insertion order and each bucket follow the key order.
    Returns a defaultdict(list), so missing keys yield empty lists.
    """
    items = sorted(s, key=lambda item: getattr(item, k)) if sorted_ else s

    index: Mapping[Any, list] = collections.defaultdict(list)
    for item in items:
        index[getattr(item, k)].append(item)
    return index
def get_string_matches(dbs: list[LibraryStringDatabase]) -> list[LibraryStringClassification]:
    """Match strings referenced by functions against the library string databases.

    Post-processing prunes weak evidence:
      - libraries with only a handful of matched strings are dropped entirely.
      - functions whose strings match multiple libraries are left unlabeled.

    Requires an open IDA database (via get_function_strings).
    """
    matches: list[LibraryStringClassification] = []
    for fva, strings in sorted(get_function_strings().items()):
        for s in strings:
            for db in dbs:
                metadata = db.metadata_by_string.get(s)
                if not metadata:
                    continue
                matches.append(
                    LibraryStringClassification(
                        va=fva,
                        string=s,
                        library_name=metadata.library_name,
                        metadata=metadata,
                    )
                )

    # if there are less than N strings per library, ignore that library
    matches_by_library = create_index(matches, "library_name")
    for library_name, library_matches in matches_by_library.items():
        if len(library_matches) <= 5:
            logger.info("pruning library %s: only %d matched string", library_name, len(library_matches))
            matches = [m for m in matches if m.library_name != library_name]

    # if there are conflicts within a single function, don't label it
    matches_by_function = create_index(matches, "va")
    for va, function_matches in matches_by_function.items():
        library_names = {m.library_name for m in function_matches}
        if len(library_names) > 1:
            logger.info("conflicting matches: 0x%x: %s", va, sorted(library_names))
            # this is potentially slow (O(n**2)) but hopefully fast enough in practice.
            matches = [m for m in matches if m.va != va]

    return matches

View File

@@ -0,0 +1,130 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
import collections
from pathlib import Path
import rich
from rich.text import Text
import capa.analysis.strings
import capa.features.extractors.strings
import capa.features.extractors.ida.helpers as ida_helpers
logger = logging.getLogger(__name__)
def open_ida(input_path: Path):
    """Copy the input file into a temp directory and open it in an idalib session.

    Blocks until IDA auto-analysis completes.
    NOTE(review): the temp directory is never removed — resource leak on exit.
    """
    import tempfile

    import idapro

    # work on a copy so the original input and its directory stay untouched.
    workdir = Path(tempfile.mkdtemp(prefix="ida-"))
    target = workdir / input_path.name
    target.write_bytes(input_path.read_bytes())

    idapro.enable_console_messages(False)
    idapro.open_database(str(target.absolute()), run_auto_analysis=True)

    import ida_auto

    ida_auto.auto_wait()
def main():
    """Report likely statically-linked libraries for the file given as argv[1].

    First scans the raw file bytes against the library string databases; if
    any library strings are found, opens the file in IDA (idalib) and
    attributes the matched strings to individual functions.
    """
    logging.basicConfig(level=logging.DEBUG)

    # use n=8 to ignore common words
    N = 8

    input_path = Path(sys.argv[1])

    dbs = capa.analysis.strings.get_default_databases()
    capa.analysis.strings.prune_databases(dbs, n=N)

    # library name -> set of strings from that library found in the raw bytes.
    strings_by_library = collections.defaultdict(set)
    for string in capa.analysis.strings.extract_strings(input_path.read_bytes(), n=N):
        for db in dbs:
            if metadata := db.metadata_by_string.get(string.s):
                strings_by_library[metadata.library_name].add(string.s)

    console = rich.get_console()
    console.print("found libraries:", style="bold")
    for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
        console.print(f" - [b]{library}[/] ({len(strings)} strings)")
        # show at most 10 example strings per library.
        for string in sorted(strings)[:10]:
            console.print(f" - {string}", markup=False, style="grey37")
        if len(strings) > 10:
            console.print(" ...", style="grey37")

    if not strings_by_library:
        console.print(" (none)", style="grey37")
        # since we're not going to find any strings
        # return early and don't do IDA analysis
        return

    open_ida(input_path)

    # IDA modules are only importable once idalib is initialized by open_ida above.
    import idaapi
    import idautils
    import ida_funcs

    # function EA -> set of library strings referenced from that function.
    strings_by_function = collections.defaultdict(set)
    for ea in idautils.Functions():
        f = idaapi.get_func(ea)

        # ignore library functions and thunk functions as identified by IDA
        if f.flags & idaapi.FUNC_THUNK:
            continue
        if f.flags & idaapi.FUNC_LIB:
            continue

        for bb in ida_helpers.get_function_blocks(f):
            for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
                ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
                if ref == insn.ea:
                    # no data reference from this instruction
                    continue

                string = capa.features.extractors.ida.helpers.find_string_at(ref)
                if not string:
                    continue

                for db in dbs:
                    if metadata := db.metadata_by_string.get(string):
                        strings_by_function[ea].add(string)

    # ensure there are at least XXX functions renamed, or ignore those entries
    console.print("functions:", style="bold")
    for function, strings in sorted(strings_by_function.items()):
        if strings:
            name = ida_funcs.get_func_name(function)
            console.print(f" [b]{name}[/]@{function:08x}:")
            for string in strings:
                for db in dbs:
                    if metadata := db.metadata_by_string.get(string):
                        location = Text(
                            f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
                            style="grey37",
                        )
                        console.print(" - ", location, ": ", string.rstrip())
            console.print()

    console.print(
        f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions"
    )


if __name__ == "__main__":
    main()

View File

@@ -1,52 +0,0 @@
"""
convert from a jh CSV file to a .jsonl.gz OpenSourceString database.
the jh file looks like:
# triplet,compiler,library,version,profile,path,function,type,value
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0x00000100
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0xfffffff8
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0xfffffffe
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,api,BZ2_bzCompressInit
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,api,handle_compress
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0x0000fa90
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffff8
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffff9
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffffd
jh is found here: https://github.com/williballenthin/lancelot/blob/master/bin/src/bin/jh.rs
"""
import sys
import json
import pathlib
import msgspec
from capa.analysis.strings import LibraryString
p = pathlib.Path(sys.argv[1])
for line in p.read_text().split("\n"):
if not line:
continue
if line.startswith("#"):
continue
triplet, compiler, library, version, profile, path, function, rest = line.split(",", 7)
type, _, value = rest.partition(",")
if type != "string":
continue
if value.startswith('"'):
value = json.loads(value)
s = LibraryString(
string=value,
library_name=library,
library_version=version,
file_path=path,
function_name=function,
)
sys.stdout.buffer.write(msgspec.json.encode(s))
sys.stdout.buffer.write(b"\n")

View File

@@ -77,6 +77,8 @@ dependencies = [
"protobuf>=5", "protobuf>=5",
"msgspec>=0.18.6", "msgspec>=0.18.6",
"xmltodict>=0.13.0", "xmltodict>=0.13.0",
# for library detection (in development)
"nltk>=3",
# --------------------------------------- # ---------------------------------------
# Dependencies that we develop # Dependencies that we develop