Combine DEX feature extraction into a single class

This commit is contained in:
Duncan Ogilvie
2023-12-06 21:32:08 +01:00
parent e90be5a9bb
commit 52d20d2f46
4 changed files with 187 additions and 125 deletions

View File

@@ -5,15 +5,23 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import struct
import logging
from typing import Tuple, Iterator
from typing import List, Tuple, Iterator, TypedDict
from pathlib import Path
from dataclasses import dataclass
from dexparser import DEXParser
from capa.features.common import OS, FORMAT_DEX, OS_ANDROID, ARCH_DALVIK, Arch, Format, Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import SampleHashes, StaticFeatureExtractor
from capa.features.extractors.base_extractor import (
BBHandle,
InsnHandle,
SampleHashes,
FunctionHandle,
StaticFeatureExtractor,
)
logger = logging.getLogger(__name__)
@@ -31,61 +39,204 @@ def extract_file_features(dex: DEXParser) -> Iterator[Tuple[Feature, Address]]:
yield feature, addr
def extract_file_os(**kwargs) -> Iterator[Tuple[OS, Address]]:
    """Yield the operating-system feature for a DEX sample.

    The single yielded feature is ``OS(OS_ANDROID)`` at ``NO_ADDRESS``; it does
    not depend on the file contents. ``**kwargs`` is accepted and ignored so the
    function can be called with keyword arguments such as ``dex=...``.
    """
    yield OS(OS_ANDROID), NO_ADDRESS
# Reference: https://source.android.com/docs/core/runtime/dex-format
def extract_file_arch(**kwargs) -> Iterator[Tuple[Arch, Address]]:
    """Yield the architecture feature for a DEX sample.

    The single yielded feature is ``Arch(ARCH_DALVIK)`` at ``NO_ADDRESS``; it
    does not depend on the file contents. ``**kwargs`` is accepted and ignored
    so the function can be called with keyword arguments such as ``dex=...``.
    """
    yield Arch(ARCH_DALVIK), NO_ADDRESS
class DexProtoId(TypedDict):
    """One method prototype record, as found in the list returned by ``DEXParser.get_protoids()``."""

    # index into the string table of the prototype's shorty descriptor
    shorty_idx: int
    # index into the type id table of the return type
    return_type_idx: int
    # offset into the raw DEX data of the parameter type list; 0 means "no parameters"
    param_off: int
# Handlers that produce sample-wide features; extract_global_features() calls
# each one, in order, as handler(dex=dex).
GLOBAL_HANDLERS = (
extract_file_os,
extract_file_arch,
)
class DexMethodId(TypedDict):
    """One method id record, as found in the list returned by ``DEXParser.get_methods()``."""

    # index into the type id table of the class that defines the method
    class_idx: int
    # index into the prototype id table of the method's prototype
    proto_idx: int
    # index into the string table of the method's name
    name_idx: int
def extract_global_features(dex: DEXParser) -> Iterator[Tuple[Feature, Address]]:
    """Yield every (feature, address) pair produced by the handlers in ``GLOBAL_HANDLERS``.

    Handlers run in tuple order and each one receives ``dex`` as a keyword
    argument.
    """
    for handler in GLOBAL_HANDLERS:
        yield from handler(dex=dex)  # type: ignore
class DexFieldId(TypedDict):
    """One field id record, as found in the list returned by ``DEXParser.get_fieldids()``.

    NOTE(review): the per-key meanings below follow ``field_id_item`` in the
    Android DEX format reference; confirm they match what dexparser returns.
    """

    # presumably: index into the type id table of the class that defines the field
    class_idx: int
    # presumably: index into the type id table of the field's own type
    type_idx: int
    # presumably: index into the string table of the field's name
    name_idx: int
class DexFileFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: Path):
class DexClassDef(TypedDict):
    """One class definition record, as found in the list returned by ``DEXParser.get_classdef_data()``.

    NOTE(review): the keys have the same names as the members of
    ``class_def_item`` in the Android DEX format reference; the ``*_idx`` keys
    are presumably table indexes and the ``*_off`` keys file offsets — confirm
    against dexparser before relying on that.
    """

    class_idx: int
    access_flags: int
    superclass_idx: int
    interfaces_off: int
    source_file_idx: int
    annotations_off: int
    class_data_off: int
    static_values_off: int
class DexFieldDef(TypedDict):
    """One field entry of a class's data (see ``DexClass``).

    NOTE(review): presumably mirrors ``encoded_field`` from the Android DEX
    format reference, with ``diff`` being the field index delta — confirm
    against dexparser.
    """

    diff: int
    access_flags: int
class DexMethodDef(TypedDict):
    """One method entry of a class's data (see ``DexClass``).

    NOTE(review): presumably mirrors ``encoded_method`` from the Android DEX
    format reference, with ``diff`` being the method index delta and
    ``code_off`` the offset of the method's code — confirm against dexparser.
    """

    diff: int
    access_flags: int
    code_off: int
class DexClass(TypedDict):
    """Field and method entries of one class, grouped into four lists.

    NOTE(review): presumably the shape returned by ``DEXParser.get_class_data()``
    (that call only appears in commented-out code here) — confirm before use.
    """

    static_fields: List[DexFieldDef]
    instance_fields: List[DexFieldDef]
    direct_methods: List[DexMethodDef]
    virtual_methods: List[DexMethodDef]
class DexAnnotation(TypedDict):
    """One annotation record.

    NOTE(review): presumably the shape returned by ``DEXParser.get_annotations()``
    (that call only appears in commented-out code here); the meaning of the
    individual keys is not established anywhere in this file — confirm against
    dexparser before relying on them.
    """

    visibility: int
    type_idx_diff: int
    size_diff: int
    name_idx_diff: int
    value_type: int
    encoded_value: int
class DexMethodAddress(int, Address):
    """Address of a DEX method, expressed as an integer method index.

    Instances are ``int`` values: ``int`` comes first in the bases, and this
    class defines no comparison methods of its own, so equality and ordering
    are those of ``int``.
    """

    def __new__(cls, index: int):
        # int is immutable, so the value has to be supplied at construction time
        return int.__new__(cls, index)

    def __repr__(self):
        # convert to a plain int first: formatting `self` directly would go
        # through __str__, which calls __repr__ again
        return f"DexMethodAddress(index={int(self)})"

    def __str__(self) -> str:
        # the readable form is the same as the debug form
        return repr(self)

    def __hash__(self):
        # hash like the underlying integer
        return int.__hash__(self)
@dataclass
class DexAnalyzedMethod:
    """Resolved, string-level description of one entry of the method id table."""

    # position of the method in the method id table
    address: DexMethodAddress
    # string resolved from the method's class_idx (via the type id table)
    class_type: str
    # string resolved from the method's name_idx
    name: str
    # string resolved from the prototype's shorty_idx
    shorty_descriptor: str
    # string resolved from the prototype's return_type_idx (via the type id table)
    return_type: str
    # one resolved type string per entry of the prototype's parameter list, in order
    parameters: List[str]
