mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
lancelot: insn: simplify operand target fetching
This commit is contained in:
@@ -65,6 +65,33 @@ def get_imports(xtor):
|
||||
return imports
|
||||
|
||||
|
||||
def get_operand_target(insn, op):
|
||||
if insn.address in (0x455A41, 0x46AE12):
|
||||
print(insn, insn.operands)
|
||||
|
||||
if op[OPERAND_TYPE] == OPERAND_TYPE_MEMORY:
|
||||
# call direct, x64
|
||||
# rip relative
|
||||
# kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0
|
||||
if op[MEMORY_OPERAND_BASE] == "rip":
|
||||
return op[MEMORY_OPERAND_DISP] + insn.address + insn.length
|
||||
|
||||
# call direct, x32
|
||||
# mimikatz:0x403BD3 call ds:CryptAcquireContextW
|
||||
elif op[MEMORY_OPERAND_BASE] == None:
|
||||
return op[MEMORY_OPERAND_DISP]
|
||||
|
||||
# call via thunk
|
||||
# mimikatz:0x455A41 call LsaQueryInformationPolicy
|
||||
elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE and op[IMMEDIATE_OPERAND_IS_RELATIVE]:
|
||||
return op[IMMEDIATE_OPERAND_VALUE] + insn.address + insn.length
|
||||
|
||||
elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE:
|
||||
return op[IMMEDIATE_OPERAND_VALUE]
|
||||
|
||||
raise ValueError("memory operand has no target")
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_thunks(xtor):
|
||||
thunks = {}
|
||||
@@ -79,29 +106,17 @@ def get_thunks(xtor):
|
||||
|
||||
op0 = insn.operands[0]
|
||||
|
||||
if op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY:
|
||||
target = op0[MEMORY_OPERAND_DISP]
|
||||
|
||||
# direct, x64, rip relative
|
||||
# 180020570 FF 25 DA 83 05 00 jmp cs:RtlCaptureContext_0
|
||||
if op0[MEMORY_OPERAND_BASE] == "rip":
|
||||
target = op0[MEMORY_OPERAND_DISP] + insn.address + insn.length
|
||||
|
||||
# direct, x32
|
||||
# mimikatz:.text:0046AE12 FF 25 54 30 47 00 jmp ds:__imp_LsaQueryInformationPolicy
|
||||
elif op0[MEMORY_OPERAND_BASE] == None:
|
||||
target = op0[MEMORY_OPERAND_DISP]
|
||||
|
||||
else:
|
||||
continue
|
||||
|
||||
imports = get_imports(xtor)
|
||||
if target not in imports:
|
||||
continue
|
||||
|
||||
thunks[va] = imports[target]
|
||||
try:
|
||||
target = get_operand_target(insn, op0)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
imports = get_imports(xtor)
|
||||
if target not in imports:
|
||||
continue
|
||||
|
||||
thunks[va] = imports[target]
|
||||
|
||||
return thunks
|
||||
|
||||
|
||||
@@ -113,35 +128,21 @@ def extract_insn_api_features(xtor, f, bb, insn):
|
||||
|
||||
op0 = insn.operands[0]
|
||||
|
||||
if op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY:
|
||||
try:
|
||||
target = get_operand_target(insn, op0)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
# call direct, x64
|
||||
# rip relative
|
||||
# kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0
|
||||
if op0[MEMORY_OPERAND_BASE] == "rip":
|
||||
target = op0[MEMORY_OPERAND_DISP] + insn.address + insn.length
|
||||
imports = get_imports(xtor)
|
||||
if target in imports:
|
||||
for feature, va in capa.features.extractors.helpers.generate_api_features(imports[target], insn.address):
|
||||
yield feature, va
|
||||
return
|
||||
|
||||
# call direct, x32
|
||||
# mimikatz:0x403BD3 call ds:CryptAcquireContextW
|
||||
elif op0[MEMORY_OPERAND_BASE] == None:
|
||||
target = op0[MEMORY_OPERAND_DISP]
|
||||
|
||||
else:
|
||||
return
|
||||
|
||||
imports = get_imports(xtor)
|
||||
if target in imports:
|
||||
for feature, va in capa.features.extractors.helpers.generate_api_features(imports[target], insn.address):
|
||||
yield feature, va
|
||||
|
||||
# call via thunk
|
||||
# mimikatz:0x455A41 call LsaQueryInformationPolicy
|
||||
elif op0[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE and op0[IMMEDIATE_OPERAND_IS_RELATIVE]:
|
||||
target = op0[IMMEDIATE_OPERAND_VALUE] + insn.address + insn.length
|
||||
thunks = get_thunks(xtor)
|
||||
if target in thunks:
|
||||
for feature, va in capa.features.extractors.helpers.generate_api_features(thunks[target], insn.address):
|
||||
yield feature, va
|
||||
thunks = get_thunks(xtor)
|
||||
if target in thunks:
|
||||
for feature, va in capa.features.extractors.helpers.generate_api_features(thunks[target], insn.address):
|
||||
yield feature, va
|
||||
|
||||
|
||||
def extract_insn_mnemonic_features(xtor, f, bb, insn):
|
||||
@@ -224,30 +225,6 @@ def derefs(xtor, p):
|
||||
p = next
|
||||
|
||||
|
||||
def get_operand_target(insn, op):
|
||||
if op[OPERAND_TYPE] == OPERAND_TYPE_MEMORY:
|
||||
# call direct, x64
|
||||
# rip relative
|
||||
# kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0
|
||||
if op[MEMORY_OPERAND_BASE] == "rip":
|
||||
return op[MEMORY_OPERAND_DISP] + insn.address + insn.length
|
||||
|
||||
# call direct, x32
|
||||
# mimikatz:0x403BD3 call ds:CryptAcquireContextW
|
||||
elif op[MEMORY_OPERAND_BASE] == None:
|
||||
return op[MEMORY_OPERAND_DISP]
|
||||
|
||||
# call via thunk
|
||||
# mimikatz:0x455A41 call LsaQueryInformationPolicy
|
||||
elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE and op[IMMEDIATE_OPERAND_IS_RELATIVE]:
|
||||
return op[IMMEDIATE_OPERAND_VALUE] + insn.address + insn.length
|
||||
|
||||
elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE:
|
||||
return op[IMMEDIATE_OPERAND_VALUE]
|
||||
|
||||
raise ValueError("memory operand has no target")
|
||||
|
||||
|
||||
def read_bytes(xtor, va):
|
||||
"""
|
||||
read up to MAX_BYTES_FEATURE_SIZE from the given address.
|
||||
|
||||
Reference in New Issue
Block a user