Merge pull request #1423 from mandiant/mypy-111

more mypy v1.1.1 fixes
Willi Ballenthin
2023-04-03 21:48:51 +02:00
committed by GitHub
9 changed files with 246 additions and 116 deletions

View File

@@ -15,7 +15,7 @@ jobs:
       fail-fast: true
       matrix:
         include:
-          - os: ubuntu-18.04
+          - os: ubuntu-20.04
             # use old linux so that the shared library versioning is more portable
             artifact_name: capa
             asset_name: linux
@@ -36,7 +36,7 @@ jobs:
         uses: actions/setup-python@d27e3f3d7c64b4bbf8e4abfb9b63b83e846e0435 # v4.5.0
         with:
           python-version: 3.8
-      - if: matrix.os == 'ubuntu-18.04'
+      - if: matrix.os == 'ubuntu-20.04'
         run: sudo apt-get install -y libyaml-dev
       - name: Upgrade pip, setuptools
         run: python -m pip install --upgrade pip setuptools
@@ -65,10 +65,7 @@ jobs:
       matrix:
         include:
           # OSs not already tested above
-          - os: ubuntu-18.04
-            artifact_name: capa
-            asset_name: linux
-          - os: ubuntu-20.04
+          - os: ubuntu-22.04
             artifact_name: capa
             asset_name: linux
           - os: windows-2022

View File

@@ -74,6 +74,8 @@ jobs:
           python-version: "3.8"
         - os: ubuntu-20.04
           python-version: "3.9"
+        - os: ubuntu-20.04
+          python-version: "3.10"
     steps:
       - name: Checkout capa with submodules
         uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3.3.0

View File

@@ -503,6 +503,23 @@ class ELF:
             yield read_cstr(strtab, d_val)

+    @property
+    def symtab(self) -> Optional[Tuple[Shdr, Shdr]]:
+        """
+        fetch the Shdr for the symtab and the associated strtab.
+        """
+        SHT_SYMTAB = 0x2
+        for shdr in self.section_headers:
+            if shdr.type != SHT_SYMTAB:
+                continue
+
+            # the linked section contains strings referenced by the symtab structures.
+            strtab_shdr = self.parse_section_header(shdr.link)
+            return shdr, strtab_shdr
+
+        return None
+

 @dataclass
 class ABITag:
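
For orientation, a hypothetical caller of the new property (a sketch against the interfaces in this diff, assuming the module is capa.features.extractors.elf and that ELF takes a file-like object) might look like:

    # hedged sketch, not code from this PR; `buf` is the raw bytes of some ELF file
    import io
    from capa.features.extractors.elf import ELF, SymTab

    elf = ELF(io.BytesIO(buf))  # assumption: ELF accepts a file-like object
    shdrs = elf.symtab
    if shdrs is not None:
        symtab_shdr, strtab_shdr = shdrs
        symtab = SymTab(elf.endian, elf.bitness, symtab_shdr, strtab_shdr)
        for symbol in symtab.get_symbols():
            print(symtab.get_name(symbol))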
@@ -604,40 +621,63 @@ class SHNote:
         return ABITag(os, kmajor, kminor, kpatch)

-class SymTab:
-    def __init__(self, endian: str, bitness: int, symtab_buf: bytes, symtab_entsize: int, symtab_sz: int, strtab_buf: bytes, strtab_sz: int) -> None:
-        self.symbols = []
-        self.symnum = int(symtab_sz / symtab_entsize)
-        self.entsize = symtab_entsize
-        self.strings = strtab_buf
-        self.strings_sz = strtab_sz
-
-        self._parse(endian, bitness, symtab_buf)
+@dataclass
+class Symbol:
+    name_offset: int
+    value: int
+    size: int
+    info: int
+    other: int
+    shndx: int
+
+
+class SymTab:
+    def __init__(
+        self,
+        endian: str,
+        bitness: int,
+        symtab: Shdr,
+        strtab: Shdr,
+    ) -> None:
+        self.symbols: List[Symbol] = []
+
+        self.symtab = symtab
+        self.strtab = strtab
+
+        self._parse(endian, bitness, symtab.buf)

     def _parse(self, endian: str, bitness: int, symtab_buf: bytes) -> None:
         """
-        return the symbol's information in
+        return the symbol's information in
         the order specified by sys/elf32.h
         """
-        for i in range(self.symnum):
+        for i in range(int(len(self.symtab.buf) / self.symtab.entsize)):
             if bitness == 32:
-                name, value, size, info, other, shndx = struct.unpack_from(endian+"IIIBBH", symtab_buf, i*self.entsize)
+                name_offset, value, size, info, other, shndx = struct.unpack_from(
+                    endian + "IIIBBH", symtab_buf, i * self.symtab.entsize
+                )
             elif bitness == 64:
-                name, info, other, shndx, value, size = struct.unpack_from(endian+"IBBHQQ", symtab_buf, i*self.entsize)
+                name_offset, info, other, shndx, value, size = struct.unpack_from(
+                    endian + "IBBHQQ", symtab_buf, i * self.symtab.entsize
+                )

-            self.symbols.append((name, value, size, info, other, shndx))
+            self.symbols.append(Symbol(name_offset, value, size, info, other, shndx))

-    def fetch_str(self, offset) -> str:
+    def get_name(self, symbol: Symbol) -> str:
         """
         fetch a symbol's name from symtab's
         associated strings' section (SHT_STRTAB)
         """
-        for i in range(offset, self.strings_sz):
-            if self.strings[i] == 0:
-                return self.strings[offset:i].decode()
+        if not self.strtab:
+            raise ValueError("no strings found")
+
+        for i in range(symbol.name_offset, self.strtab.size):
+            if self.strtab.buf[i] == 0:
+                return self.strtab.buf[symbol.name_offset : i].decode("utf-8")
+
+        raise ValueError("symbol name not found")

-    def get_symbols(self) -> Iterator[Tuple[int, int, int, int, int, int]]:
+    def get_symbols(self) -> Iterator[Symbol]:
         """
         return a tuple: (name, value, size, info, other, shndx)
         for each symbol contained in the symbol table
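
The two format strings mirror the C struct layouts: Elf32_Sym packs name, value, and size first, while Elf64_Sym moves the 64-bit value and size fields to the end so they stay naturally aligned. A standalone sketch with a synthetic buffer (not capa code):

    import struct

    # Elf32_Sym: st_name(I) st_value(I) st_size(I) st_info(B) st_other(B) st_shndx(H) -> "IIIBBH", 16 bytes
    # Elf64_Sym: st_name(I) st_info(B) st_other(B) st_shndx(H) st_value(Q) st_size(Q) -> "IBBHQQ", 24 bytes
    buf = struct.pack("<IBBHQQ", 1, 0x12, 0, 1, 0x401000, 64)  # synthetic little-endian Elf64_Sym
    name_offset, info, other, shndx, value, size = struct.unpack_from("<IBBHQQ", buf, 0)
    assert (name_offset, value, size) == (1, 0x401000, 64)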
@@ -646,11 +686,11 @@ class SymTab:
             yield symbol

-def guess_os_from_osabi(elf) -> Optional[OS]:
+def guess_os_from_osabi(elf: ELF) -> Optional[OS]:
     return elf.ei_osabi

-def guess_os_from_ph_notes(elf) -> Optional[OS]:
+def guess_os_from_ph_notes(elf: ELF) -> Optional[OS]:
     # search for PT_NOTE sections that specify an OS
     # for example, on Linux there is a GNU section with minimum kernel version
     PT_NOTE = 0x4

@@ -689,7 +729,7 @@ def guess_os_from_ph_notes(elf) -> Optional[OS]:
     return None

-def guess_os_from_sh_notes(elf) -> Optional[OS]:
+def guess_os_from_sh_notes(elf: ELF) -> Optional[OS]:
     # search for notes stored in sections that aren't visible in program headers.
     # e.g. .note.Linux in Linux kernel modules.
     SHT_NOTE = 0x7

@@ -722,7 +762,7 @@ def guess_os_from_sh_notes(elf) -> Optional[OS]:
     return None

-def guess_os_from_linker(elf) -> Optional[OS]:
+def guess_os_from_linker(elf: ELF) -> Optional[OS]:
     # search for recognizable dynamic linkers (interpreters)
     # for example, on linux, we see file paths like: /lib64/ld-linux-x86-64.so.2
     linker = elf.linker

@@ -732,7 +772,7 @@ def guess_os_from_linker(elf) -> Optional[OS]:
     return None

-def guess_os_from_abi_versions_needed(elf) -> Optional[OS]:
+def guess_os_from_abi_versions_needed(elf: ELF) -> Optional[OS]:
     # then lets look for GLIBC symbol versioning requirements.
     # this will let us guess about linux/hurd in some cases.

@@ -763,7 +803,7 @@ def guess_os_from_abi_versions_needed(elf) -> Optional[OS]:
     return None

-def guess_os_from_needed_dependencies(elf) -> Optional[OS]:
+def guess_os_from_needed_dependencies(elf: ELF) -> Optional[OS]:
     for needed in elf.needed:
         if needed.startswith("libmachuser.so"):
             return OS.HURD
@@ -773,38 +813,30 @@ def guess_os_from_needed_dependencies(elf) -> Optional[OS]:
     return None

-def guess_os_from_symtab(elf) -> Optional[OS]:
-    SHT_SYMTAB = 0x2
-    SHT_STRTAB = 0x3
-
-    strtab_buf = symtab_buf = None
-
-    for shdr in elf.section_headers:
-        if shdr.type == SHT_STRTAB:
-            strtab_buf, strtab_sz = shdr.buf, shdr.size
-        elif shdr.type == SHT_SYMTAB:
-            symtab_buf, symtab_entsize, symtab_sz = shdr.buf, shdr.entsize, shdr.size
-
-    if None in (strtab_buf, symtab_buf):
+def guess_os_from_symtab(elf: ELF) -> Optional[OS]:
+    shdrs = elf.symtab
+
+    if not shdrs:
+        # executable does not contain a symbol table
+        # or the symbol's names are stripped
         return None

-    symtab = SymTab(
-        elf.endian, elf.bitness, symtab_buf, symtab_entsize, symtab_sz, strtab_buf, strtab_sz
-    )
+    symtab_shdr, strtab_shdr = shdrs
+    symtab = SymTab(elf.endian, elf.bitness, symtab_shdr, strtab_shdr)

     keywords = {
-        OS.LINUX: ['linux', '/linux/',],
+        OS.LINUX: [
+            "linux",
+            "/linux/",
+        ],
     }

-    for name, *_ in symtab.get_symbols():
-        sym_name = symtab.fetch_str(name)
+    for symbol in symtab.get_symbols():
+        sym_name = symtab.get_name(symbol)

         for os, hints in keywords.items():
             if any(map(lambda x: x in sym_name, hints)):
                 return os

     return None
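
The keyword scan is a plain substring test over each symbol name; a worked example with made-up names:

    # illustration only: hypothetical symbol names
    hints = ["linux", "/linux/"]
    for sym_name in ("__libc_start_main", "linux_version_code"):
        if any(hint in sym_name for hint in hints):
            print("pertinent symbol:", sym_name)  # fires only for the second name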
@@ -832,7 +864,7 @@ def detect_elf_os(f) -> str:
     needed_dependencies_guess = guess_os_from_needed_dependencies(elf)
     logger.debug("guess: needed dependencies: %s", needed_dependencies_guess)

-    symtab_guess = guess_os_from_symtab(elf)
+    symtab_guess = guess_os_from_symtab(elf)
     logger.debug("guess: pertinent symbol name: %s", symtab_guess)

     ret = None
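
Usage sketch for the detection entrypoint, assuming the module is capa.features.extractors.elf (sample name taken from the test fixtures added later in this PR):

    from capa.features.extractors.elf import detect_elf_os

    with open("2bf18d0403677378adad9001b1243211.elf_", "rb") as f:
        print(detect_elf_os(f))  # e.g. "linux"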

View File

@@ -92,7 +92,7 @@ def get_file_imports() -> Dict[int, Tuple[str, str, int]]:
             # IDA uses section names for the library of ELF imports, like ".dynsym".
             # These are not useful to us, we may need to expand this list over time
-            # TODO: exhaust this list, see #1419
+            # TODO: exhaust this list, see #1419
             if library == ".dynsym":
                 library = ""
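
A possible shape for that future expansion, sketched with a set lookup (any contents beyond ".dynsym" would be assumptions, not current behavior):

    # sketch only: ".dynsym" is the one name handled today
    SECTION_PSEUDO_LIBRARIES = {".dynsym"}
    if library in SECTION_PSEUDO_LIBRARIES:
        library = ""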

View File

@@ -1181,46 +1181,62 @@ def main(argv=None):
     if not (args.verbose or args.vverbose or args.json):
         logger.debug("file limitation short circuit, won't analyze fully.")
         return E_FILE_LIMITATION

+    # TODO: #1411 use a real type, not a dict here.
+    meta: Dict[str, Any]
+    capabilities: MatchResults
+    counts: Dict[str, Any]
+
     if format_ == FORMAT_RESULT:
         # result document directly parses into meta, capabilities
         result_doc = capa.render.result_document.ResultDocument.parse_file(args.sample)
         meta, capabilities = result_doc.to_capa()
-    elif format_ == FORMAT_FREEZE:
-        with open(args.sample, "rb") as f:
-            extractor = capa.features.freeze.load(f.read())
-
-    try:
-        if format_ == FORMAT_PE:
-            sig_paths = get_signatures(args.signatures)
-        else:
-            sig_paths = []
-            logger.debug("skipping library code matching: only have native PE signatures")
-    except IOError as e:
-        logger.error("%s", str(e))
-        return E_INVALID_SIG
-
-    should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
-
-    try:
-        extractor = get_extractor(
-            args.sample,
-            format_,
-            args.os,
-            args.backend,
-            sig_paths,
-            should_save_workspace,
-            disable_progress=args.quiet,
-        )
-    except UnsupportedFormatError:
-        log_unsupported_format_error()
-        return E_INVALID_FILE_TYPE
-    except UnsupportedArchError:
-        log_unsupported_arch_error()
-        return E_INVALID_FILE_ARCH
-    except UnsupportedOSError:
-        log_unsupported_os_error()
-        return E_INVALID_FILE_OS
+    else:
+        try:
+            if format_ == FORMAT_PE:
+                sig_paths = get_signatures(args.signatures)
+            else:
+                sig_paths = []
+                logger.debug("skipping library code matching: only have native PE signatures")
+        except IOError as e:
+            logger.error("%s", str(e))
+            return E_INVALID_SIG
+
+        # all other formats we must create an extractor
+        # and use that to extract meta and capabilities
+        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
+
+        if format_ == FORMAT_FREEZE:
+            # freeze format deserializes directly into an extractor
+            with open(args.sample, "rb") as f:
+                extractor = capa.features.freeze.load(f.read())
+        else:
+            # all other formats we must create an extractor,
+            # such as viv, binary ninja, etc. workspaces
+            # and use those for extracting.
+            try:
+                extractor = get_extractor(
+                    args.sample,
+                    format_,
+                    args.os,
+                    args.backend,
+                    sig_paths,
+                    should_save_workspace,
+                    disable_progress=args.quiet,
+                )
+            except UnsupportedFormatError:
+                log_unsupported_format_error()
+                return E_INVALID_FILE_TYPE
+            except UnsupportedArchError:
+                log_unsupported_arch_error()
+                return E_INVALID_FILE_ARCH
+            except UnsupportedOSError:
+                log_unsupported_os_error()
+                return E_INVALID_FILE_OS

     if format_ != FORMAT_RESULT:
         meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
         capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
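
The restructuring here is driven by mypy's definite-assignment analysis: annotate the variable once up front, then assign it on every branch, so later uses type-check. The idiom in isolation (a sketch, not capa code):

    from typing import Any, Dict

    def load(use_cache: bool) -> Dict[str, Any]:
        meta: Dict[str, Any]  # declared, not yet assigned
        if use_cache:
            meta = {"source": "cache"}
        else:
            meta = {"source": "analysis"}
        return meta  # mypy accepts: assigned on every path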

View File

@@ -6,6 +6,7 @@
 # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and limitations under the License.
 import datetime
+import collections
 from typing import Any, Dict, List, Tuple, Union, Optional

 from pydantic import Field, BaseModel
@@ -262,6 +263,54 @@ def node_from_capa(node: Union[capa.engine.Statement, capa.engine.Feature]) -> N
     assert_never(node)

+def node_to_capa(
+    node: Node, children: List[Union[capa.engine.Statement, capa.engine.Feature]]
+) -> Union[capa.engine.Statement, capa.engine.Feature]:
+    if isinstance(node, StatementNode):
+        if isinstance(node.statement, CompoundStatement):
+            if node.statement.type == CompoundStatementType.AND:
+                return capa.engine.And(description=node.statement.description, children=children)
+            elif node.statement.type == CompoundStatementType.OR:
+                return capa.engine.Or(description=node.statement.description, children=children)
+            elif node.statement.type == CompoundStatementType.NOT:
+                return capa.engine.Not(description=node.statement.description, child=children[0])
+            elif node.statement.type == CompoundStatementType.OPTIONAL:
+                return capa.engine.Some(description=node.statement.description, count=0, children=children)
+            else:
+                assert_never(node.statement.type)
+        elif isinstance(node.statement, SomeStatement):
+            return capa.engine.Some(
+                description=node.statement.description, count=node.statement.count, children=children
+            )
+        elif isinstance(node.statement, RangeStatement):
+            return capa.engine.Range(
+                description=node.statement.description,
+                min=node.statement.min,
+                max=node.statement.max,
+                child=node.statement.child.to_capa(),
+            )
+        elif isinstance(node.statement, SubscopeStatement):
+            return capa.engine.Subscope(
+                description=node.statement.description, scope=node.statement.scope, child=children[0]
+            )
+        else:
+            assert_never(node.statement)
+    elif isinstance(node, FeatureNode):
+        return node.feature.to_capa()
+    else:
+        assert_never(node)
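
node_to_capa leans on the assert_never idiom so that mypy flags any Node or statement variant the if/elif chain misses. The standard shape of that helper (capa imports its own; this is a sketch):

    from typing import NoReturn

    def assert_never(value: NoReturn) -> NoReturn:
        # when every variant is handled above, mypy narrows `value` to NoReturn;
        # a forgotten case surfaces as a type error at the call site.
        raise AssertionError(f"unhandled value: {value!r}")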
 class Match(FrozenModel):
     """
     args:

@@ -394,6 +443,39 @@ class Match(FrozenModel):
             captures={capture: tuple(captures[capture]) for capture in captures},
         )

+    def to_capa(self, rules_by_name: Dict[str, capa.rules.Rule]) -> capa.engine.Result:
+        children = [child.to_capa(rules_by_name) for child in self.children]
+        statement = node_to_capa(self.node, [child.statement for child in children])
+
+        if isinstance(self.node, FeatureNode):
+            feature = self.node.feature
+            if isinstance(feature, (frzf.SubstringFeature, frzf.RegexFeature)):
+                matches = {capture: {loc.to_capa() for loc in locs} for capture, locs in self.captures.items()}
+                if isinstance(feature, frzf.SubstringFeature):
+                    assert isinstance(statement, capa.features.common.Substring)
+                    statement = capa.features.common._MatchedSubstring(statement, matches)
+                elif isinstance(feature, frzf.RegexFeature):
+                    assert isinstance(statement, capa.features.common.Regex)
+                    statement = capa.features.common._MatchedRegex(statement, matches)
+                else:
+                    assert_never(feature)
+
+        # apparently we don't have to fixup match and subscope entries here.
+        # at least, default, verbose, and vverbose renderers seem to work well without any special handling here.
+        #
+        # children contains a single tree of results, corresponding to the logic of the matched rule.
+        # self.node.feature.match contains the name of the rule that was matched.
+        # so it's all available to reconstruct, if necessary.
+
+        return capa.features.common.Result(
+            success=self.success,
+            statement=statement,
+            locations={loc.to_capa() for loc in self.locations},
+            children=children,
+        )
+

 def parse_parts_id(s: str):
     id_ = ""
@@ -581,35 +663,19 @@ class ResultDocument(FrozenModel):
     def to_capa(self) -> Tuple[Dict, Dict]:
         meta = self.meta.to_capa()

-        capabilities: Dict[str, List[Tuple[frz.Address, capa.features.common.Result]]] = {}
+        capabilities: Dict[
+            str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]
+        ] = collections.defaultdict(list)
+
+        # this doesn't quite work because we don't have the rule source for rules that aren't matched.
+        rules_by_name = {
+            rule_name: capa.rules.Rule.from_yaml(rule_match.source) for rule_name, rule_match in self.rules.items()
+        }

         for rule_name, rule_match in self.rules.items():
-            # Parse the YAML source into a Rule instance
-            rule = capa.rules.Rule.from_yaml(rule_match.source)
-
-            # Extract the capabilities from the RuleMatches object
             for addr, match in rule_match.matches:
-                if isinstance(match.node, StatementNode):
-                    if isinstance(match.node.statement, CompoundStatement):
-                        statement = rule.statement
-                    else:
-                        statement = statement_from_capa(match.node.statement)
-                elif isinstance(match.node, FeatureNode):
-                    statement = match.node.feature.to_capa()
-                    if isinstance(statement, (capa.features.common.String, capa.features.common.Regex)):
-                        statement.matches = match.captures
-                else:
-                    raise ValueError("Invalid node type")
-
-                result = capa.features.common.Result(
-                    statement=statement,
-                    success=match.success,
-                    locations=[frz.Address.to_capa(loc) for loc in match.locations],
-                    children=[],
-                )
-
-                if rule_name not in capabilities:
-                    capabilities[rule_name] = []
-                capabilities[rule_name].append((frz.Address.from_capa(addr), result))
+                result: capa.engine.Result = match.to_capa(rules_by_name)
+
+                capabilities[rule_name].append((addr.to_capa(), result))

         return meta, capabilities
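
Together, Match.to_capa and ResultDocument.to_capa rehydrate a saved report into the same shapes that live analysis produces, which is what the FORMAT_RESULT branch in main() relies on. A hypothetical round trip (file name assumed):

    import capa.render.result_document as rdoc

    rd = rdoc.ResultDocument.parse_file("report.json")  # hypothetical saved report
    meta, capabilities = rd.to_capa()
    for rule_name, matches in capabilities.items():
        print(rule_name, len(matches))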

View File

@@ -309,6 +309,8 @@ def get_data_path_by_name(name):
         return os.path.join(CD, "data", "bf7a9c8bdfa6d47e01ad2b056264acc3fd90cf43fe0ed8deec93ab46b47d76cb.elf_")
     elif name.startswith("294b8d"):
         return os.path.join(CD, "data", "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_")
+    elif name.startswith("2bf18d"):
+        return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_")
     else:
         raise ValueError(f"unexpected sample fixture: {name}")
@@ -367,6 +369,8 @@ def get_sample_md5_by_name(name):
     elif name.startswith("294b8d"):
         # file name is SHA256 hash
         return "3db3e55b16a7b1b1afb970d5e77c5d98"
+    elif name.startswith("2bf18d"):
+        return "2bf18d0403677378adad9001b1243211"
     else:
         raise ValueError(f"unexpected sample fixture: {name}")

View File

@@ -467,3 +467,12 @@ def test_main_dotnet4(_039a6_dotnetfile_extractor):
     # tests successful execution and one rendering
     path = _039a6_dotnetfile_extractor.path
     assert capa.main.main([path, "-vv"]) == 0
+
+
+def test_main_rd():
+    path = fixtures.get_data_path_by_name("pma01-01-rd")
+    assert capa.main.main([path, "-vv"]) == 0
+    assert capa.main.main([path, "-v"]) == 0
+    assert capa.main.main([path, "-j"]) == 0
+    assert capa.main.main([path, "-q"]) == 0
+    assert capa.main.main([path]) == 0

View File

@@ -278,5 +278,9 @@ def test_json_to_rdoc():
 def test_rdoc_to_capa():
     path = fixtures.get_data_path_by_name("pma01-01-rd")
-    assert len(rdoc.ResultDocument.parse_file(path).to_capa()) == 2
-    assert isinstance(rdoc.ResultDocument.parse_file(path).to_capa(), tuple)
+    rd = rdoc.ResultDocument.parse_file(path)
+    meta, capabilities = rd.to_capa()
+    assert isinstance(meta, dict)
+    assert isinstance(capabilities, dict)