class DexAnalysis:
    """Id tables read from a parsed DEX file, plus the results of optional code analysis.

    The constructor only stores the tables exposed by the parser. ``methods``
    stays empty until :meth:`analyze_code` has been called.
    """

    def __init__(self, dex: "DEXParser"):
        self.dex = dex

        self.strings: List[str] = dex.get_strings()
        self.type_ids: List[int] = dex.get_typeids()
        self.method_ids: List[DexMethodId] = dex.get_methods()
        self.proto_ids: List[DexProtoId] = dex.get_protoids()
        self.field_ids: List[DexFieldId] = dex.get_fieldids()
        self.class_defs: List[DexClassDef] = dex.get_classdef_data()

        # Only available after analysis
        self.methods: List[DexAnalyzedMethod] = []

    def analyze_code(self):
        """Run the code analysis passes; at the moment that is only method resolution."""
        # TODO: also loop over the classes and analyze them, e.g. via
        # dex.get_class_data(offset=-1), dex.get_annotations(offset=-1) and
        # dex.get_static_values(offset=-1)
        self._analyze_methods()

    def _read_parameter_types(self, param_off: int) -> List[str]:
        """Return the resolved type strings of the parameter list stored at ``param_off``.

        The list is read from the raw DEX data as a little-endian uint32 count
        followed by that many little-endian uint16 type indexes. An offset of 0
        means the prototype has no parameters.

        Raises:
            struct.error: if the list extends past the end of the data.
        """
        if param_off == 0:
            return []

        data = self.dex.data
        # unpack_from reads in place instead of copying a slice per value
        (count,) = struct.unpack_from("<L", data, param_off)
        type_indexes = struct.unpack_from(f"<{count}H", data, param_off + 4)
        return [self.strings[self.type_ids[type_idx]] for type_idx in type_indexes]

    def _analyze_methods(self):
        """Resolve every method id into a ``DexAnalyzedMethod`` and store the list in ``self.methods``.

        The list is rebuilt from scratch on every call, so running the analysis
        more than once does not duplicate entries.
        """
        strings = self.strings
        type_ids = self.type_ids

        methods: List[DexAnalyzedMethod] = []
        for index, method in enumerate(self.method_ids):
            proto = self.proto_ids[method["proto_idx"]]
            methods.append(
                DexAnalyzedMethod(
                    address=DexMethodAddress(index),
                    class_type=strings[type_ids[method["class_idx"]]],
                    name=strings[method["name_idx"]],
                    shorty_descriptor=strings[proto["shorty_idx"]],
                    return_type=strings[type_ids[proto["return_type_idx"]]],
                    parameters=self._read_parameter_types(proto["param_off"]),
                )
            )

        self.methods = methods
