mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
unpack: aspack: fixup PE imports after dumping
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
import io
|
||||
import struct
|
||||
import logging
|
||||
import contextlib
|
||||
import collections
|
||||
|
||||
import pefile
|
||||
import speakeasy
|
||||
@@ -181,10 +184,165 @@ class AspackUnpacker(speakeasy.Speakeasy):
|
||||
return buf, oep
|
||||
|
||||
def fixup(self, buf, oep):
    """
    Repair a PE image dumped from emulator memory so it parses as a regular PE file.

    Three things are fixed up:
      1. the buffer is zero-padded so that it covers the last section,
         because the trailing `.adata` section may be missing from the dump.
      2. the entry point is set to the recovered OEP and every section's raw
         pointer/size is set to its virtual address/size, because the dump is
         laid out the way the image was mapped in memory.
      3. a new import directory is written into `.adata`, built from the
         thunk table found at the start of the first section.

    Args:
      buf: bytes-like dump of the module's memory.
      oep: virtual address of the original entry point.

    Returns:
      the repaired image data, as returned by `pe.write()`.

    Raises:
      ValueError: if the image has no `.adata` section, or if no import
        thunks are found at the start of the first section.
    """
    # it seems the .adata section (last section) may not be present.
    # we need this to be around because we're going to place the import table here.
    # so pad this out with NULL bytes.
    header_pe = pefile.PE(data=buf, fast_load=True)
    last_section = header_pe.sections[-1]
    expected_size = last_section.VirtualAddress + last_section.Misc_VirtualSize
    if len(buf) < expected_size:
        buf += b"\x00" * (expected_size - len(buf))

    # re-parse, now that the buffer is guaranteed to cover every section.
    pe = pefile.PE(data=buf)
    pe.OPTIONAL_HEADER.AddressOfEntryPoint = oep - self.module.base

    # since we're just pulling a big chunk from memory,
    # update the sections to point to their virtual layout.
    for section in pe.sections:
        section.PointerToRawData = section.VirtualAddress
        section.SizeOfRawData = section.Misc_VirtualSize

    # mapping from virtual address to (dll name, symbol name).
    # the virtual address is generated by speakeasy and is not mapped.
    # it often looks something like 0xfeedf008.
    # as we encounter pointers with values like this, we can resolve the symbol.
    imports = {}
    for addr, (dll, sym) in self.module.import_table.items():
        # these are items in the original import table.
        logger.debug("found static import %s.%s", dll, sym)
        imports[addr] = (dll, sym)

    for addr, dll, sym in self.emu.dyn_imps:
        # these are imports that have been resolved at runtime by the unpacking stub.
        logger.debug("found dynamic import %s.%s", dll, sym)
        imports[addr] = (dll, sym)

    # find the existing thunk tables.
    # these are pointer-aligned tables of import pointers.
    #
    # ordered list of tuples (VA, import pointer).
    # look up the symbol using the import pointer and the `imports` mapping.
    thunks = []
    ptr_size = self.emu.get_ptr_size()

    # aspack puts the import table at the start of the first section?
    # or maybe its just the sample i'm looking at.
    scan_start = pe.sections[0].VirtualAddress + self.module.base
    for va in range(scan_start, 0xFFFFFFFFFFFFFFFF, ptr_size):
        ptr = self.read_ptr(va)
        if ptr == 0:
            # NULL slots are skipped rather than treated as the end of the scan.
            continue

        if ptr not in imports:
            # first non-NULL value that isn't a known import pointer: stop scanning.
            break

        dll, sym = imports[ptr]
        thunks.append((va, ptr))
        logger.debug("found import thunk at %08x to %08x for %s\t%s", va, ptr, dll, sym)

    # mapping from dll name to sorted list of symbols
    symbol_sets = collections.defaultdict(set)
    for dll, sym in imports.values():
        symbol_sets[dll].add(sym)
    symbols = {dll: sorted(syms) for dll, syms in symbol_sets.items()}
    # sorted list of dll names
    dlls = sorted(symbols)

    adata_rva = 0x0
    for section in pe.sections:
        try:
            section_name = section.Name.partition(b"\x00")[0].decode("ascii")
        except UnicodeDecodeError:
            # a section with a non-ASCII name cannot be .adata
            continue

        if section_name == ".adata":
            adata_rva = section.VirtualAddress
            break

    # an explicit check (not `assert`) so it still fires under `python -O`;
    # otherwise the import table would be written at RVA 0.
    if adata_rva == 0x0:
        raise ValueError("failed to find .adata section to hold the rebuilt import table")

    # assume .adata is big enough
    reconstruction_target = adata_rva

    # mapping from the data identifier to its RVA (and found within the reconstruction blob)
    locations = {}
    reconstruction = io.BytesIO()

    # emit strings into the reconstruction blob
    for dll in dlls:
        locations[("dll", dll)] = reconstruction_target + reconstruction.tell()
        reconstruction.write(dll.encode("ascii") + b"\x00")

        for sym in symbols[dll]:
            locations[("hint", dll, sym)] = reconstruction_target + reconstruction.tell()
            # hint == 0
            reconstruction.write(b"\x00\x00")
            # name
            reconstruction.write(sym.encode("ascii") + b"\x00")
            if reconstruction.tell() % 2 == 1:
                # padding, so the next entry starts on an even offset
                reconstruction.write(b"\x00")

    # list of lists of thunk tuples (VA, import pointer, dll name, symbol name).
    # each inner list is a run of consecutive thunks that share a dll name,
    # and becomes one entry in the import directory.
    idt_entries = []
    for va, imp in thunks:
        dll, sym = imports[imp]
        if idt_entries and idt_entries[-1][0][2] == dll:
            idt_entries[-1].append((va, imp, dll, sym))
        else:
            idt_entries.append([(va, imp, dll, sym)])

    if not idt_entries:
        # without this check an empty group would be indexed below,
        # failing with an opaque IndexError.
        raise ValueError("failed to find any import thunks at the start of the first section")

    # emit name tables for each IDT/dll
    ptr_format = "<I" if ptr_size == 4 else "<Q"
    for i, idt_entry in enumerate(idt_entries):
        logger.debug("import lookup table %d: %s with %d entries", i, idt_entry[0][2], len(idt_entry))

        locations[("import lookup table", i)] = reconstruction_target + reconstruction.tell()
        for _, _, dll, sym in idt_entry:
            reconstruction.write(struct.pack(ptr_format, locations[("hint", dll, sym)]))
        # NULL terminator; eight bytes covers both 4- and 8-byte pointers.
        reconstruction.write(b"\x00" * 8)

    # emit IDTs
    idt_offset = reconstruction.tell()
    for i, idt_entry in enumerate(idt_entries):
        va, _, dll, _ = idt_entry[0]
        rva = va - self.module.get_base()
        locations[("idt", i)] = reconstruction_target + reconstruction.tell()

        # import lookup table rva
        reconstruction.write(struct.pack("<I", locations[("import lookup table", i)]))
        # date stamp
        reconstruction.write(struct.pack("<I", 0x0))
        # forwarder chain
        reconstruction.write(struct.pack("<I", 0x0))
        # name rva
        reconstruction.write(struct.pack("<I", locations[("dll", dll)]))
        # import address table rva
        reconstruction.write(struct.pack("<I", rva))

    # all-zero descriptor terminates the import directory
    reconstruction.write(b"\x00\x00\x00\x00" * 5)

    # size of the directory exactly as emitted above:
    # five 32-bit fields (20 bytes) per descriptor, plus the terminator.
    idt_size = reconstruction.tell() - idt_offset

    # TODO assert size is ok
    # and/or extend .adata

    pe.set_bytes_at_rva(reconstruction_target, reconstruction.getvalue())
    # data directory index 1 is the import directory
    pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].VirtualAddress = locations[("idt", 0)]
    pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].Size = idt_size

    return pe.write()
