Mirror of https://github.com/mandiant/capa.git (synced 2025-12-12 15:49:46 -08:00)

Commit: dotnet: basic detection and feature extraction (#987)
.github/mypy/mypy.ini (vendored): 3 changes

@@ -74,3 +74,6 @@ ignore_missing_imports = True

[mypy-elftools.*]
ignore_missing_imports = True

[mypy-dncil.*]
ignore_missing_imports = True
.github/workflows/tests.yml (vendored): 4 changes

@@ -48,7 +48,7 @@ jobs:
    - name: Checkout capa with submodules
      uses: actions/checkout@v2
      with:
        submodules: true
        submodules: recursive
    - name: Set up Python 3.8
      uses: actions/setup-python@v2
      with:
@@ -78,7 +78,7 @@ jobs:
    - name: Checkout capa with submodules
      uses: actions/checkout@v2
      with:
        submodules: true
        submodules: recursive
    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v2
      with:
@@ -8,6 +8,7 @@
- add new feature "operand[{0, 1, 2}].number" for matching instruction operand immediate values #767 @williballenthin
- add new feature "operand[{0, 1, 2}].offset" for matching instruction operand offsets #767 @williballenthin
- extract additional offset/number features in certain circumstances #320 @williballenthin
- add detection and basic feature extraction for dotnet #987 @mr-tz, @mike-hunhoff, @williballenthin

### Breaking Changes
capa/exceptions.py (new file): 14 lines

@@ -0,0 +1,14 @@
class UnsupportedRuntimeError(RuntimeError):
    pass


class UnsupportedFormatError(ValueError):
    pass


class UnsupportedArchError(ValueError):
    pass


class UnsupportedOSError(ValueError):
    pass
@@ -390,7 +390,9 @@ class Bytes(Feature):
# other candidates here: https://docs.microsoft.com/en-us/windows/win32/debug/pe-format#machine-types
ARCH_I386 = "i386"
ARCH_AMD64 = "amd64"
VALID_ARCH = (ARCH_I386, ARCH_AMD64)
# dotnet
ARCH_ANY = "any"
VALID_ARCH = (ARCH_I386, ARCH_AMD64, ARCH_ANY)


class Arch(Feature):
@@ -402,8 +404,10 @@ class Arch(Feature):
OS_WINDOWS = "windows"
OS_LINUX = "linux"
OS_MACOS = "macos"
# dotnet
OS_ANY = "any"
VALID_OS = {os.value for os in capa.features.extractors.elf.OS}
VALID_OS.update({OS_WINDOWS, OS_LINUX, OS_MACOS})
VALID_OS.update({OS_WINDOWS, OS_LINUX, OS_MACOS, OS_ANY})


class OS(Feature):
@@ -414,7 +418,14 @@ class OS(Feature):

FORMAT_PE = "pe"
FORMAT_ELF = "elf"
VALID_FORMAT = (FORMAT_PE, FORMAT_ELF)
FORMAT_DOTNET = "dotnet"
VALID_FORMAT = (FORMAT_PE, FORMAT_ELF, FORMAT_DOTNET)
# internal only, not to be used in rules
FORMAT_AUTO = "auto"
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_FREEZE = "freeze"
FORMAT_UNKNOWN = "unknown"


class Format(Feature):
@@ -8,7 +8,8 @@ import pefile
import capa.features
import capa.features.extractors.elf
import capa.features.extractors.pefile
from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, Arch, Format, String
from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, FORMAT_FREEZE, Arch, Format, String
from capa.features.freeze import is_freeze

logger = logging.getLogger(__name__)

@@ -29,6 +30,8 @@ def extract_format(buf):
        yield Format(FORMAT_PE), 0x0
    elif buf.startswith(b"\x7fELF"):
        yield Format(FORMAT_ELF), 0x0
    elif is_freeze(buf):
        yield Format(FORMAT_FREEZE), 0x0
    else:
        # we likely end up here:
        #  1. handling a file format (e.g. macho)
capa/features/extractors/dnfile/__init__.py (new file): 0 lines

capa/features/extractors/dnfile/extractor.py (new file): 70 lines
@@ -0,0 +1,70 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Any, List, Tuple

if TYPE_CHECKING:
    from capa.features.common import Feature

import dnfile

import capa.features.extractors
import capa.features.extractors.dnfile.file
import capa.features.extractors.dnfile.insn
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.dnfile.helpers import get_dotnet_managed_method_bodies


class DnfileFeatureExtractor(FeatureExtractor):
    def __init__(self, path: str):
        super(DnfileFeatureExtractor, self).__init__()
        self.pe: dnfile.dnPE = dnfile.dnPE(path)

        # pre-compute these because we'll yield them at *every* scope.
        self.global_features: List[Tuple[Feature, int]] = []
        self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(pe=self.pe))
        self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_arch(pe=self.pe))

    def get_base_address(self):
        return 0x0

    def extract_global_features(self):
        yield from self.global_features

    def extract_file_features(self):
        yield from capa.features.extractors.dnfile.file.extract_features(self.pe)

    def get_functions(self):
        # data structure shared across functions yielded here.
        # useful for caching analysis relevant across a single workspace.
        ctx = {}
        ctx["pe"] = self.pe

        for f in get_dotnet_managed_method_bodies(self.pe):
            setattr(f, "ctx", ctx)
            yield f

    def extract_function_features(self, f):
        # TODO
        yield from []

    def get_basic_blocks(self, f):
        # each dotnet method is considered 1 basic block
        yield f

    def extract_basic_block_features(self, f, bb):
        # we don't support basic block features
        yield from []

    def get_instructions(self, f, bb):
        yield from f.instructions

    def extract_insn_features(self, f, bb, insn):
        yield from capa.features.extractors.dnfile.insn.extract_features(f, bb, insn)
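For orientation, here is a minimal usage sketch (not part of this commit; the sample path is hypothetical) showing how the scopes above are walked — the same function/basic-block/instruction protocol the native extractors expose, except that each managed method doubles as its own basic block. capa.main normally imports capa.features.extractors.dotnetfile before constructing this class, so the sketch does so explicitly.

# minimal sketch, assuming dnfile/dncil are installed and "sample.exe_" is a .NET PE on disk
import capa.features.extractors.dotnetfile  # referenced by DnfileFeatureExtractor.__init__
from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor

extractor = DnfileFeatureExtractor("sample.exe_")

for feature, va in extractor.extract_global_features():
    print("global: 0x%08x: %s" % (va, feature))

for f in extractor.get_functions():
    for bb in extractor.get_basic_blocks(f):  # one "basic block" per managed method
        for insn in extractor.get_instructions(f, bb):
            for feature, offset in extractor.extract_insn_features(f, bb, insn):
                print("insn: 0x%08x: %s" % (offset, feature))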
capa/features/extractors/dnfile/file.py (new file): 40 lines
@@ -0,0 +1,40 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Tuple, Iterator

if TYPE_CHECKING:
    import dnfile
    from capa.features.common import Feature, Format
    from capa.features.file import Import

import capa.features.extractors


def extract_file_import_names(pe: dnfile.dnPE) -> Iterator[Tuple[Import, int]]:
    yield from capa.features.extractors.dotnetfile.extract_file_import_names(pe)


def extract_file_format(pe: dnfile.dnPE) -> Iterator[Tuple[Format, int]]:
    yield from capa.features.extractors.dotnetfile.extract_file_format(pe=pe)


def extract_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]:
    for file_handler in FILE_HANDLERS:
        for (feature, token) in file_handler(pe):
            yield feature, token


FILE_HANDLERS = (
    extract_file_import_names,
    # TODO extract_file_strings,
    # TODO extract_file_function_names,
    extract_file_format,
)
capa/features/extractors/dnfile/helpers.py (new file): 169 lines
@@ -0,0 +1,169 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from __future__ import annotations

import logging
from typing import Any, Tuple, Iterator, Optional

import dnfile
from dncil.cil.body import CilMethodBody
from dncil.cil.error import MethodBodyFormatError
from dncil.clr.token import Token, StringToken, InvalidToken
from dncil.cil.body.reader import CilMethodBodyReaderBase

logger = logging.getLogger(__name__)

# key indexes to dotnet metadata tables
DOTNET_META_TABLES_BY_INDEX = {table.value: table.name for table in dnfile.enums.MetadataTables}


class DnfileMethodBodyReader(CilMethodBodyReaderBase):
    def __init__(self, pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow):
        self.pe: dnfile.dnPE = pe
        self.offset: int = self.pe.get_offset_from_rva(row.Rva)

    def read(self, n: int) -> bytes:
        data: bytes = self.pe.get_data(self.pe.get_rva_from_offset(self.offset), n)
        self.offset += n
        return data

    def tell(self) -> int:
        return self.offset

    def seek(self, offset: int) -> int:
        self.offset = offset
        return self.offset


def calculate_dotnet_token_value(table: int, rid: int) -> int:
    return ((table & 0xFF) << Token.TABLE_SHIFT) | (rid & Token.RID_MASK)


def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Any:
    """map generic token to string or table row"""
    if isinstance(token, StringToken):
        user_string: Optional[str] = read_dotnet_user_string(pe, token)
        if user_string is None:
            return InvalidToken(token.value)
        return user_string

    table_name: str = DOTNET_META_TABLES_BY_INDEX.get(token.table, "")
    if not table_name:
        # table_index is not valid
        return InvalidToken(token.value)

    table: Any = getattr(pe.net.mdtables, table_name, None)
    if table is None:
        # table index is valid but table is not present
        return InvalidToken(token.value)

    try:
        return table.rows[token.rid - 1]
    except IndexError:
        # table index is valid but row index is not valid
        return InvalidToken(token.value)


def read_dotnet_method_body(pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow) -> Optional[CilMethodBody]:
    """read dotnet method body"""
    try:
        return CilMethodBody(DnfileMethodBodyReader(pe, row))
    except MethodBodyFormatError as e:
        logger.warn("failed to parse managed method body @ 0x%08x (%s)" % (row.Rva, e))
        return None


def read_dotnet_user_string(pe: dnfile.dnPE, token: StringToken) -> Optional[str]:
    """read user string from #US stream"""
    try:
        user_string: Optional[dnfile.stream.UserString] = pe.net.user_strings.get_us(token.rid)
    except UnicodeDecodeError as e:
        logger.warn("failed to decode #US stream index 0x%08x (%s)" % (token.rid, e))
        return None
    if user_string is None:
        return None
    return user_string.value


def get_dotnet_managed_imports(pe: dnfile.dnPE) -> Iterator[Tuple[int, str]]:
    """get managed imports from MemberRef table

    see https://www.ntcore.com/files/dotnetformat.htm

    10 - MemberRef Table
        Each row represents an imported method
            Class (index into the TypeRef, ModuleRef, MethodDef, TypeSpec or TypeDef tables)
            Name (index into String heap)
    01 - TypeRef Table
        Each row represents an imported class, its namespace and the assembly which contains it
            TypeName (index into String heap)
            TypeNamespace (index into String heap)
    """
    if not hasattr(pe.net.mdtables, "MemberRef"):
        return

    for (rid, row) in enumerate(pe.net.mdtables.MemberRef):
        if not isinstance(row.Class.row, (dnfile.mdtable.TypeRefRow,)):
            continue

        token: int = calculate_dotnet_token_value(dnfile.enums.MetadataTables.MemberRef.value, rid + 1)
        # like System.IO.File::OpenRead
        imp: str = f"{row.Class.row.TypeNamespace}.{row.Class.row.TypeName}::{row.Name}"

        yield token, imp


def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[Tuple[int, str]]:
    """get unmanaged imports from ImplMap table

    see https://www.ntcore.com/files/dotnetformat.htm

    28 - ImplMap Table
        ImplMap table holds information about unmanaged methods that can be reached from managed code, using PInvoke dispatch
            MemberForwarded (index into the Field or MethodDef table; more precisely, a MemberForwarded coded index)
            ImportName (index into the String heap)
            ImportScope (index into the ModuleRef table)
    """
    if not hasattr(pe.net.mdtables, "ImplMap"):
        return

    for row in pe.net.mdtables.ImplMap:
        dll: str = row.ImportScope.row.Name
        symbol: str = row.ImportName

        # ECMA says "Each row of the ImplMap table associates a row in the MethodDef table (MemberForwarded) with the
        # name of a routine (ImportName) in some unmanaged DLL (ImportScope)"; so we calculate and map the MemberForwarded
        # MethodDef table token to help us later record native import method calls made from CIL
        token: int = calculate_dotnet_token_value(row.MemberForwarded.table.number, row.MemberForwarded.row_index)

        # like Kernel32.dll
        if dll and "." in dll:
            dll = dll.split(".")[0]

        # like kernel32.CreateFileA
        imp: str = f"{dll}.{symbol}"

        yield token, imp


def get_dotnet_managed_method_bodies(pe: dnfile.dnPE) -> Iterator[CilMethodBody]:
    """get managed methods from MethodDef table"""
    if not hasattr(pe.net.mdtables, "MethodDef"):
        return

    for row in pe.net.mdtables.MethodDef:
        if not row.ImplFlags.miIL or any((row.Flags.mdAbstract, row.Flags.mdPinvokeImpl)):
            # skip methods that do not have a method body
            continue

        body: Optional[CilMethodBody] = read_dotnet_method_body(pe, row)
        if body is None:
            continue

        yield body
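As a quick sanity check on the token arithmetic above (not part of the commit): .NET metadata tokens put the table index in the top byte and the 1-based row id (RID) in the low three bytes, which is what calculate_dotnet_token_value reproduces via dncil's Token constants.

# hedged sketch: TABLE_SHIFT/RID_MASK mirror dncil's Token.TABLE_SHIFT (24) and Token.RID_MASK (0x00FFFFFF)
TABLE_SHIFT = 24
RID_MASK = 0x00FFFFFF

def token_value(table: int, rid: int) -> int:
    return ((table & 0xFF) << TABLE_SHIFT) | (rid & RID_MASK)

assert token_value(0x0A, 1) == 0x0A000001  # MemberRef, row 1
assert token_value(0x06, 7) == 0x06000007  # MethodDef, row 7 (cf. the entry point token in the tests below)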
capa/features/extractors/dnfile/insn.py (new file): 96 lines
@@ -0,0 +1,96 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Dict, Tuple, Iterator, Optional
from itertools import chain

if TYPE_CHECKING:
    from dncil.cil.instruction import Instruction
    from dncil.cil.body import CilMethodBody
    from capa.features.common import Feature

from dncil.clr.token import StringToken
from dncil.cil.opcode import OpCodes

import capa.features.extractors.helpers
from capa.features.insn import API, Number
from capa.features.common import String
from capa.features.extractors.dnfile.helpers import (
    read_dotnet_user_string,
    get_dotnet_managed_imports,
    get_dotnet_unmanaged_imports,
)


def get_imports(ctx: Dict) -> Dict:
    if "imports_cache" not in ctx:
        ctx["imports_cache"] = {
            token: imp
            for (token, imp) in chain(get_dotnet_managed_imports(ctx["pe"]), get_dotnet_unmanaged_imports(ctx["pe"]))
        }
    return ctx["imports_cache"]


def extract_insn_api_features(f: CilMethodBody, bb: CilMethodBody, insn: Instruction) -> Iterator[Tuple[API, int]]:
    """parse instruction API features"""
    if insn.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp, OpCodes.Calli):
        return

    name: str = get_imports(f.ctx).get(insn.operand.value, "")
    if not name:
        return

    if "::" in name:
        # like System.IO.File::OpenRead
        yield API(name), insn.offset
    else:
        # like kernel32.CreateFileA
        dll, _, symbol = name.rpartition(".")
        for name_variant in capa.features.extractors.helpers.generate_symbols(dll, symbol):
            yield API(name_variant), insn.offset


def extract_insn_number_features(
    f: CilMethodBody, bb: CilMethodBody, insn: Instruction
) -> Iterator[Tuple[Number, int]]:
    """parse instruction number features"""
    if insn.is_ldc():
        yield Number(insn.get_ldc()), insn.offset


def extract_insn_string_features(
    f: CilMethodBody, bb: CilMethodBody, insn: Instruction
) -> Iterator[Tuple[String, int]]:
    """parse instruction string features"""
    if not insn.is_ldstr():
        return

    if not isinstance(insn.operand, StringToken):
        return

    user_string: Optional[str] = read_dotnet_user_string(f.ctx["pe"], insn.operand)
    if user_string is None:
        return

    yield String(user_string), insn.offset


def extract_features(f: CilMethodBody, bb: CilMethodBody, insn: Instruction) -> Iterator[Tuple[Feature, int]]:
    """extract instruction features"""
    for inst_handler in INSTRUCTION_HANDLERS:
        for (feature, offset) in inst_handler(f, bb, insn):
            yield feature, offset


INSTRUCTION_HANDLERS = (
    extract_insn_api_features,
    extract_insn_number_features,
    extract_insn_string_features,
)
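A small sketch of the per-workspace import cache in action (not part of the commit; the sample path is hypothetical): the first get_imports call walks MemberRef and ImplMap once, and later call/callvirt lookups are plain dict hits keyed by the operand token.

# hedged sketch, assuming "sample.exe_" is a .NET PE on disk
import dnfile
from capa.features.extractors.dnfile.insn import get_imports

ctx = {"pe": dnfile.dnPE("sample.exe_")}
imports = get_imports(ctx)            # first call builds the token -> name mapping
assert imports is get_imports(ctx)    # subsequent calls reuse the cached dict
for token, name in list(imports.items())[:5]:
    print(hex(token), name)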
capa/features/extractors/dotnetfile.py (new file): 129 lines
@@ -0,0 +1,129 @@
import logging
from typing import Tuple, Iterator
from itertools import chain

import dnfile
import pefile

import capa.features.extractors.helpers
from capa.features.file import Import
from capa.features.common import OS, OS_ANY, ARCH_ANY, ARCH_I386, ARCH_AMD64, FORMAT_DOTNET, Arch, Format, Feature
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.dnfile.helpers import get_dotnet_managed_imports, get_dotnet_unmanaged_imports

logger = logging.getLogger(__name__)


def extract_file_format(**kwargs) -> Iterator[Tuple[Format, int]]:
    yield Format(FORMAT_DOTNET), 0x0


def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Import, int]]:
    for (token, imp) in chain(get_dotnet_managed_imports(pe), get_dotnet_unmanaged_imports(pe)):
        if "::" in imp:
            # like System.IO.File::OpenRead
            yield Import(imp), token
        else:
            # like kernel32.CreateFileA
            dll, _, symbol = imp.rpartition(".")
            for symbol_variant in capa.features.extractors.helpers.generate_symbols(dll, symbol):
                yield Import(symbol_variant), token


def extract_file_os(**kwargs) -> Iterator[Tuple[OS, int]]:
    yield OS(OS_ANY), 0x0


def extract_file_arch(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Arch, int]]:
    # to distinguish in more detail, see https://stackoverflow.com/a/23614024/10548020
    # .NET 4.5 added option: any CPU, 32-bit preferred
    if pe.net.Flags.CLR_32BITREQUIRED and pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE:
        yield Arch(ARCH_I386), 0x0
    elif not pe.net.Flags.CLR_32BITREQUIRED and pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE_PLUS:
        yield Arch(ARCH_AMD64), 0x0
    else:
        yield Arch(ARCH_ANY), 0x0


def extract_file_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]:
    for file_handler in FILE_HANDLERS:
        for feature, va in file_handler(pe=pe):  # type: ignore
            yield feature, va


FILE_HANDLERS = (
    extract_file_import_names,
    # TODO extract_file_strings,
    # TODO extract_file_function_names,
    extract_file_format,
)


def extract_global_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]:
    for handler in GLOBAL_HANDLERS:
        for feature, va in handler(pe=pe):  # type: ignore
            yield feature, va


GLOBAL_HANDLERS = (
    extract_file_os,
    extract_file_arch,
)


class DotnetFileFeatureExtractor(FeatureExtractor):
    def __init__(self, path: str):
        super(DotnetFileFeatureExtractor, self).__init__()
        self.path: str = path
        self.pe: dnfile.dnPE = dnfile.dnPE(path)

    def get_base_address(self) -> int:
        return 0x0

    def get_entry_point(self) -> int:
        # self.pe.net.Flags.CLT_NATIVE_ENTRYPOINT
        # True: native EP: Token
        # False: managed EP: RVA
        return self.pe.net.struct.EntryPointTokenOrRva

    def extract_global_features(self):
        yield from extract_global_features(self.pe)

    def extract_file_features(self):
        yield from extract_file_features(self.pe)

    def is_dotnet_file(self) -> bool:
        return bool(self.pe.net)

    def is_mixed_mode(self) -> bool:
        return not bool(self.pe.net.Flags.CLR_ILONLY)

    def get_runtime_version(self) -> Tuple[int, int]:
        return self.pe.net.struct.MajorRuntimeVersion, self.pe.net.struct.MinorRuntimeVersion

    def get_meta_version_string(self) -> str:
        return self.pe.net.metadata.struct.Version.rstrip(b"\x00").decode("utf-8")

    def get_functions(self):
        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

    def extract_function_features(self, f):
        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

    def get_basic_blocks(self, f):
        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

    def extract_basic_block_features(self, f, bb):
        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

    def get_instructions(self, f, bb):
        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

    def extract_insn_features(self, f, bb, insn):
        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

    def is_library_function(self, va):
        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

    def get_function_name(self, va):
        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")
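A brief usage sketch for the file-only extractor (not part of the commit; the sample path is hypothetical): it yields only global- and file-scope features and additionally answers the .NET questions capa.main asks while deciding whether to route a PE to the dnfile backend.

# hedged sketch, assuming "sample.exe_" is a PE on disk
import capa.features.extractors.dotnetfile

extractor = capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor("sample.exe_")
if extractor.is_dotnet_file():
    print("runtime version:", extractor.get_runtime_version())       # e.g. (2, 5)
    print("metadata version:", extractor.get_meta_version_string())  # e.g. "v2.0.50727"
    print("mixed mode:", extractor.is_mixed_mode())
    for feature, va in extractor.extract_file_features():
        print("file: 0x%08x: %s" % (va, feature))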
@@ -51,6 +51,9 @@ def generate_symbols(dll: str, symbol: str) -> Iterator[str]:
      - CreateFileA
      - CreateFile
    """
    # normalize dll name
    dll = dll.lower()

    # kernel32.CreateFileA
    yield "%s.%s" % (dll, symbol)
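The normalization above feeds the .NET import/API naming: unmanaged (P/Invoke) names are expanded into dll-prefixed and bare variants, while managed names containing "::" are kept verbatim. A short illustration (not part of the commit; api_names is a hypothetical helper):

from capa.features.extractors.helpers import generate_symbols

def api_names(name: str):
    if "::" in name:                   # like System.IO.File::OpenRead (managed)
        yield name
    else:                              # like kernel32.CreateFileA (unmanaged)
        dll, _, symbol = name.rpartition(".")
        yield from generate_symbols(dll, symbol)

print(list(api_names("System.IO.File::OpenRead")))
# -> ['System.IO.File::OpenRead']
print(list(api_names("kernel32.CreateFileA")))
# -> e.g. ['kernel32.CreateFileA', 'kernel32.CreateFile', 'CreateFileA', 'CreateFile']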
@@ -53,12 +53,12 @@ import zlib
import logging
from typing import Dict, Type

import capa.helpers
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
import capa.features.extractors.base_extractor
from capa.helpers import hex
from capa.features.common import Feature

logger = logging.getLogger(__name__)
@@ -87,6 +87,7 @@ def dumps(extractor):
    returns:
      str: the serialized features.
    """
    hex = capa.helpers.hex
    ret = {
        "version": 1,
        "base address": extractor.get_base_address(),
@@ -13,11 +13,6 @@ from capa.features.common import Feature

class API(Feature):
    def __init__(self, name: str, description=None):
        # Downcase library name if given
        if "." in name:
            modname, _, impname = name.rpartition(".")
            name = modname.lower() + "." + impname

        super(API, self).__init__(name, description=description)
@@ -5,10 +5,20 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import os
import logging
from typing import NoReturn

from capa.exceptions import UnsupportedFormatError
from capa.features.common import FORMAT_SC32, FORMAT_SC64, FORMAT_UNKNOWN
from capa.features.extractors.common import extract_format

EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")


logger = logging.getLogger("capa")

_hex = hex

@@ -35,3 +45,72 @@ def is_runtime_ida():

def assert_never(value: NoReturn) -> NoReturn:
    assert False, f"Unhandled value: {value} ({type(value).__name__})"


def get_format_from_extension(sample: str) -> str:
    if sample.endswith(EXTENSIONS_SHELLCODE_32):
        return FORMAT_SC32
    elif sample.endswith(EXTENSIONS_SHELLCODE_64):
        return FORMAT_SC64
    return FORMAT_UNKNOWN


def get_auto_format(path: str) -> str:
    format_ = get_format(path)
    if format_ == FORMAT_UNKNOWN:
        format_ = get_format_from_extension(path)
    if format_ == FORMAT_UNKNOWN:
        raise UnsupportedFormatError()
    return format_


def get_format(sample: str) -> str:
    with open(sample, "rb") as f:
        buf = f.read()

    for feature, _ in extract_format(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return FORMAT_UNKNOWN


def log_unsupported_format_error():
    logger.error("-" * 80)
    logger.error(" Input file does not appear to be a PE or ELF file.")
    logger.error(" ")
    logger.error(
        " capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64)."
    )
    logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.")
    logger.error("-" * 80)


def log_unsupported_os_error():
    logger.error("-" * 80)
    logger.error(" Input file does not appear to target a supported OS.")
    logger.error(" ")
    logger.error(
        " capa currently only supports analyzing executables for some operating systems (including Windows and Linux)."
    )
    logger.error("-" * 80)


def log_unsupported_arch_error():
    logger.error("-" * 80)
    logger.error(" Input file does not appear to target a supported architecture.")
    logger.error(" ")
    logger.error(" capa currently only supports analyzing x86 (32- and 64-bit).")
    logger.error("-" * 80)


def log_unsupported_runtime_error():
    logger.error("-" * 80)
    logger.error(" Unsupported runtime or Python interpreter.")
    logger.error(" ")
    logger.error(" capa supports running under Python 3.7 and higher.")
    logger.error(" ")
    logger.error(
        " If you're seeing this message on the command line, please ensure you're running a supported Python version."
    )
    logger.error("-" * 80)
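To illustrate the new format-resolution helpers (not part of the commit; the path is hypothetical and assumed to exist): content sniffing runs first, the shellcode extension convention is the fallback, and anything else raises UnsupportedFormatError.

from capa.helpers import get_auto_format
from capa.exceptions import UnsupportedFormatError

try:
    # raw shellcode has no PE/ELF magic, so the ".raw32" extension decides: -> "sc32"
    print(get_auto_format("payload.raw32"))
except UnsupportedFormatError:
    print("unrecognized format")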
capa/main.py: 209 changes
@@ -42,17 +42,34 @@ import capa.features.extractors
import capa.features.extractors.common
import capa.features.extractors.pefile
import capa.features.extractors.elffile
import capa.features.extractors.dotnetfile
from capa.rules import Rule, Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import get_file_taste
from capa.helpers import (
    get_format,
    get_file_taste,
    get_auto_format,
    log_unsupported_os_error,
    log_unsupported_arch_error,
    log_unsupported_format_error,
)
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError, UnsupportedRuntimeError
from capa.features.common import (
    FORMAT_PE,
    FORMAT_ELF,
    FORMAT_AUTO,
    FORMAT_SC32,
    FORMAT_SC64,
    FORMAT_DOTNET,
    FORMAT_FREEZE,
)
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor

RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
BACKEND_VIV = "vivisect"
BACKEND_SMDA = "smda"
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
BACKEND_DOTNET = "dotnet"

E_MISSING_RULES = -10
E_MISSING_FILE = -11
@@ -287,6 +304,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
    return matches, meta


# TODO move all to helpers?
def has_rule_with_namespace(rules, capabilities, rule_cat):
    for rule_name in capabilities.keys():
        if rules.rules[rule_name].meta.get("namespace", "").startswith(rule_cat):
@@ -334,17 +352,6 @@ def is_supported_format(sample: str) -> bool:
    return len(list(capa.features.extractors.common.extract_format(taste))) == 1


def get_format(sample: str) -> str:
    with open(sample, "rb") as f:
        buf = f.read()

    for feature, _ in capa.features.extractors.common.extract_format(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return "unknown"


def is_supported_arch(sample: str) -> bool:
    with open(sample, "rb") as f:
        buf = f.read()
@@ -433,19 +440,7 @@ def get_default_signatures() -> List[str]:
    return ret


class UnsupportedFormatError(ValueError):
    pass


class UnsupportedArchError(ValueError):
    pass


class UnsupportedOSError(ValueError):
    pass


def get_workspace(path, format, sigpaths):
def get_workspace(path, format_, sigpaths):
    """
    load the program at the given path into a vivisect workspace using the given format.
    also apply the given FLIRT signatures.
@@ -465,21 +460,22 @@ def get_workspace(path, format, sigpaths):
    import viv_utils

    logger.debug("generating vivisect workspace for: %s", path)
    if format == "auto":
        # TODO should not be auto at this point, anymore
    if format_ == FORMAT_AUTO:
        if not is_supported_format(path):
            raise UnsupportedFormatError()

        # don't analyze, so that we can add our Flirt function analyzer first.
        vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
    elif format in {"pe", "elf"}:
    elif format_ in {FORMAT_PE, FORMAT_ELF}:
        vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
    elif format == "sc32":
    elif format_ == FORMAT_SC32:
        # these are not analyzed nor saved.
        vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="i386", analyze=False)
    elif format == "sc64":
    elif format_ == FORMAT_SC64:
        vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="amd64", analyze=False)
    else:
        raise ValueError("unexpected format: " + format)
        raise ValueError("unexpected format: " + format_)

    viv_utils.flirt.register_flirt_signature_analyzers(vw, sigpaths)

@@ -489,12 +485,9 @@ def get_workspace(path, format, sigpaths):
    return vw


class UnsupportedRuntimeError(RuntimeError):
    pass


# TODO get_extractors -> List[FeatureExtractor]?
def get_extractor(
    path: str, format: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False
    path: str, format_: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False
) -> FeatureExtractor:
    """
    raises:
@@ -502,7 +495,7 @@ def get_extractor(
      UnsupportedArchError
      UnsupportedOSError
    """
    if format not in ("sc32", "sc64"):
    if format_ not in (FORMAT_SC32, FORMAT_SC64):
        if not is_supported_format(path):
            raise UnsupportedFormatError()

@@ -512,6 +505,11 @@ def get_extractor(
        if not is_supported_os(path):
            raise UnsupportedOSError()

    if format_ == FORMAT_DOTNET:
        import capa.features.extractors.dnfile.extractor

        return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)

    if backend == "smda":
        from smda.SmdaConfig import SmdaConfig
        from smda.Disassembler import Disassembler
@@ -530,7 +528,7 @@ def get_extractor(
        import capa.features.extractors.viv.extractor

        with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
            vw = get_workspace(path, format, sigpaths)
            vw = get_workspace(path, format_, sigpaths)

            if should_save_workspace:
                logger.debug("saving workspace")
@@ -545,6 +543,22 @@ def get_extractor(
        return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)


def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]:
    file_extractors: List[FeatureExtractor] = list()

    if format_ == capa.features.extractors.common.FORMAT_PE:
        file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(sample))

        dotnetfile_extractor = capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(sample)
        if dotnetfile_extractor.is_dotnet_file():
            file_extractors.append(dotnetfile_extractor)

    elif format_ == capa.features.extractors.common.FORMAT_ELF:
        file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample))

    return file_extractors


def is_nursery_rule_path(path: str) -> bool:
    """
    The nursery is a spot for rules that have not yet been fully polished.
@@ -653,7 +667,7 @@ def collect_metadata(argv, sample_path, rules_path, extractor):
    if rules_path != [RULES_PATH_DEFAULT_STRING]:
        rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path]

    format = get_format(sample_path)
    format_ = get_format(sample_path)
    arch = get_arch(sample_path)
    os_ = get_os(sample_path)

@@ -668,7 +682,7 @@ def collect_metadata(argv, sample_path, rules_path, extractor):
            "path": os.path.normpath(sample_path),
        },
        "analysis": {
            "format": format,
            "format": format_,
            "arch": arch,
            "os": os_,
            "extractor": extractor.__class__.__name__,
@@ -783,19 +797,20 @@ def install_common_args(parser, wanted=None):

    if "format" in wanted:
        formats = [
            ("auto", "(default) detect file type automatically"),
            ("pe", "Windows PE file"),
            ("elf", "Executable and Linkable Format"),
            ("sc32", "32-bit shellcode"),
            ("sc64", "64-bit shellcode"),
            ("freeze", "features previously frozen by capa"),
            (FORMAT_AUTO, "(default) detect file type automatically"),
            (FORMAT_PE, "Windows PE file"),
            (FORMAT_DOTNET, ".NET PE file"),
            (FORMAT_ELF, "Executable and Linkable Format"),
            (FORMAT_SC32, "32-bit shellcode"),
            (FORMAT_SC64, "64-bit shellcode"),
            (FORMAT_FREEZE, "features previously frozen by capa"),
        ]
        format_help = ", ".join(["%s: %s" % (f[0], f[1]) for f in formats])
        parser.add_argument(
            "-f",
            "--format",
            choices=[f[0] for f in formats],
            default="auto",
            default=FORMAT_AUTO,
            help="select sample format, %s" % format_help,
        )

@@ -974,13 +989,21 @@ def main(argv=None):
        return ret

    try:
        taste = get_file_taste(args.sample)
        _ = get_file_taste(args.sample)
    except IOError as e:
        # per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we
        # handle the IOError separately and reach into the args
        logger.error("%s", e.args[0])
        return E_MISSING_FILE

    format_ = args.format
    if format_ == FORMAT_AUTO:
        try:
            format_ = get_auto_format(args.sample)
        except UnsupportedFormatError:
            log_unsupported_format_error()
            return E_INVALID_FILE_TYPE

    try:
        rules = get_rules(args.rules, disable_progress=args.quiet)
        rules = capa.rules.RuleSet(rules)
@@ -1002,26 +1025,23 @@ def main(argv=None):
        logger.error("%s", str(e))
        return E_INVALID_RULE

    file_extractor = None
    if args.format == "pe" or (args.format == "auto" and taste.startswith(b"MZ")):
        # these pefile and elffile file feature extractors are pretty light weight: they don't do any code analysis.
        # so we can fairly quickly determine if the given file has "pure" file-scope rules
        # that indicate a limitation (like "file is packed based on section names")
        # and avoid doing a full code analysis on difficult/impossible binaries.
        try:
            file_extractor = capa.features.extractors.pefile.PefileFeatureExtractor(args.sample)
        except PEFormatError as e:
            logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
            return E_CORRUPT_FILE
    # file feature extractors are pretty lightweight: they don't do any code analysis.
    # so we can fairly quickly determine if the given file has "pure" file-scope rules
    # that indicate a limitation (like "file is packed based on section names")
    # and avoid doing a full code analysis on difficult/impossible binaries.
    #
    # this pass can inspect multiple file extractors, e.g., dotnet and pe to identify
    # various limitations
    try:
        file_extractors = get_file_extractors(args.sample, format_)
    except PEFormatError as e:
        logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
        return E_CORRUPT_FILE
    except (ELFError, OverflowError) as e:
        logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
        return E_CORRUPT_FILE

    elif args.format == "elf" or (args.format == "auto" and taste.startswith(b"\x7fELF")):
        try:
            file_extractor = capa.features.extractors.elffile.ElfFeatureExtractor(args.sample)
        except (ELFError, OverflowError) as e:
            logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
            return E_CORRUPT_FILE

    if file_extractor:
    for file_extractor in file_extractors:
        try:
            pure_file_capabilities, _ = find_file_capabilities(rules, file_extractor, {})
        except PEFormatError as e:
@@ -1040,58 +1060,37 @@ def main(argv=None):
            logger.debug("file limitation short circuit, won't analyze fully.")
            return E_FILE_LIMITATION

    try:
        if args.format == "pe" or (args.format == "auto" and taste.startswith(b"MZ")):
            sig_paths = get_signatures(args.signatures)
        else:
            sig_paths = []
            logger.debug("skipping library code matching: only have PE signatures")
    except (IOError) as e:
        logger.error("%s", str(e))
        return E_INVALID_SIG
        if isinstance(file_extractor, capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor):
            format_ = FORMAT_DOTNET

    if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
        format = "freeze"
    if format_ == FORMAT_FREEZE:
        with open(args.sample, "rb") as f:
            extractor = capa.features.freeze.load(f.read())
    else:
        format = args.format
        if format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_32):
            format = "sc32"
        elif format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_64):
            format = "sc64"
        try:
            if format_ == FORMAT_PE:
                sig_paths = get_signatures(args.signatures)
            else:
                sig_paths = []
                logger.debug("skipping library code matching: only have native PE signatures")
        except IOError as e:
            logger.error("%s", str(e))
            return E_INVALID_SIG

        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)

        try:
            extractor = get_extractor(
                args.sample, format, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet
                args.sample, format_, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet
            )
        except UnsupportedFormatError:
            logger.error("-" * 80)
            logger.error(" Input file does not appear to be a PE or ELF file.")
            logger.error(" ")
            logger.error(
                " capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64)."
            )
            logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.")
            logger.error("-" * 80)
            log_unsupported_format_error()
            return E_INVALID_FILE_TYPE
        except UnsupportedArchError:
            logger.error("-" * 80)
            logger.error(" Input file does not appear to target a supported architecture.")
            logger.error(" ")
            logger.error(" capa currently only supports analyzing x86 (32- and 64-bit).")
            logger.error("-" * 80)
            log_unsupported_arch_error()
            return E_INVALID_FILE_ARCH
        except UnsupportedOSError:
            logger.error("-" * 80)
            logger.error(" Input file does not appear to target a supported OS.")
            logger.error(" ")
            logger.error(
                " capa currently only supports analyzing executables for some operating systems (including Windows and Linux)."
            )
            logger.error("-" * 80)
            log_unsupported_os_error()
            return E_INVALID_FILE_OS

    meta = collect_metadata(argv, args.sample, args.rules, extractor)
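Taken together, the capa.main changes route a .NET sample roughly as follows (a sketch, not code from the commit; the sample path is hypothetical): auto-detection still reports "pe", the file-extractor pass spots the CLR header and upgrades the format, and get_extractor then short-circuits to the dnfile backend instead of building a vivisect workspace.

import capa.main
import capa.features.extractors.dotnetfile
from capa.features.common import FORMAT_DOTNET
from capa.helpers import get_auto_format

path = "sample.exe_"
format_ = get_auto_format(path)  # a .NET assembly still sniffs as "pe"

for file_extractor in capa.main.get_file_extractors(path, format_):
    if isinstance(file_extractor, capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor):
        format_ = FORMAT_DOTNET  # upgrade based on the CLR header

extractor = capa.main.get_extractor(path, format_, capa.main.BACKEND_VIV, sigpaths=[])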
@@ -7,9 +7,9 @@
# See the License for the specific language governing permissions and limitations under the License.
import json

import capa.render.result_document
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.render.result_document import convert_capabilities_to_result_document


class CapaJsonObjectEncoder(json.JSONEncoder):
@@ -27,7 +27,7 @@ class CapaJsonObjectEncoder(json.JSONEncoder):

def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
    return json.dumps(
        capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities),
        convert_capabilities_to_result_document(meta, rules, capabilities),
        cls=CapaJsonObjectEncoder,
        sort_keys=True,
    )
@@ -7,7 +7,6 @@
# See the License for the specific language governing permissions and limitations under the License.
import copy

import capa.rules
import capa.engine
import capa.render.utils
import capa.features.common
@@ -41,6 +41,7 @@ import tqdm.contrib.logging
import capa.main
import capa.rules
import capa.engine
import capa.helpers
import capa.features.insn
import capa.features.common
from capa.rules import Rule, RuleSet
@@ -286,16 +287,16 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]:
        logger.debug("found cached results: %s: %d capabilities", nice_path, len(ctx.capabilities_by_sample[path]))
        return ctx.capabilities_by_sample[path]

    if nice_path.endswith(capa.main.EXTENSIONS_SHELLCODE_32):
        format = "sc32"
    elif nice_path.endswith(capa.main.EXTENSIONS_SHELLCODE_64):
        format = "sc64"
    if nice_path.endswith(capa.helpers.EXTENSIONS_SHELLCODE_32):
        format_ = "sc32"
    elif nice_path.endswith(capa.helpers.EXTENSIONS_SHELLCODE_64):
        format_ = "sc64"
    else:
        format = "auto"
        format_ = "auto"

    logger.debug("analyzing sample: %s", nice_path)
    extractor = capa.main.get_extractor(
        nice_path, format, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True
        nice_path, format_, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True
    )

    capabilities, _ = capa.main.find_capabilities(ctx.rules, extractor, disable_progress=True)
@@ -59,7 +59,9 @@ import colorama
import capa.main
import capa.rules
import capa.engine
import capa.helpers
import capa.features
import capa.exceptions
import capa.render.utils as rutils
import capa.features.freeze
import capa.render.result_document
@@ -162,25 +164,11 @@ def main(argv=None):
        extractor = capa.main.get_extractor(
            args.sample, args.format, args.backend, sig_paths, should_save_workspace
        )
    except capa.main.UnsupportedFormatError:
        logger.error("-" * 80)
        logger.error(" Input file does not appear to be a PE file.")
        logger.error(" ")
        logger.error(
            " capa currently only supports analyzing PE files (or shellcode, when using --format sc32|sc64)."
        )
        logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.")
        logger.error("-" * 80)
    except capa.exceptions.UnsupportedFormatError:
        capa.helpers.log_unsupported_format_error()
        return -1
    except capa.main.UnsupportedRuntimeError:
        logger.error("-" * 80)
        logger.error(" Unsupported runtime or Python interpreter.")
        logger.error(" ")
        logger.error(" capa supports running under Python 2.7 using Vivisect for binary analysis.")
        logger.error(" It can also run within IDA Pro, using either Python 2.7 or 3.5+.")
        logger.error(" ")
        logger.error(" If you're seeing this message on the command line, please ensure you're running Python 2.7.")
        logger.error("-" * 80)
    except capa.exceptions.UnsupportedRuntimeError:
        capa.helpers.log_unsupported_runtime_error()
        return -1

    meta = capa.main.collect_metadata(argv, args.sample, args.rules, extractor)
@@ -75,8 +75,10 @@ import capa.rules
import capa.engine
import capa.helpers
import capa.features
import capa.exceptions
import capa.features.common
import capa.features.freeze
from capa.helpers import log_unsupported_runtime_error

logger = logging.getLogger("capa.show-features")

@@ -113,27 +115,19 @@ def main(argv=None):
        extractor = capa.main.get_extractor(
            args.sample, args.format, args.backend, sig_paths, should_save_workspace
        )
    except capa.main.UnsupportedFormatError:
        logger.error("-" * 80)
        logger.error(" Input file does not appear to be a PE file.")
        logger.error(" ")
        logger.error(
            " capa currently only supports analyzing PE files (or shellcode, when using --format sc32|sc64)."
        )
        logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.")
        logger.error("-" * 80)
    except capa.exceptions.UnsupportedFormatError:
        capa.helpers.log_unsupported_format_error()
        return -1
    except capa.main.UnsupportedRuntimeError:
        logger.error("-" * 80)
        logger.error(" Unsupported runtime or Python interpreter.")
        logger.error(" ")
        logger.error(" capa supports running under Python 2.7 using Vivisect for binary analysis.")
        logger.error(" It can also run within IDA Pro, using either Python 2.7 or 3.5+.")
        logger.error(" ")
        logger.error(" If you're seeing this message on the command line, please ensure you're running Python 2.7.")
        logger.error("-" * 80)
    except capa.exceptions.UnsupportedRuntimeError:
        log_unsupported_runtime_error()
        return -1

    for feature, va in extractor.extract_global_features():
        if va:
            print("global: 0x%08x: %s" % (va, feature))
        else:
            print("global: 0x00000000: %s" % (feature))

    if not args.function:
        for feature, va in extractor.extract_file_features():
            if va:
setup.py: 2 changes
@@ -26,6 +26,8 @@ requirements = [
    "smda==1.7.1",
    "pefile==2021.9.3",
    "pyelftools==0.28",
    "dnfile==0.10.0",
    "dncil==1.0.0",
]

# this sets __version__
@@ -22,9 +22,23 @@ import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
from capa.features.common import OS, OS_LINUX, ARCH_I386, FORMAT_PE, ARCH_AMD64, FORMAT_ELF, OS_WINDOWS, Arch, Format
from capa.features.common import (
    OS,
    OS_ANY,
    OS_LINUX,
    ARCH_I386,
    FORMAT_PE,
    ARCH_AMD64,
    FORMAT_ELF,
    OS_WINDOWS,
    FORMAT_DOTNET,
    Arch,
    Format,
)

CD = os.path.dirname(__file__)
DOTNET_DIR = os.path.join(CD, "data", "dotnet")
DNFILE_TESTFILES = os.path.join(DOTNET_DIR, "dnfile-testfiles")


@contextlib.contextmanager
@@ -122,6 +136,19 @@ def get_pefile_extractor(path):
    return capa.features.extractors.pefile.PefileFeatureExtractor(path)


def get_dotnetfile_extractor(path):
    import capa.features.extractors.dotnetfile

    return capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(path)


@lru_cache(maxsize=1)
def get_dnfile_extractor(path):
    import capa.features.extractors.dnfile.extractor

    return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)


def extract_global_features(extractor):
    features = collections.defaultdict(set)
    for feature, va in extractor.extract_global_features():
@@ -220,6 +247,14 @@ def get_data_path_by_name(name):
        return os.path.join(CD, "data", "79abd17391adc6251ecdc58d13d76baf.dll_")
    elif name.startswith("946a9"):
        return os.path.join(CD, "data", "946a99f36a46d335dec080d9a4371940.dll_")
    elif name.startswith("b9f5b"):
        return os.path.join(CD, "data", "b9f5bd514485fb06da39beff051b9fdc.exe_")
    elif name.startswith("mixed-mode-64"):
        return os.path.join(DNFILE_TESTFILES, "mixed-mode", "ModuleCode", "bin", "ModuleCode_amd64.exe")
    elif name.startswith("hello-world"):
        return os.path.join(DNFILE_TESTFILES, "hello-world", "hello-world.exe")
    elif name.startswith("_1c444"):
        return os.path.join(CD, "data", "dotnet", "1c444ebeba24dcba8628b7dfe5fec7c6.exe_")
    else:
        raise ValueError("unexpected sample fixture: %s" % name)

@@ -272,7 +307,9 @@ def get_sample_md5_by_name(name):
    elif name.startswith("79abd"):
        return "79abd17391adc6251ecdc58d13d76baf"
    elif name.startswith("946a9"):
        return "946a99f36a46d335dec080d9a4371940.dll_"
        return "946a99f36a46d335dec080d9a4371940"
    elif name.startswith("b9f5b"):
        return "b9f5bd514485fb06da39beff051b9fdc"
    else:
        raise ValueError("unexpected sample fixture: %s" % name)

@@ -626,6 +663,39 @@ FEATURE_PRESENCE_TESTS = sorted(
    key=lambda t: (t[0], t[1]),
)

FEATURE_PRESENCE_TESTS_DOTNET = sorted(
    [
        ("b9f5b", "file", Arch(ARCH_I386), True),
        ("b9f5b", "file", Arch(ARCH_AMD64), False),
        ("mixed-mode-64", "file", Arch(ARCH_AMD64), True),
        ("mixed-mode-64", "file", Arch(ARCH_I386), False),
        ("b9f5b", "file", OS(OS_ANY), True),
        ("b9f5b", "file", Format(FORMAT_DOTNET), True),
        ("hello-world", "function=0x250", capa.features.common.String("Hello World!"), True),
        ("hello-world", "function=0x250, bb=0x250, insn=0x252", capa.features.common.String("Hello World!"), True),
        ("hello-world", "function=0x250", capa.features.insn.API("System.Console::WriteLine"), True),
        ("hello-world", "file", capa.features.file.Import("System.Console::WriteLine"), True),
        ("_1c444", "file", capa.features.file.Import("gdi32.CreateCompatibleBitmap"), True),
        ("_1c444", "file", capa.features.file.Import("CreateCompatibleBitmap"), True),
        ("_1c444", "file", capa.features.file.Import("gdi32::CreateCompatibleBitmap"), False),
        ("_1c444", "function=0x1F68", capa.features.insn.API("GetWindowDC"), True),
        ("_1c444", "function=0x1F68", capa.features.insn.API("user32.GetWindowDC"), True),
        ("_1c444", "function=0x1F68", capa.features.insn.Number(0xCC0020), True),
        ("_1c444", "function=0x1F68", capa.features.insn.Number(0x0), True),
        ("_1c444", "function=0x1F68", capa.features.insn.Number(0x1), False),
        (
            "_1c444",
            "function=0x1F68, bb=0x1F68, insn=0x1FF9",
            capa.features.insn.API("System.Drawing.Image::FromHbitmap"),
            True,
        ),
        ("_1c444", "function=0x1F68, bb=0x1F68, insn=0x1FF9", capa.features.insn.API("FromHbitmap"), False),
    ],
    # order tests by (file, item)
    # so that our LRU cache is most effective.
    key=lambda t: (t[0], t[1]),
)

FEATURE_PRESENCE_TESTS_IDA = [
    # file/imports
    # IDA can recover more names of APIs imported by ordinal
@@ -641,6 +711,9 @@ FEATURE_COUNT_TESTS = [
]


FEATURE_COUNT_TESTS_DOTNET = []  # type: ignore


def do_test_feature_presence(get_extractor, sample, scope, feature, expected):
    extractor = get_extractor(sample)
    features = scope(extractor)
@@ -738,3 +811,23 @@ def al_khaser_x86_extractor():
@pytest.fixture
def pingtaest_extractor():
    return get_extractor(get_data_path_by_name("pingtaest"))


@pytest.fixture
def b9f5b_dotnetfile_extractor():
    return get_dotnetfile_extractor(get_data_path_by_name("b9f5b"))


@pytest.fixture
def mixed_mode_64_dotnetfile_extractor():
    return get_dotnetfile_extractor(get_data_path_by_name("mixed-mode-64"))


@pytest.fixture
def hello_world_dnfile_extractor():
    return get_dnfile_extractor(get_data_path_by_name("hello-world"))


@pytest.fixture
def _1c444_dnfile_extractor():
    return get_dnfile_extractor(get_data_path_by_name("1c444..."))
tests/test_dnfile_features.py (new file): 30 lines
@@ -0,0 +1,30 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import pytest
import fixtures
from fixtures import *
from fixtures import parametrize


@parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_PRESENCE_TESTS_DOTNET,
    indirect=["sample", "scope"],
)
def test_dnfile_features(sample, scope, feature, expected):
    fixtures.do_test_feature_presence(fixtures.get_dnfile_extractor, sample, scope, feature, expected)


@parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_COUNT_TESTS_DOTNET,
    indirect=["sample", "scope"],
)
def test_dnfile_feature_counts(sample, scope, feature, expected):
    fixtures.do_test_feature_count(fixtures.get_dnfile_extractor, sample, scope, feature, expected)
tests/test_dotnetfile_features.py (new file): 43 lines
@@ -0,0 +1,43 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import pytest
import fixtures
from fixtures import *
from fixtures import parametrize


@parametrize(
    "sample,scope,feature,expected",
    fixtures.FEATURE_PRESENCE_TESTS_DOTNET,
    indirect=["sample", "scope"],
)
def test_dotnetfile_features(sample, scope, feature, expected):
    if scope.__name__ != "file":
        pytest.xfail("dotnetfile only extracts file scope features")

    if isinstance(feature, capa.features.file.FunctionName):
        pytest.xfail("dotnetfile doesn't extract function names")

    fixtures.do_test_feature_presence(fixtures.get_dotnetfile_extractor, sample, scope, feature, expected)


@parametrize(
    "extractor,function,expected",
    [
        ("b9f5b_dotnetfile_extractor", "is_dotnet_file", True),
        ("b9f5b_dotnetfile_extractor", "is_mixed_mode", False),
        ("mixed_mode_64_dotnetfile_extractor", "is_mixed_mode", True),
        ("b9f5b_dotnetfile_extractor", "get_entry_point", 0x6000007),
        ("b9f5b_dotnetfile_extractor", "get_runtime_version", (2, 5)),
        ("b9f5b_dotnetfile_extractor", "get_meta_version_string", "v2.0.50727"),
    ],
)
def test_dotnetfile_extractor(request, extractor, function, expected):
    extractor_function = getattr(request.getfixturevalue(extractor), function)
    assert extractor_function() == expected