# Feature extractor for DEX samples. Construction always parses the file and builds
# the id tables; the code analysis pass only runs when code_analysis is true.
# NOTE(review): in this view the methods of this class appear next to stubs that
# raise NotImplementedError mentioning "DexFileFeatureExtractor", so several method
# names occur twice and get_functions() is split by one such stub. Presumably the
# stubs belong to the previous implementation — confirm against the checked-out file.
class DexFeatureExtractor(StaticFeatureExtractor):
# path: sample to load. code_analysis (keyword-only): whether to run analyze_code().
def __init__(self, path: Path, *, code_analysis: bool):
# the sample hashes are computed from the raw bytes of the file
super().__init__(hashes=SampleHashes.from_bytes(path.read_bytes()))
self.path: Path = path
self.code_analysis = code_analysis
self.dex = DEXParser(filedir=str(path))
self.analysis = DexAnalysis(self.dex)
# Perform more expensive code analysis only when requested
if self.code_analysis:
self.analysis.analyze_code()
# Placeholder for unimplemented scopes: logs, at debug level, a marker followed by
# the name of the function that called todo(). Returns None.
def todo(self):
import inspect
# stack()[0] is todo() itself, stack()[1] is its caller
message = "[DexparserFeatureExtractor:TODO] " + inspect.stack()[1].function
logger.debug(message)
# Always returns NO_ADDRESS.
def get_base_address(self):
return NO_ADDRESS
# Yields the sample-wide features.
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
# NOTE(review): the module-level extract_global_features() already yields the OS
# and Arch features that are hardcoded again below; presumably only one of the
# two variants is meant to be present — confirm.
yield from extract_global_features(self.dex)
# These are hardcoded global features
yield Format(FORMAT_DEX), NO_ADDRESS
yield OS(OS_ANDROID), NO_ADDRESS
yield Arch(ARCH_DALVIK), NO_ADDRESS
# Yields the file-scope features by delegating to the module-level extract_file_features().
def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from extract_file_features(self.dex)
# Stub: always raises NotImplementedError.
def get_functions(self):
raise NotImplementedError("DexFileFeatureExtractor can only be used to extract file features")
# Defers to the base class implementation.
def is_library_function(self, addr: Address) -> bool:
# exclude androidx stuff?
return super().is_library_function(addr)
# Stub: always raises NotImplementedError.
def extract_function_features(self, f):
raise NotImplementedError("DexFileFeatureExtractor can only be used to extract file features")
# Yields one FunctionHandle per analyzed method; raises Exception when code
# analysis is disabled. This is a generator function, so the guard only runs once
# the result is iterated.
def get_functions(self) -> Iterator[FunctionHandle]:
if not self.code_analysis:
raise Exception("code analysis is disabled")
# NOTE(review): the stub below sits between the guard and the loop of the
# get_functions() definition that starts above.
def get_basic_blocks(self, f):
raise NotImplementedError("DexFileFeatureExtractor can only be used to extract file features")
# each handle carries the method's index as its address, and the analysis object
for index in range(len(self.analysis.methods)):
yield FunctionHandle(DexMethodAddress(index), self.analysis)
# Stub: always raises NotImplementedError.
def extract_basic_block_features(self, f, bb):
raise NotImplementedError("DexFileFeatureExtractor can only be used to extract file features")
# Not implemented yet: logs a TODO marker and yields nothing; raises Exception when
# code analysis is disabled. The unreachable bare `yield` makes this a generator
# function, so the guard and todo() only run once the result is iterated.
def extract_function_features(self, f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
if not self.code_analysis:
raise Exception("code analysis is disabled")
return self.todo()
yield
# Stub: always raises NotImplementedError.
def get_instructions(self, f, bb):
raise NotImplementedError("DexFileFeatureExtractor can only be used to extract file features")
# Not implemented yet: same guard / todo() / empty-generator pattern as
# extract_function_features().
def get_basic_blocks(self, f: FunctionHandle) -> Iterator[BBHandle]:
if not self.code_analysis:
raise Exception("code analysis is disabled")
return self.todo()
yield
# Stub: always raises NotImplementedError.
def extract_insn_features(self, f, bb, insn):
raise NotImplementedError("DexFileFeatureExtractor can only be used to extract file features")
# Not implemented yet: same guard / todo() / empty-generator pattern as
# extract_function_features().
def extract_basic_block_features(self, f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
if not self.code_analysis:
raise Exception("code analysis is disabled")
return self.todo()
yield
# Stub: always raises NotImplementedError.
def is_library_function(self, va):
raise NotImplementedError("DexFileFeatureExtractor can only be used to extract file features")
# Not implemented yet: same guard / todo() / empty-generator pattern as
# extract_function_features().
def get_instructions(self, f: FunctionHandle, bb: BBHandle) -> Iterator[InsnHandle]:
if not self.code_analysis:
raise Exception("code analysis is disabled")
return self.todo()
yield
# Stub: always raises NotImplementedError.
def get_function_name(self, va):
raise NotImplementedError("DexFileFeatureExtractor can only be used to extract file features")
# Not implemented yet: same guard / todo() / empty-generator pattern as
# extract_function_features().
def extract_insn_features(
self, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
if not self.code_analysis:
raise Exception("code analysis is disabled")
return self.todo()
yield

View File

@@ -1,89 +0,0 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from __future__ import annotations
import logging
from typing import List, Tuple, Iterator
from pathlib import Path
import dexparser
import capa.features.extractors
import capa.features.extractors.dexfile
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import (
BBHandle,
InsnHandle,
SampleHashes,
FunctionHandle,
StaticFeatureExtractor,
)
logger = logging.getLogger(__name__)
class DexparserFeatureExtractorCache:
    """Container stored on the extractor as ``cache``; it currently holds only the parsed DEX file."""

    def __init__(self, dex: dexparser.DEXParser):
        self.dex = dex
class DexparserFeatureExtractor(StaticFeatureExtractor):
    """Static feature extractor for DEX files, backed by ``dexparser``.

    Only the global features are implemented. Every other scope logs a TODO
    marker at debug level and yields nothing.
    """

    def __init__(self, path: Path):
        self.dex = dexparser.DEXParser(filedir=str(path))
        # the sample hashes are computed from the raw bytes of the file
        super().__init__(hashes=SampleHashes.from_bytes(path.read_bytes()))
        self.cache = DexparserFeatureExtractorCache(self.dex)

        # pre-compute these because we'll yield them at *every* scope.
        dexfile = capa.features.extractors.dexfile
        self.global_features: List[Tuple[Feature, Address]] = [
            *dexfile.extract_file_format(),
            *dexfile.extract_file_os(dex=self.dex),
            *dexfile.extract_file_arch(dex=self.dex),
        ]

    def todo(self):
        """Log, at debug level, a TODO marker followed by the name of the calling function."""
        import inspect

        # stack()[0] is todo() itself, stack()[1] is its caller
        caller = inspect.stack()[1].function
        logger.debug("[DexparserFeatureExtractor:TODO] " + caller)

    def get_base_address(self):
        """Return ``NO_ADDRESS``."""
        return NO_ADDRESS

    def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
        """Yield the global features that were pre-computed in the constructor."""
        yield from self.global_features

    # The methods below are placeholders. Each one calls todo() directly, so
    # that the logged caller name is the placeholder's own name, and then
    # finishes as an empty generator.

    def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
        """Not implemented yet: log a TODO marker and yield nothing."""
        self.todo()
        yield from ()

    def get_functions(self) -> Iterator[FunctionHandle]:
        """Not implemented yet: log a TODO marker and yield nothing."""
        self.todo()
        yield from ()

    def extract_function_features(self, f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
        """Not implemented yet: log a TODO marker and yield nothing."""
        self.todo()
        yield from ()

    def get_basic_blocks(self, f: FunctionHandle) -> Iterator[BBHandle]:
        """Not implemented yet: log a TODO marker and yield nothing."""
        self.todo()
        yield from ()

    def extract_basic_block_features(self, f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
        """Not implemented yet: log a TODO marker and yield nothing."""
        self.todo()
        yield from ()

    def get_instructions(self, f: FunctionHandle, bb: BBHandle) -> Iterator[InsnHandle]:
        """Not implemented yet: log a TODO marker and yield nothing."""
        self.todo()
        yield from ()

    def extract_insn_features(
        self, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
    ) -> Iterator[Tuple[Feature, Address]]:
        """Not implemented yet: log a TODO marker and yield nothing."""
        self.todo()
        yield from ()

View File

@@ -309,9 +309,9 @@ def get_extractor(
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)
elif format_ == FORMAT_DEX:
import capa.features.extractors.dexparser.extractor
import capa.features.extractors.dexfile
return capa.features.extractors.dexparser.extractor.DexparserFeatureExtractor(path)
return capa.features.extractors.dexfile.DexFeatureExtractor(path, code_analysis=True)
elif backend == BACKEND_BINJA:
from capa.features.extractors.binja.find_binja_api import find_binja_path
@@ -382,7 +382,7 @@ def get_file_extractors(sample: Path, format_: str) -> List[FeatureExtractor]:
file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample))
elif format_ == capa.features.common.FORMAT_DEX:
file_extractors.append(capa.features.extractors.dexfile.DexFileFeatureExtractor(sample))
file_extractors.append(capa.features.extractors.dexfile.DexFeatureExtractor(sample, code_analysis=False))
elif format_ == FORMAT_CAPE:
report = json.load(Path(sample).open(encoding="utf-8"))