|
||||
|
||||
def read_ptr(self, va):
    """
    Read one pointer-sized, little-endian unsigned integer from emulator memory.

    Args:
      va: the address to read from, passed straight through to `self.mem_read`.

    Returns:
      int: the value decoded from the `self.emu.get_ptr_size()` bytes read at `va`.
    """
    width = self.emu.get_ptr_size()
    raw = self.mem_read(va, width)
    return int.from_bytes(raw, "little")
|
||||
|
||||
def unpack(self):
|
||||
buf, oep = self.dump()
|
||||
buf = self.fixup(buf, oep)
|
||||
|
||||
@@ -14,7 +14,7 @@ from fixtures import *
|
||||
import capa.unpack
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 0), reason="auto-unpack only works on py3")
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
|
||||
def test_aspack_is_packed(aspack_extractor):
|
||||
path = aspack_extractor.path
|
||||
|
||||
@@ -24,7 +24,7 @@ def test_aspack_is_packed(aspack_extractor):
|
||||
assert capa.unpack.is_packed(buf) is True
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 0), reason="auto-unpack only works on py3")
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
|
||||
def test_aspack_detect(aspack_extractor):
|
||||
path = aspack_extractor.path
|
||||
|
||||
@@ -34,7 +34,7 @@ def test_aspack_detect(aspack_extractor):
|
||||
assert capa.unpack.detect_packer(buf) == capa.unpack.ASPACK
|
||||
|
||||
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 0), reason="auto-unpack only works on py3")
|
||||
@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
|
||||
def test_aspack_unpack(aspack_extractor):
|
||||
path = aspack_extractor.path
|
||||
|
||||
|
||||
Reference in New Issue
Block a user