From c5a9aa21bff03583723840800f3bbaa560db2d48 Mon Sep 17 00:00:00 2001 From: William Ballenthin Date: Thu, 8 Dec 2022 21:33:57 +0100 Subject: [PATCH 01/10] wip: elf: better detect linux ELF files --- capa/features/extractors/elf.py | 249 ++++++++++++++++++++++++++++++-- 1 file changed, 235 insertions(+), 14 deletions(-) diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index 9f4f9b34..3f516f27 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -7,6 +7,7 @@ # See the License for the specific language governing permissions and limitations under the License. import struct import logging +import collections from enum import Enum from typing import BinaryIO @@ -21,6 +22,12 @@ def align(v, alignment): return v + (alignment - remainder) +def read_cstr(buf, offset): + s = buf[offset:] + s, _, _ = s.partition(b"\x00") + return s.decode("utf-8") + + class CorruptElfFile(ValueError): pass @@ -141,6 +148,112 @@ def detect_elf_os(f) -> str: # subsequent strategies may overwrite this value ret = OSABI[ei_osabi] + (e_machine,) = struct.unpack_from(endian + "H", file_header, 0x12) + MACHINE = { + 0: "None", + 1: "M32", + 2: "SPARC", + 3: "386", + 4: "68K", + 5: "88K", + 6: "486", + 7: "860", + 8: "MIPS", + 9: "S370", + 10: "MIPS_RS3_LE", + 11: "RS6000", + 12: "UNKNOWN12", + 13: "UNKNOWN13", + 14: "UNKNOWN14", + 15: "PA_RISC", + 16: "nCUBE", + 17: "VPP500", + 18: "SPARC32PLUS", + 19: "960", + 20: "PPC", + 21: "PPC64", + 22: "S390", + 23: "SPU", + 24: "UNKNOWN24", + 25: "UNKNOWN25", + 26: "UNKNOWN26", + 27: "UNKNOWN27", + 28: "UNKNOWN28", + 29: "UNKNOWN29", + 30: "UNKNOWN30", + 31: "UNKNOWN31", + 32: "UNKNOWN32", + 33: "UNKNOWN33", + 34: "UNKNOWN34", + 35: "UNKNOWN35", + 36: "V800", + 37: "FR20", + 38: "RH32", + 39: "RCE", + 40: "ARM", + 41: "ALPHA", + 42: "SH", + 43: "SPARCV9", + 44: "TRICORE", + 45: "ARC", + 46: "H8_300", + 47: "H8_300H", + 48: "H8S", + 49: "H8_500", + 50: "IA_64", + 51: "MIPS_X", + 52: "COLDFIRE", + 53: "68HC12", + 54: "MMA", + 55: "PCP", + 56: "NCPU", + 57: "NDR1", + 58: "STARCORE", + 59: "ME16", + 60: "ST100", + 61: "TINYJ", + 62: "X86_64", + 63: "PDSP", + 64: "PDP10", + 65: "PDP11", + 66: "FX66", + 67: "ST9PLUS", + 68: "ST7", + 69: "68HC16", + 70: "68HC11", + 71: "68HC08", + 72: "68HC05", + 73: "SVX", + 74: "ST19", + 75: "VAX", + 76: "CRIS", + 77: "JAVELIN", + 78: "FIREPATH", + 79: "ZSP", + 80: "MMIX", + 81: "HUANY", + 82: "PRISM", + 83: "AVR", + 84: "FR30", + 85: "D10V", + 86: "D30V", + 87: "V850", + 88: "M32R", + 89: "MN10300", + 90: "MN10200", + 91: "PJ", + 92: "OPENRISC", + 93: "ARC_A5", + 94: "XTENSA", + 95: "VIDEOCORE", + 96: "TMM_GPP", + 97: "NS32K", + 98: "TPC", + 99: "SNP1K", + 100: "ST200", + } + logger.debug("emachine: 0x%02x (%s)", e_machine, MACHINE.get(e_machine, "unknown")) + f.seek(e_phoff) program_header_size = e_phnum * e_phentsize program_headers = f.read(program_header_size) @@ -171,18 +284,18 @@ def detect_elf_os(f) -> str: logger.debug("ph:p_offset: 0x%02x p_filesz: 0x%04x", p_offset, p_filesz) f.seek(p_offset) - note = f.read(p_filesz) - if len(note) != p_filesz: + version_r = f.read(p_filesz) + if len(version_r) != p_filesz: logger.warning("failed to read note content") continue - namesz, descsz, type_ = struct.unpack_from(endian + "III", note, 0x0) + namesz, descsz, type_ = struct.unpack_from(endian + "III", version_r, 0x0) name_offset = 0xC desc_offset = name_offset + align(namesz, 0x4) logger.debug("ph:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, descsz, type_) - name = note[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") + name = version_r[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") logger.debug("name: %s", name) if type_ != 1: @@ -192,7 +305,7 @@ def detect_elf_os(f) -> str: if descsz < 16: continue - desc = note[desc_offset : desc_offset + descsz] + desc = version_r[desc_offset : desc_offset + descsz] abi_tag, kmajor, kminor, kpatch = struct.unpack_from(endian + "IIII", desc, 0x0) logger.debug("GNU_ABI_TAG: 0x%02x", abi_tag) @@ -213,6 +326,7 @@ def detect_elf_os(f) -> str: # search for recognizable dynamic linkers (interpreters) # for example, on linux, we see file paths like: /lib64/ld-linux-x86-64.so.2 + linker = None for i in range(e_phnum): offset = i * e_phentsize phent = program_headers[offset : offset + e_phentsize] @@ -257,9 +371,9 @@ def detect_elf_os(f) -> str: shent = section_headers[offset : offset + e_shentsize] if bitness == 32: - sh_name, sh_type, _, sh_addr, sh_offset, sh_size = struct.unpack_from(endian + "IIIIII", shent, 0x0) + sh_name, sh_type, _, sh_addr, linked_sh_offset, linked_sh_size = struct.unpack_from(endian + "IIIIII", shent, 0x0) elif bitness == 64: - sh_name, sh_type, _, sh_addr, sh_offset, sh_size = struct.unpack_from(endian + "IIQQQQ", shent, 0x0) + sh_name, sh_type, _, sh_addr, linked_sh_offset, linked_sh_size = struct.unpack_from(endian + "IIQQQQ", shent, 0x0) else: raise NotImplementedError() @@ -267,21 +381,21 @@ def detect_elf_os(f) -> str: if sh_type != SHT_NOTE: continue - logger.debug("sh:sh_offset: 0x%02x sh_size: 0x%04x", sh_offset, sh_size) + logger.debug("sh:sh_offset: 0x%02x sh_size: 0x%04x", linked_sh_offset, linked_sh_size) - f.seek(sh_offset) - note = f.read(sh_size) - if len(note) != sh_size: + f.seek(linked_sh_offset) + version_r = f.read(linked_sh_size) + if len(version_r) != linked_sh_size: logger.warning("failed to read note content") continue - namesz, descsz, type_ = struct.unpack_from(endian + "III", note, 0x0) + namesz, descsz, type_ = struct.unpack_from(endian + "III", version_r, 0x0) name_offset = 0xC desc_offset = name_offset + align(namesz, 0x4) logger.debug("sh:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, descsz, type_) - name = note[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") + name = version_r[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") logger.debug("name: %s", name) if name == "Linux": @@ -300,7 +414,7 @@ def detect_elf_os(f) -> str: if descsz < 16: continue - desc = note[desc_offset : desc_offset + descsz] + desc = version_r[desc_offset : desc_offset + descsz] abi_tag, kmajor, kminor, kpatch = struct.unpack_from(endian + "IIII", desc, 0x0) logger.debug("GNU_ABI_TAG: 0x%02x", abi_tag) @@ -310,6 +424,113 @@ def detect_elf_os(f) -> str: ret = GNU_ABI_TAG[abi_tag] if not ret else ret logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", ret, kmajor, kminor, kpatch) + if not ret: + # if we don't have any guesses yet, + # then lets look for GLIBC symbol versioning requirements. + # this will let us guess about linux/hurd in some cases. + # + # symbol version requirements are stored in the .gnu.version_r section, + # which has type SHT_GNU_verneed (0x6ffffffe). + # + # this contains a linked list of ElfXX_Verneed structs, + # each referencing a linked list of ElfXX_Vernaux structs. + # strings are stored in the section referenced by the sh_link field of the section header. + # each Verneed struct contains a reference to the name of the library, + # each Vernaux struct contains a reference to the name of a symbol. + for i in range(e_shnum): + offset = i * e_shentsize + shent = section_headers[offset : offset + e_shentsize] + + if bitness == 32: + sh_name, sh_type, _, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(endian + "IIIIIII", shent, 0x0) + elif bitness == 64: + sh_name, sh_type, _, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(endian + "IIQQQQI", shent, 0x0) + else: + raise NotImplementedError() + + SHT_GNU_VERNEED = 0x6ffffffe + if sh_type != SHT_GNU_VERNEED: + continue + + logger.debug("sh:sh_offset: 0x%02x sh_size: 0x%04x", sh_offset, sh_size) + + # read the section containing the verneed structures + f.seek(sh_offset) + version_r = f.read(sh_size) + if len(version_r) != sh_size: + logger.warning("failed to read .gnu.version_r content") + continue + + # read the linked section content + # which contains strings referenced by the verneed structures + linked_shent_offset = sh_link * e_shentsize + linked_shent = section_headers[linked_shent_offset : linked_shent_offset + e_shentsize] + + if bitness == 32: + _, _, _, _, linked_sh_offset, linked_sh_size = struct.unpack_from(endian + "IIIIII", linked_shent, 0x0) + elif bitness == 64: + _, _, _, _, linked_sh_offset, linked_sh_size = struct.unpack_from(endian + "IIQQQQ", linked_shent, 0x0) + else: + raise NotImplementedError() + + f.seek(linked_sh_offset) + linked_sh = f.read(linked_sh_size) + if len(linked_sh) != linked_sh_size: + logger.warning("failed to read linked content") + continue + + so_abis = collections.defaultdict(set) + + # read verneed structures from the start of the section + # until the vn_next link is 0x0. + # each entry describes a shared object that is required by this binary. + vn_offset = 0x0 + while True: + # ElfXX_Verneed layout is the same on 32 and 64 bit + vn_version, vn_cnt, vn_file, vn_aux, vn_next = struct.unpack_from(endian + "HHIII", version_r, vn_offset) + if vn_version != 1: + # unexpected format, don't try to keep parsing + break + + # shared object names, like: "libdl.so.2" + so_name = read_cstr(linked_sh, vn_file) + + # read vernaux structures linked from the verneed structure. + # there should be vn_cnt of these. + # each entry describes an ABI name required by the shared object. + vna_offset = vn_offset + vn_aux + for i in range(vn_cnt): + # ElfXX_Vernaux layout is the same on 32 and 64 bit + _, _, _, vna_name, vna_next = struct.unpack_from(endian + "IHHII", version_r, vna_offset) + + # ABI names, like: "GLIBC_2.2.5" + abi = read_cstr(linked_sh, vna_name) + so_abis[so_name].add(abi) + + vna_offset += vna_next + + vn_offset += vn_next + if vn_next == 0: + break + + has_glibc_verneed = False + for so_name, abis in so_abis.items(): + for abi in abis: + if abi.startswith("GLIBC"): + has_glibc_verneed = True + + if has_glibc_verneed: + if MACHINE.get(e_machine) != "386": + ret = OS.LINUX + + # TODO: check dynamic sections for libmachuser and libhurduser + + if linker and "ld-linux" in linker: + ret = OS.LINUX + + if linker and "/ld.so" in linker: + ret = OS.HURD + return ret.value if ret is not None else "unknown" From 958d5bcc6a50ad3d92da63518ab635ce511aa231 Mon Sep 17 00:00:00 2001 From: William Ballenthin Date: Fri, 9 Dec 2022 12:56:09 +0100 Subject: [PATCH 02/10] elf: refactor OS detection --- capa/features/extractors/elf.py | 716 ++++++++++++++++++-------------- 1 file changed, 405 insertions(+), 311 deletions(-) diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index 3f516f27..b3b86135 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -7,9 +7,11 @@ # See the License for the specific language governing permissions and limitations under the License. import struct import logging +import itertools import collections from enum import Enum -from typing import BinaryIO +from dataclasses import dataclass +from typing import BinaryIO, Optional, Dict, Set logger = logging.getLogger(__name__) @@ -67,52 +69,94 @@ GNU_ABI_TAG = { } -def detect_elf_os(f) -> str: - """ - f: type Union[BinaryIO, IDAIO] - """ - f.seek(0x0) - file_header = f.read(0x40) +@dataclass +class Phdr: + type: int + offset: int + vaddr: int + paddr: int + filesz: int + buf: bytes - # we'll set this to the detected OS - # prefer the first heuristics, - # but rather than short circuiting, - # we'll still parse out the remainder, for debugging. - ret = None - if not file_header.startswith(b"\x7fELF"): - raise CorruptElfFile("missing magic header") +@dataclass +class Shdr: + name: int + type: int + flags: int + addr: int + offset: int + size: int + link: int + buf: bytes - ei_class, ei_data = struct.unpack_from("BB", file_header, 4) - logger.debug("ei_class: 0x%02x ei_data: 0x%02x", ei_class, ei_data) - if ei_class == 1: - bitness = 32 - elif ei_class == 2: - bitness = 64 - else: - raise CorruptElfFile("invalid ei_class: 0x%02x" % ei_class) - if ei_data == 1: - endian = "<" - elif ei_data == 2: - endian = ">" - else: - raise CorruptElfFile("not an ELF file: invalid ei_data: 0x%02x" % ei_data) +class ELF: + def __init__(self, f): + self.f = f - if bitness == 32: - (e_phoff, e_shoff) = struct.unpack_from(endian + "II", file_header, 0x1C) - e_phentsize, e_phnum = struct.unpack_from(endian + "HH", file_header, 0x2A) - e_shentsize, e_shnum = struct.unpack_from(endian + "HH", file_header, 0x2E) - elif bitness == 64: - (e_phoff, e_shoff) = struct.unpack_from(endian + "QQ", file_header, 0x20) - e_phentsize, e_phnum = struct.unpack_from(endian + "HH", file_header, 0x36) - e_shentsize, e_shnum = struct.unpack_from(endian + "HH", file_header, 0x3A) - else: - raise NotImplementedError() + self.bitness: int = None + self.endian: str = None + self.e_phentsize: int = None + self.e_phnum: int = None + self.e_shentsize: int = None + self.e_shnum: int = None + self.phbuf = None + self.shbuf = None - logger.debug("e_phoff: 0x%02x e_phentsize: 0x%02x e_phnum: %d", e_phoff, e_phentsize, e_phnum) + self._parse() + + def _parse(self): + + self.f.seek(0x0) + self.file_header = self.f.read(0x40) + + if not self.file_header.startswith(b"\x7fELF"): + raise CorruptElfFile("missing magic header") + + ei_class, ei_data = struct.unpack_from("BB", self.file_header, 4) + logger.debug("ei_class: 0x%02x ei_data: 0x%02x", ei_class, ei_data) + if ei_class == 1: + self.bitness = 32 + elif ei_class == 2: + self.bitness = 64 + else: + raise CorruptElfFile("invalid ei_class: 0x%02x" % ei_class) + + if ei_data == 1: + self.endian = "<" + elif ei_data == 2: + self.endian = ">" + else: + raise CorruptElfFile("not an ELF file: invalid ei_data: 0x%02x" % ei_data) + + if self.bitness == 32: + e_phoff, e_shoff = struct.unpack_from(self.endian + "II", self.file_header, 0x1C) + self.e_phentsize, self.e_phnum = struct.unpack_from(self.endian + "HH", self.file_header, 0x2A) + self.e_shentsize, self.e_shnum = struct.unpack_from(self.endian + "HH", self.file_header, 0x2E) + elif self.bitness == 64: + e_phoff, e_shoff = struct.unpack_from(self.endian + "QQ", self.file_header, 0x20) + self.e_phentsize, self.e_phnum = struct.unpack_from(self.endian + "HH", self.file_header, 0x36) + self.e_shentsize, self.e_shnum = struct.unpack_from(self.endian + "HH", self.file_header, 0x3A) + else: + raise NotImplementedError() + + logger.debug("e_phoff: 0x%02x e_phentsize: 0x%02x e_phnum: %d", e_phoff, self.e_phentsize, self.e_phnum) + + self.f.seek(e_phoff) + program_header_size = self.e_phnum * self.e_phentsize + self.phbuf = self.f.read(program_header_size) + if len(self.phbuf) != program_header_size: + logger.warning("failed to read program headers") + self.e_phnum = 0 + + self.f.seek(e_shoff) + section_header_size = self.e_shnum * self.e_shentsize + self.shbuf = self.f.read(section_header_size) + if len(self.shbuf) != section_header_size: + logger.warning("failed to read section headers") + self.e_shnum = 0 - (ei_osabi,) = struct.unpack_from(endian + "B", file_header, 7) OSABI = { # via pyelftools: https://github.com/eliben/pyelftools/blob/0664de05ed2db3d39041e2d51d19622a8ef4fb0f/elftools/elf/enums.py#L35-L58 # some candidates are commented out because the are not useful values, @@ -140,17 +184,14 @@ def detect_elf_os(f) -> str: # 97: "ARM", # not an OS # 255: "STANDALONE", # not an OS } - logger.debug("ei_osabi: 0x%02x (%s)", ei_osabi, OSABI.get(ei_osabi, "unknown")) - # os_osabi == 0 is commonly set even when the OS is not SYSV. - # other values are unused or unknown. - if ei_osabi in OSABI and ei_osabi != 0x0: - # subsequent strategies may overwrite this value - ret = OSABI[ei_osabi] + @property + def ei_osabi(self) -> Optional[OS]: + (ei_osabi,) = struct.unpack_from(self.endian + "B", self.file_header, 7) + return ELF.OSABI.get(ei_osabi) - (e_machine,) = struct.unpack_from(endian + "H", file_header, 0x12) MACHINE = { - 0: "None", + # via https://refspecs.linuxfoundation.org/elf/gabi4+/ch4.eheader.html 1: "M32", 2: "SPARC", 3: "386", @@ -162,9 +203,6 @@ def detect_elf_os(f) -> str: 9: "S370", 10: "MIPS_RS3_LE", 11: "RS6000", - 12: "UNKNOWN12", - 13: "UNKNOWN13", - 14: "UNKNOWN14", 15: "PA_RISC", 16: "nCUBE", 17: "VPP500", @@ -174,18 +212,6 @@ def detect_elf_os(f) -> str: 21: "PPC64", 22: "S390", 23: "SPU", - 24: "UNKNOWN24", - 25: "UNKNOWN25", - 26: "UNKNOWN26", - 27: "UNKNOWN27", - 28: "UNKNOWN28", - 29: "UNKNOWN29", - 30: "UNKNOWN30", - 31: "UNKNOWN31", - 32: "UNKNOWN32", - 33: "UNKNOWN33", - 34: "UNKNOWN34", - 35: "UNKNOWN35", 36: "V800", 37: "FR20", 38: "RH32", @@ -252,183 +278,82 @@ def detect_elf_os(f) -> str: 99: "SNP1K", 100: "ST200", } - logger.debug("emachine: 0x%02x (%s)", e_machine, MACHINE.get(e_machine, "unknown")) - - f.seek(e_phoff) - program_header_size = e_phnum * e_phentsize - program_headers = f.read(program_header_size) - if len(program_headers) != program_header_size: - logger.warning("failed to read program headers") - e_phnum = 0 - # search for PT_NOTE sections that specify an OS - # for example, on Linux there is a GNU section with minimum kernel version - for i in range(e_phnum): - offset = i * e_phentsize - phent = program_headers[offset : offset + e_phentsize] + @property + def e_machine(self) -> Optional[str]: + (e_machine,) = struct.unpack_from(self.endian + "H", self.file_header, 0x12) + return ELF.MACHINE.get(e_machine) - PT_NOTE = 0x4 + def parse_program_header(self, i) -> Phdr: + phent_offset = i * self.e_phentsize + phent = self.phbuf[phent_offset : phent_offset + self.e_phentsize] - (p_type,) = struct.unpack_from(endian + "I", phent, 0x0) + (p_type,) = struct.unpack_from(self.endian + "I", phent, 0x0) logger.debug("ph:p_type: 0x%04x", p_type) - if p_type != PT_NOTE: - continue - if bitness == 32: - p_offset, _, _, p_filesz = struct.unpack_from(endian + "IIII", phent, 0x4) - elif bitness == 64: - p_offset, _, _, p_filesz = struct.unpack_from(endian + "QQQQ", phent, 0x8) + if self.bitness == 32: + p_offset, p_vaddr, p_paddr, p_filesz = struct.unpack_from(self.endian + "IIII", phent, 0x4) + elif self.bitness == 64: + p_offset, p_vaddr, p_paddr, p_filesz = struct.unpack_from(self.endian + "QQQQ", phent, 0x8) else: raise NotImplementedError() logger.debug("ph:p_offset: 0x%02x p_filesz: 0x%04x", p_offset, p_filesz) - f.seek(p_offset) - version_r = f.read(p_filesz) - if len(version_r) != p_filesz: - logger.warning("failed to read note content") - continue + self.f.seek(p_offset) + buf = self.f.read(p_filesz) + if len(buf) != p_filesz: + raise ValueError("failed to read program header content") - namesz, descsz, type_ = struct.unpack_from(endian + "III", version_r, 0x0) - name_offset = 0xC - desc_offset = name_offset + align(namesz, 0x4) + return Phdr(p_type, p_offset, p_vaddr, p_paddr, p_filesz, buf) - logger.debug("ph:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, descsz, type_) - - name = version_r[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") - logger.debug("name: %s", name) - - if type_ != 1: - continue - - if name == "GNU": - if descsz < 16: + @property + def program_headers(self): + for i in range(self.e_phnum): + try: + yield self.parse_program_header(i) + except ValueError: continue - desc = version_r[desc_offset : desc_offset + descsz] - abi_tag, kmajor, kminor, kpatch = struct.unpack_from(endian + "IIII", desc, 0x0) - logger.debug("GNU_ABI_TAG: 0x%02x", abi_tag) + def parse_section_header(self, i) -> Shdr: + shent_offset = i * self.e_shentsize + shent = self.shbuf[shent_offset : shent_offset + self.e_shentsize] - if abi_tag in GNU_ABI_TAG: - # update only if not set - # so we can get the debugging output of subsequent strategies - ret = GNU_ABI_TAG[abi_tag] if not ret else ret - logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", ret, kmajor, kminor, kpatch) - elif name == "OpenBSD": - logger.debug("note owner: %s", "OPENBSD") - ret = OS.OPENBSD if not ret else ret - elif name == "NetBSD": - logger.debug("note owner: %s", "NETBSD") - ret = OS.NETBSD if not ret else ret - elif name == "FreeBSD": - logger.debug("note owner: %s", "FREEBSD") - ret = OS.FREEBSD if not ret else ret + if self.bitness == 32: + sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(self.endian + "IIIIIII", shent, 0x0) + elif self.bitness == 64: + sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(self.endian + "IIQQQQI", shent, 0x0) + else: + raise NotImplementedError() - # search for recognizable dynamic linkers (interpreters) - # for example, on linux, we see file paths like: /lib64/ld-linux-x86-64.so.2 - linker = None - for i in range(e_phnum): - offset = i * e_phentsize - phent = program_headers[offset : offset + e_phentsize] + logger.debug("sh:sh_offset: 0x%02x sh_size: 0x%04x", sh_offset, sh_size) + self.f.seek(sh_offset) + buf = self.f.read(sh_size) + if len(buf) != sh_size: + raise ValueError("failed to read section header content") + + return Shdr(sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, buf) + + @property + def section_headers(self): + for i in range(self.e_shnum): + try: + yield self.parse_section_header(i) + except ValueError: + continue + + @property + def linker(self): PT_INTERP = 0x3 - - (p_type,) = struct.unpack_from(endian + "I", phent, 0x0) - if p_type != PT_INTERP: - continue - - if bitness == 32: - p_offset, _, _, p_filesz = struct.unpack_from(endian + "IIII", phent, 0x4) - elif bitness == 64: - p_offset, _, _, p_filesz = struct.unpack_from(endian + "QQQQ", phent, 0x8) - else: - raise NotImplementedError() - - f.seek(p_offset) - interp = f.read(p_filesz) - if len(interp) != p_filesz: - logger.warning("failed to read interp content") - continue - - linker = interp.partition(b"\x00")[0].decode("ascii") - logger.debug("linker: %s", linker) - if "ld-linux" in linker: - # update only if not set - # so we can get the debugging output of subsequent strategies - ret = OS.LINUX if ret is None else ret - - f.seek(e_shoff) - section_header_size = e_shnum * e_shentsize - section_headers = f.read(section_header_size) - if len(section_headers) != section_header_size: - logger.warning("failed to read section headers") - e_shnum = 0 - - # search for notes stored in sections that aren't visible in program headers. - # e.g. .note.Linux in Linux kernel modules. - for i in range(e_shnum): - offset = i * e_shentsize - shent = section_headers[offset : offset + e_shentsize] - - if bitness == 32: - sh_name, sh_type, _, sh_addr, linked_sh_offset, linked_sh_size = struct.unpack_from(endian + "IIIIII", shent, 0x0) - elif bitness == 64: - sh_name, sh_type, _, sh_addr, linked_sh_offset, linked_sh_size = struct.unpack_from(endian + "IIQQQQ", shent, 0x0) - else: - raise NotImplementedError() - - SHT_NOTE = 0x7 - if sh_type != SHT_NOTE: - continue - - logger.debug("sh:sh_offset: 0x%02x sh_size: 0x%04x", linked_sh_offset, linked_sh_size) - - f.seek(linked_sh_offset) - version_r = f.read(linked_sh_size) - if len(version_r) != linked_sh_size: - logger.warning("failed to read note content") - continue - - namesz, descsz, type_ = struct.unpack_from(endian + "III", version_r, 0x0) - name_offset = 0xC - desc_offset = name_offset + align(namesz, 0x4) - - logger.debug("sh:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, descsz, type_) - - name = version_r[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") - logger.debug("name: %s", name) - - if name == "Linux": - logger.debug("note owner: %s", "LINUX") - ret = OS.LINUX if not ret else ret - elif name == "OpenBSD": - logger.debug("note owner: %s", "OPENBSD") - ret = OS.OPENBSD if not ret else ret - elif name == "NetBSD": - logger.debug("note owner: %s", "NETBSD") - ret = OS.NETBSD if not ret else ret - elif name == "FreeBSD": - logger.debug("note owner: %s", "FREEBSD") - ret = OS.FREEBSD if not ret else ret - elif name == "GNU": - if descsz < 16: + for phdr in self.program_headers: + if phdr.type != PT_INTERP: continue - desc = version_r[desc_offset : desc_offset + descsz] - abi_tag, kmajor, kminor, kpatch = struct.unpack_from(endian + "IIII", desc, 0x0) - logger.debug("GNU_ABI_TAG: 0x%02x", abi_tag) + return read_cstr(phdr.buf, 0) - if abi_tag in GNU_ABI_TAG: - # update only if not set - # so we can get the debugging output of subsequent strategies - ret = GNU_ABI_TAG[abi_tag] if not ret else ret - logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", ret, kmajor, kminor, kpatch) - - if not ret: - # if we don't have any guesses yet, - # then lets look for GLIBC symbol versioning requirements. - # this will let us guess about linux/hurd in some cases. - # + @property + def versions_needed(self) -> Dict[str, Set[str]]: # symbol version requirements are stored in the .gnu.version_r section, # which has type SHT_GNU_verneed (0x6ffffffe). # @@ -437,49 +362,15 @@ def detect_elf_os(f) -> str: # strings are stored in the section referenced by the sh_link field of the section header. # each Verneed struct contains a reference to the name of the library, # each Vernaux struct contains a reference to the name of a symbol. - for i in range(e_shnum): - offset = i * e_shentsize - shent = section_headers[offset : offset + e_shentsize] - - if bitness == 32: - sh_name, sh_type, _, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(endian + "IIIIIII", shent, 0x0) - elif bitness == 64: - sh_name, sh_type, _, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(endian + "IIQQQQI", shent, 0x0) - else: - raise NotImplementedError() - - SHT_GNU_VERNEED = 0x6ffffffe - if sh_type != SHT_GNU_VERNEED: + SHT_GNU_VERNEED = 0x6ffffffe + for shdr in self.section_headers: + if shdr.type != SHT_GNU_VERNEED: continue - logger.debug("sh:sh_offset: 0x%02x sh_size: 0x%04x", sh_offset, sh_size) + # the linked section contains strings referenced by the verneed structures. + linked_shdr = self.parse_section_header(shdr.link) - # read the section containing the verneed structures - f.seek(sh_offset) - version_r = f.read(sh_size) - if len(version_r) != sh_size: - logger.warning("failed to read .gnu.version_r content") - continue - - # read the linked section content - # which contains strings referenced by the verneed structures - linked_shent_offset = sh_link * e_shentsize - linked_shent = section_headers[linked_shent_offset : linked_shent_offset + e_shentsize] - - if bitness == 32: - _, _, _, _, linked_sh_offset, linked_sh_size = struct.unpack_from(endian + "IIIIII", linked_shent, 0x0) - elif bitness == 64: - _, _, _, _, linked_sh_offset, linked_sh_size = struct.unpack_from(endian + "IIQQQQ", linked_shent, 0x0) - else: - raise NotImplementedError() - - f.seek(linked_sh_offset) - linked_sh = f.read(linked_sh_size) - if len(linked_sh) != linked_sh_size: - logger.warning("failed to read linked content") - continue - - so_abis = collections.defaultdict(set) + versions_needed = collections.defaultdict(set) # read verneed structures from the start of the section # until the vn_next link is 0x0. @@ -487,13 +378,13 @@ def detect_elf_os(f) -> str: vn_offset = 0x0 while True: # ElfXX_Verneed layout is the same on 32 and 64 bit - vn_version, vn_cnt, vn_file, vn_aux, vn_next = struct.unpack_from(endian + "HHIII", version_r, vn_offset) + vn_version, vn_cnt, vn_file, vn_aux, vn_next = struct.unpack_from(self.endian + "HHIII", shdr.buf, vn_offset) if vn_version != 1: # unexpected format, don't try to keep parsing break # shared object names, like: "libdl.so.2" - so_name = read_cstr(linked_sh, vn_file) + so_name = read_cstr(linked_shdr.buf, vn_file) # read vernaux structures linked from the verneed structure. # there should be vn_cnt of these. @@ -501,11 +392,11 @@ def detect_elf_os(f) -> str: vna_offset = vn_offset + vn_aux for i in range(vn_cnt): # ElfXX_Vernaux layout is the same on 32 and 64 bit - _, _, _, vna_name, vna_next = struct.unpack_from(endian + "IHHII", version_r, vna_offset) + _, _, _, vna_name, vna_next = struct.unpack_from(self.endian + "IHHII", shdr.buf, vna_offset) # ABI names, like: "GLIBC_2.2.5" - abi = read_cstr(linked_sh, vna_name) - so_abis[so_name].add(abi) + abi = read_cstr(linked_shdr.buf, vna_name) + versions_needed[so_name].add(abi) vna_offset += vna_next @@ -513,59 +404,262 @@ def detect_elf_os(f) -> str: if vn_next == 0: break - has_glibc_verneed = False - for so_name, abis in so_abis.items(): - for abi in abis: - if abi.startswith("GLIBC"): - has_glibc_verneed = True + return dict(versions_needed) - if has_glibc_verneed: - if MACHINE.get(e_machine) != "386": - ret = OS.LINUX - # TODO: check dynamic sections for libmachuser and libhurduser +@dataclass +class ABITag: + os: OS + kmajor: int + kminor: int + kpatch: int - if linker and "ld-linux" in linker: - ret = OS.LINUX - if linker and "/ld.so" in linker: - ret = OS.HURD +class PHNote: + def __init__(self, endian, buf): + self.endian = endian + self.buf = buf + + self.type_: int = None + self.descsz: int = None + self.name: str = None + + self._parse() + + def _parse(self): + namesz, self.descsz, self.type_ = struct.unpack_from(self.endian + "III", self.buf, 0x0) + name_offset = 0xC + self.desc_offset = name_offset + align(namesz, 0x4) + + logger.debug("ph:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, self.descsz, self.type_) + + name = self.buf[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") + logger.debug("name: %s", name) + + @property + def abi_tag(self) -> Optional[ABITag]: + if self.type_ != 1: + # TODO: what is this constant name? + return None + + if self.name != "GNU": + return None + + if self.descsz < 16: + return None + + desc = self.buf[self.desc_offset : self.desc_offset + self.descsz] + abi_tag, kmajor, kminor, kpatch = struct.unpack_from(self.endian + "IIII", desc, 0x0) + logger.debug("GNU_ABI_TAG: 0x%02x", abi_tag) + + os = GNU_ABI_TAG.get(abi_tag) + if not os: + return None + + logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", os, kmajor, kminor, kpatch) + + return ABITag(os, kmajor, kminor, kpatch) + + +class SHNote: + def __init__(self, endian, buf): + self.endian = endian + self.buf = buf + + self.type_: int = None + self.descsz: int = None + self.name: str = None + + self._parse() + + def _parse(self): + namesz, self.descsz, self.type_ = struct.unpack_from(self.endian + "III", self.buf, 0x0) + name_offset = 0xC + self.desc_offset = name_offset + align(namesz, 0x4) + + logger.debug("sh:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, self.descsz, self.type_) + + name_buf = self.buf[name_offset : name_offset + namesz] + self.name = read_cstr(name_buf, 0x0) + logger.debug("sh:name: %s", self.name) + + @property + def abi_tag(self) -> Optional[ABITag]: + if self.name != "GNU": + return None + + if self.descsz < 16: + return None + + desc = self.buf[self.desc_offset : self.desc_offset + self.descsz] + abi_tag, kmajor, kminor, kpatch = struct.unpack_from(self.endian + "IIII", desc, 0x0) + logger.debug("GNU_ABI_TAG: 0x%02x", abi_tag) + + os = GNU_ABI_TAG.get(abi_tag) + if not os: + return None + + logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", os, kmajor, kminor, kpatch) + return ABITag(os, kmajor, kminor, kpatch) + + +def guess_os_from_osabi(elf) -> Optional[OS]: + return elf.ei_osabi + + +def guess_os_from_ph_notes(elf) -> Optional[OS]: + # search for PT_NOTE sections that specify an OS + # for example, on Linux there is a GNU section with minimum kernel version + PT_NOTE = 0x4 + for phdr in elf.program_headers: + if phdr.type != PT_NOTE: + continue + + note = PHNote(elf.endian, phdr.buf) + + if note.type_ != 1: + # TODO: what is this constant name? + continue + + if note.name == "Linux": + logger.debug("note owner: %s", "LINUX") + return OS.LINUX + elif note.name == "OpenBSD": + logger.debug("note owner: %s", "OPENBSD") + return OS.OPENBSD + elif note.name == "NetBSD": + logger.debug("note owner: %s", "NETBSD") + return OS.NETBSD + elif note.name == "FreeBSD": + logger.debug("note owner: %s", "FREEBSD") + return OS.FREEBSD + elif note.name == "GNU": + abi_tag = note.abi_tag + if abi_tag: + return abi_tag.os + else: + # cannot make a guess about the OS, but probably linux or hurd + pass + + return None + + +def guess_os_from_sh_notes(elf) -> Optional[OS]: + # search for notes stored in sections that aren't visible in program headers. + # e.g. .note.Linux in Linux kernel modules. + SHT_NOTE = 0x7 + for shdr in elf.section_headers: + if shdr.type != SHT_NOTE: + continue + + note = SHNote(elf.endian, shdr.buf) + + if note.name == "Linux": + logger.debug("note owner: %s", "LINUX") + return OS.LINUX + elif note.name == "OpenBSD": + logger.debug("note owner: %s", "OPENBSD") + return OS.OPENBSD + elif note.name == "NetBSD": + logger.debug("note owner: %s", "NETBSD") + return OS.NETBSD + elif note.name == "FreeBSD": + logger.debug("note owner: %s", "FREEBSD") + return OS.FREEBSD + elif note.name == "GNU": + abi_tag = note.abi_tag + if abi_tag: + ret = abi_tag.os if not ret else ret + else: + # cannot make a guess about the OS, but probably linux or hurd + pass + + return None + + +def guess_os_from_linker(elf) -> Optional[OS]: + # search for recognizable dynamic linkers (interpreters) + # for example, on linux, we see file paths like: /lib64/ld-linux-x86-64.so.2 + linker = elf.linker + if linker and "ld-linux" in elf.linker: + return OS.LINUX + + return None + + +def guess_os_from_abi_versions_needed(elf) -> Optional[OS]: + # then lets look for GLIBC symbol versioning requirements. + # this will let us guess about linux/hurd in some cases. + + versions_needed = elf.versions_needed + if any(map(lambda abi: abi.startswith("GLIBC"), itertools.chain(*versions_needed.values()))): + # there are any GLIBC versions needed + + if elf.e_machine != "386": + # GLIBC runs on Linux and Hurd. + # for Hurd, its *only* on i386. + # so if we're not on i386, then we're on Linux. + return OS.LINUX + + else: + # we're on i386, so we could be on either Linux or Hurd. + linker = elf.linker + + if linker and "ld-linux" in linker: + return OS.LINUX + + elif linker and "/ld.so" in linker: + return OS.HURD + + else: + # we don't have any good guesses based on versions needed + pass + + return None + + +def detect_elf_os(f) -> str: + """ + f: type Union[BinaryIO, IDAIO] + """ + elf = ELF(f) + + osabi_guess = guess_os_from_osabi(elf) + logger.info("guess: osabi: %s", osabi_guess) + + ph_notes_guess = guess_os_from_ph_notes(elf) + logger.info("guess: ph notes: %s", ph_notes_guess) + + sh_notes_guess = guess_os_from_sh_notes(elf) + logger.info("guess: sh notes: %s", sh_notes_guess) + + linker_guess = guess_os_from_linker(elf) + logger.info("guess: linker: %s", linker_guess) + + abi_versions_needed_guess = guess_os_from_abi_versions_needed(elf) + logger.info("guess: ABI versions needed: %s", abi_versions_needed_guess) + + ret = None + + if osabi_guess: + ret = osabi_guess + + elif ph_notes_guess: + ret = ph_notes_guess + + elif sh_notes_guess: + ret = sh_notes_guess + + elif linker_guess: + ret = linker_guess + + elif abi_versions_needed_guess: + ret = abi_versions_needed_guess + + # TODO: guess by dynamic sections return ret.value if ret is not None else "unknown" -class Arch(str, Enum): - I386 = "i386" - AMD64 = "amd64" - - def detect_elf_arch(f: BinaryIO) -> str: - f.seek(0x0) - file_header = f.read(0x40) - - if not file_header.startswith(b"\x7fELF"): - raise CorruptElfFile("missing magic header") - - (ei_data,) = struct.unpack_from("B", file_header, 5) - logger.debug("ei_data: 0x%02x", ei_data) - - if ei_data == 1: - endian = "<" - elif ei_data == 2: - endian = ">" - else: - raise CorruptElfFile("not an ELF file: invalid ei_data: 0x%02x" % ei_data) - - (ei_machine,) = struct.unpack_from(endian + "H", file_header, 0x12) - logger.debug("ei_machine: 0x%02x", ei_machine) - - EM_386 = 0x3 - EM_X86_64 = 0x3E - if ei_machine == EM_386: - return Arch.I386 - elif ei_machine == EM_X86_64: - return Arch.AMD64 - else: - # not really unknown, but unsupport at the moment: - # https://github.com/eliben/pyelftools/blob/ab444d982d1849191e910299a985989857466620/elftools/elf/enums.py#L73 - return "unknown" + return ELF(f).e_machine or "unknown" From 307a6fad4f4c89917436b2df430582874197e002 Mon Sep 17 00:00:00 2001 From: William Ballenthin Date: Fri, 9 Dec 2022 14:31:03 +0100 Subject: [PATCH 03/10] elf: os: detect via so dependencies --- capa/features/extractors/elf.py | 108 +++++++++++++++++++++++++++++++- 1 file changed, 105 insertions(+), 3 deletions(-) diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index b3b86135..af0133e1 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -11,7 +11,7 @@ import itertools import collections from enum import Enum from dataclasses import dataclass -from typing import BinaryIO, Optional, Dict, Set +from typing import BinaryIO, Optional, Dict, Set, Iterator, Tuple, List logger = logging.getLogger(__name__) @@ -406,6 +406,94 @@ class ELF: return dict(versions_needed) + @property + def dynamic_entries(self) -> Iterator[Tuple[int, int]]: + """ + read the entries from the dynamic section, + yielding the tag and value for each entry. + """ + DT_NULL = 0x0 + PT_DYNAMIC = 0x2 + for phdr in self.program_headers: + if phdr.type != PT_DYNAMIC: + continue + + offset = 0x0 + while True: + if self.bitness == 32: + d_tag, d_val = struct.unpack_from(self.endian + "II", phdr.buf, offset) + offset += 8 + elif self.bitness == 64: + d_tag, d_val = struct.unpack_from(self.endian + "QQ", phdr.buf, offset) + offset += 16 + else: + raise NotImplementedError() + + if d_tag == DT_NULL: + break + + yield d_tag, d_val + + @property + def strtab(self) -> Optional[bytes]: + """ + fetch the bytes of the string table + referenced by the dynamic section. + """ + DT_STRTAB = 0x5 + DT_STRSZ = 0xA + + strtab_addr = None + strtab_size = None + + for d_tag, d_val in self.dynamic_entries: + if d_tag == DT_STRTAB: + strtab_addr = d_val + + for d_tag, d_val in self.dynamic_entries: + if d_tag == DT_STRSZ: + strtab_size = d_val + + if strtab_addr is None: + return None + + if strtab_size is None: + return None + + strtab_offset = None + for shdr in self.section_headers: + if shdr.addr <= strtab_addr < shdr.addr + shdr.size: + strtab_offset = shdr.offset + (strtab_addr - shdr.addr) + + if strtab_offset is None: + return None + + self.f.seek(strtab_offset) + strtab_buf = self.f.read(strtab_size) + + if len(strtab_buf) != strtab_size: + return None + + return strtab_buf + + @property + def needed(self) -> Iterator[str]: + """ + read the names of DT_NEEDED entries from the dynamic section, + which correspond to dependencies on other shared objects, + like: `libpthread.so.0` + """ + DT_NEEDED = 0x1 + strtab = self.strtab + if not strtab: + return + + for d_tag, d_val in self.dynamic_entries: + if d_tag != DT_NEEDED: + continue + + yield read_cstr(strtab, d_val) + @dataclass class ABITag: @@ -569,7 +657,7 @@ def guess_os_from_sh_notes(elf) -> Optional[OS]: elif note.name == "GNU": abi_tag = note.abi_tag if abi_tag: - ret = abi_tag.os if not ret else ret + return abi_tag.os else: # cannot make a guess about the OS, but probably linux or hurd pass @@ -618,6 +706,16 @@ def guess_os_from_abi_versions_needed(elf) -> Optional[OS]: return None +def guess_os_from_needed_dependencies(elf) -> Optional[OS]: + for needed in elf.needed: + if needed.startswith("libmachuser.so"): + return OS.HURD + if needed.startswith("libhurduser.so"): + return OS.HURD + + return None + + def detect_elf_os(f) -> str: """ f: type Union[BinaryIO, IDAIO] @@ -639,6 +737,9 @@ def detect_elf_os(f) -> str: abi_versions_needed_guess = guess_os_from_abi_versions_needed(elf) logger.info("guess: ABI versions needed: %s", abi_versions_needed_guess) + needed_dependencies_guess = guess_os_from_needed_dependencies(elf) + logger.info("guess: needed dependencies: %s", needed_dependencies_guess) + ret = None if osabi_guess: @@ -656,7 +757,8 @@ def detect_elf_os(f) -> str: elif abi_versions_needed_guess: ret = abi_versions_needed_guess - # TODO: guess by dynamic sections + elif needed_dependencies_guess: + ret = needed_dependencies_guess return ret.value if ret is not None else "unknown" From 1583fedba2b993995ebeb1c3e3a6524d5f18f52c Mon Sep 17 00:00:00 2001 From: William Ballenthin Date: Fri, 9 Dec 2022 17:34:44 +0100 Subject: [PATCH 04/10] mypy --- capa/features/extractors/elf.py | 4 ++-- capa/features/extractors/elffile.py | 18 ++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index af0133e1..ad78c945 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -194,7 +194,7 @@ class ELF: # via https://refspecs.linuxfoundation.org/elf/gabi4+/ch4.eheader.html 1: "M32", 2: "SPARC", - 3: "386", + 3: "i386", 4: "68K", 5: "88K", 6: "486", @@ -238,7 +238,7 @@ class ELF: 59: "ME16", 60: "ST100", 61: "TINYJ", - 62: "X86_64", + 62: "amd64", 63: "PDSP", 64: "PDP10", 65: "PDP11", diff --git a/capa/features/extractors/elffile.py b/capa/features/extractors/elffile.py index 4810bb5f..d4f61a06 100644 --- a/capa/features/extractors/elffile.py +++ b/capa/features/extractors/elffile.py @@ -7,7 +7,6 @@ # See the License for the specific language governing permissions and limitations under the License. import io import logging -import contextlib from typing import Tuple, Iterator from elftools.elf.elffile import ELFFile, SymbolTableSection @@ -16,7 +15,6 @@ import capa.features.extractors.common from capa.features.file import Import, Section from capa.features.common import OS, FORMAT_ELF, Arch, Format, Feature from capa.features.address import NO_ADDRESS, FileOffsetAddress, AbsoluteVirtualAddress -from capa.features.extractors.elf import Arch as ElfArch from capa.features.extractors.base_extractor import FeatureExtractor logger = logging.getLogger(__name__) @@ -26,17 +24,17 @@ def extract_file_import_names(elf, **kwargs): # see https://github.com/eliben/pyelftools/blob/0664de05ed2db3d39041e2d51d19622a8ef4fb0f/scripts/readelf.py#L372 symbol_tables = [(idx, s) for idx, s in enumerate(elf.iter_sections()) if isinstance(s, SymbolTableSection)] - for section_index, section in symbol_tables: + for _, section in symbol_tables: if not isinstance(section, SymbolTableSection): continue if section["sh_entsize"] == 0: - logger.debug("Symbol table '%s' has a sh_entsize of zero!" % (section.name)) + logger.debug("Symbol table '%s' has a sh_entsize of zero!", section.name) continue - logger.debug("Symbol table '%s' contains %s entries:" % (section.name, section.num_symbols())) + logger.debug("Symbol table '%s' contains %s entries:", section.name, section.num_symbols()) - for nsym, symbol in enumerate(section.iter_symbols()): + for _, symbol in enumerate(section.iter_symbols()): if symbol.name and symbol.entry.st_info.type == "STT_FUNC": # TODO symbol address # TODO symbol version info? @@ -73,9 +71,9 @@ def extract_file_arch(elf, **kwargs): # TODO merge with capa.features.extractors.elf.detect_elf_arch() arch = elf.get_machine_arch() if arch == "x86": - yield Arch(ElfArch.I386), NO_ADDRESS + yield Arch("i386"), NO_ADDRESS elif arch == "x64": - yield Arch(ElfArch.AMD64), NO_ADDRESS + yield Arch("amd64"), NO_ADDRESS else: logger.warning("unsupported architecture: %s", arch) @@ -153,8 +151,8 @@ class ElfFeatureExtractor(FeatureExtractor): def extract_insn_features(self, f, bb, insn): raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features") - def is_library_function(self, va): + def is_library_function(self, addr): raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features") - def get_function_name(self, va): + def get_function_name(self, addr): raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features") From c958a6a286443b21cd301918577c345a839dd2ae Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Fri, 9 Dec 2022 16:07:46 +0100 Subject: [PATCH 05/10] elf: black --- capa/features/extractors/elf.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index ad78c945..6e7a1b75 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -10,8 +10,8 @@ import logging import itertools import collections from enum import Enum +from typing import Set, Dict, List, Tuple, BinaryIO, Iterator, Optional from dataclasses import dataclass -from typing import BinaryIO, Optional, Dict, Set, Iterator, Tuple, List logger = logging.getLogger(__name__) @@ -320,9 +320,13 @@ class ELF: shent = self.shbuf[shent_offset : shent_offset + self.e_shentsize] if self.bitness == 32: - sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(self.endian + "IIIIIII", shent, 0x0) + sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from( + self.endian + "IIIIIII", shent, 0x0 + ) elif self.bitness == 64: - sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(self.endian + "IIQQQQI", shent, 0x0) + sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from( + self.endian + "IIQQQQI", shent, 0x0 + ) else: raise NotImplementedError() @@ -362,7 +366,7 @@ class ELF: # strings are stored in the section referenced by the sh_link field of the section header. # each Verneed struct contains a reference to the name of the library, # each Vernaux struct contains a reference to the name of a symbol. - SHT_GNU_VERNEED = 0x6ffffffe + SHT_GNU_VERNEED = 0x6FFFFFFE for shdr in self.section_headers: if shdr.type != SHT_GNU_VERNEED: continue @@ -378,7 +382,9 @@ class ELF: vn_offset = 0x0 while True: # ElfXX_Verneed layout is the same on 32 and 64 bit - vn_version, vn_cnt, vn_file, vn_aux, vn_next = struct.unpack_from(self.endian + "HHIII", shdr.buf, vn_offset) + vn_version, vn_cnt, vn_file, vn_aux, vn_next = struct.unpack_from( + self.endian + "HHIII", shdr.buf, vn_offset + ) if vn_version != 1: # unexpected format, don't try to keep parsing break @@ -437,7 +443,7 @@ class ELF: @property def strtab(self) -> Optional[bytes]: """ - fetch the bytes of the string table + fetch the bytes of the string table referenced by the dynamic section. """ DT_STRTAB = 0x5 From 7ba08edffa402da1f373b4a0303e5c6d08722b39 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Fri, 9 Dec 2022 16:09:41 +0100 Subject: [PATCH 06/10] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f2bf3693..1ba2cab5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ - update pydantic model to guarantee type coercion #1176 @mike-hunhoff - do not overwrite version in version.py during PyInstaller build #1169 @mr-tz - render: fix vverbose rendering of offsets #1215 @williballenthin +- elf: better detect OS via GLIBC ABI version needed and dependencies #1221 @williballenthin ### capa explorer IDA Pro plugin - fix: display instruction items #1154 @mr-tz From b26ed47ab8f386e8e16981725c0002672fd3411e Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Mon, 12 Dec 2022 11:40:32 +0100 Subject: [PATCH 07/10] tests: add OS detection tests --- tests/test_os_detection.py | 40 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/tests/test_os_detection.py b/tests/test_os_detection.py index e2f850d7..25f30475 100644 --- a/tests/test_os_detection.py +++ b/tests/test_os_detection.py @@ -14,13 +14,49 @@ from fixtures import * import capa.features.extractors.elf -def test_elf_section_gnu_abi_tag(): +def test_elf_sh_notes(): + # guess: osabi: None + # guess: ph notes: None + # guess: sh notes: OS.LINUX + # guess: linker: None + # guess: ABI versions needed: None + # guess: needed dependencies: None path = get_data_path_by_name("2f7f5f") with open(path, "rb") as f: assert capa.features.extractors.elf.detect_elf_os(f) == "linux" -def test_elf_program_header_gnu_abi_tag(): +def test_elf_pt_notes(): + # guess: osabi: None + # guess: ph notes: None + # guess: sh notes: OS.LINUX + # guess: linker: OS.LINUX + # guess: ABI versions needed: OS.LINUX + # guess: needed dependencies: None path = get_data_path_by_name("7351f.elf") with open(path, "rb") as f: assert capa.features.extractors.elf.detect_elf_os(f) == "linux" + + +def test_elf_so_needed(): + # guess: osabi: None + # guess: ph notes: None + # guess: sh notes: OS.HURD + # guess: linker: None + # guess: ABI versions needed: OS.HURD + # guess: needed dependencies: OS.HURD + path = get_data_path_by_name("b5f052") + with open(path, "rb") as f: + assert capa.features.extractors.elf.detect_elf_os(f) == "hurd" + + +def test_elf_abi_version_hurd(): + # guess: osabi: None + # guess: ph notes: None + # guess: sh notes: OS.HURD + # guess: linker: None + # guess: ABI versions needed: OS.HURD + # guess: needed dependencies: None + path = get_data_path_by_name("bf7a9c") + with open(path, "rb") as f: + assert capa.features.extractors.elf.detect_elf_os(f) == "unknown" From 22bef146f83174954110a83ccbc778f560831ce2 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Mon, 12 Dec 2022 11:40:43 +0100 Subject: [PATCH 08/10] tests: add OS detection tests --- tests/fixtures.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/fixtures.py b/tests/fixtures.py index 8df1153f..1d0ba0fa 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -284,6 +284,10 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "0953cc3b77ed2974b09e3a00708f88de931d681e2d0cb64afbaf714610beabe6.exe_") elif name.startswith("_039a6"): return os.path.join(CD, "data", "039a6336d0802a2255669e6867a5679c7eb83313dbc61fb1c7232147379bd304.exe_") + elif name.startswith("b5f052"): + return os.path.join(CD, "data", "b5f0524e69b3a3cf636c7ac366ca57bf5e3a8fdc8a9f01caf196c611a7918a87.elf_") + elif name.startswith("bf7a9c"): + return os.path.join(CD, "data", "bf7a9c8bdfa6d47e01ad2b056264acc3fd90cf43fe0ed8deec93ab46b47d76cb.elf_") else: raise ValueError("unexpected sample fixture: %s" % name) From d4a218e268b1b3a403cb6ea6f21858d8179d562a Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Mon, 12 Dec 2022 11:41:01 +0100 Subject: [PATCH 09/10] elf: os: bug fixes --- capa/features/extractors/elf.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index 6e7a1b75..5ad13456 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -410,7 +410,9 @@ class ELF: if vn_next == 0: break - return dict(versions_needed) + return dict(versions_needed) + + return {} @property def dynamic_entries(self) -> Iterator[Tuple[int, int]]: @@ -533,7 +535,9 @@ class PHNote: @property def abi_tag(self) -> Optional[ABITag]: if self.type_ != 1: - # TODO: what is this constant name? + # > The type field shall be 1. + # Linux Standard Base Specification 1.2 + # ref: https://refspecs.linuxfoundation.org/LSB_1.2.0/gLSB/noteabitag.html return None if self.name != "GNU": @@ -612,7 +616,9 @@ def guess_os_from_ph_notes(elf) -> Optional[OS]: note = PHNote(elf.endian, phdr.buf) if note.type_ != 1: - # TODO: what is this constant name? + # > The type field shall be 1. + # Linux Standard Base Specification 1.2 + # ref: https://refspecs.linuxfoundation.org/LSB_1.2.0/gLSB/noteabitag.html continue if note.name == "Linux": @@ -689,7 +695,7 @@ def guess_os_from_abi_versions_needed(elf) -> Optional[OS]: if any(map(lambda abi: abi.startswith("GLIBC"), itertools.chain(*versions_needed.values()))): # there are any GLIBC versions needed - if elf.e_machine != "386": + if elf.e_machine != "i386": # GLIBC runs on Linux and Hurd. # for Hurd, its *only* on i386. # so if we're not on i386, then we're on Linux. From 0f902124d176ceb933a3f5a91ccd8332a3ba5336 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Mon, 12 Dec 2022 11:43:48 +0100 Subject: [PATCH 10/10] elf: reduce logging verbosity --- capa/features/extractors/elf.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index 5ad13456..d5c187dc 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -735,22 +735,22 @@ def detect_elf_os(f) -> str: elf = ELF(f) osabi_guess = guess_os_from_osabi(elf) - logger.info("guess: osabi: %s", osabi_guess) + logger.debug("guess: osabi: %s", osabi_guess) ph_notes_guess = guess_os_from_ph_notes(elf) - logger.info("guess: ph notes: %s", ph_notes_guess) + logger.debug("guess: ph notes: %s", ph_notes_guess) sh_notes_guess = guess_os_from_sh_notes(elf) - logger.info("guess: sh notes: %s", sh_notes_guess) + logger.debug("guess: sh notes: %s", sh_notes_guess) linker_guess = guess_os_from_linker(elf) - logger.info("guess: linker: %s", linker_guess) + logger.debug("guess: linker: %s", linker_guess) abi_versions_needed_guess = guess_os_from_abi_versions_needed(elf) - logger.info("guess: ABI versions needed: %s", abi_versions_needed_guess) + logger.debug("guess: ABI versions needed: %s", abi_versions_needed_guess) needed_dependencies_guess = guess_os_from_needed_dependencies(elf) - logger.info("guess: needed dependencies: %s", needed_dependencies_guess) + logger.debug("guess: needed dependencies: %s", needed_dependencies_guess) ret = None