In preparazione di una CTF ho creato un debugger per studiare Assembly. Questo strumento prende un PE, e fa le seguenti cose:
Il nostro debugger disassembla ciascuna sezione, e tramite la libreria Capstone genera un "disassembly.txt" per ciascuna sezione del file, rivelando l'Assembly del PE. Questa libreria converte i dati binari del PE in istruzioni Assembly, in plain text.
La sintassi del tool è
In questo thread vado a caricare (zippato, disassemblato e disarmato) un payload puzzo generato con metasploit (che ovviamente punta a 127.0.0.1) - Da notare come il peso del PE iniziale è 73 KB, dopo esser stato processato dal mio tool, estratto dalla ZIP pesa oltre 1 MB.
- Rileva dinamicamente tutte le sezioni di memoria di un PE (.exe);
- Vede se è compresso con un Packer. Di default controlla solo se c'è UPX, Themida e PECompact;
- Analisi generale (timestamp di creazione/modifica dell'artefatto, architettura, ecc;
- Enumerazione di chiamate API ed Import/Export: a tal proposito i RAT usano spesso troppe referenze all'API Winsock (WSOCK32.dll) come socket, connect, send, recv eccetera - Quindi se state esaminando un .exe che sta idle nel task manager mentre fa troppe chiamate a quest'API consiglio di approfondire l'analisi;
- Prova ad estrarre informazioni su linker e compiler, in seguito calcola l'entropia di ciascuna sezione di memoria. Solitamente le sezioni con entropia molto elevata sono indici di attività sospetta;
- Logging degli errori: utilissima per debuggare lo script stesso, in caso gli sia passato come argomento un'exe con caratteristiche particolari. Alcuni PE usano ad esempio l'API "IsDebuggerPresent" per non farsi reversare, e il mio script non tiene in conto di questo. Quindi se analizzate un PE con caratteristiche evasive, come l'antisandbox o l'antireversing dovrete fare degli aggiustamenti;
Il nostro debugger disassembla ciascuna sezione, e tramite la libreria Capstone genera un "disassembly.txt" per ciascuna sezione del file, rivelando l'Assembly del PE. Questa libreria converte i dati binari del PE in istruzioni Assembly, in plain text.
La sintassi del tool è
python debugger2.py iltuofile.exe
In questo thread vado a caricare (zippato, disassemblato e disarmato) un payload puzzo generato con metasploit (che ovviamente punta a 127.0.0.1) - Da notare come il peso del PE iniziale è 73 KB, dopo esser stato processato dal mio tool, estratto dalla ZIP pesa oltre 1 MB.
Python:
import pefile
import os
import hexdump
import re
import logging
import math
from capstone import Cs, CS_ARCH_X86, CS_MODE_32, CS_MODE_64, CsError
from datetime import datetime
logging.basicConfig(filename='disassembly_errors.log', level=logging.ERROR)
def get_file_timestamps(file_path):
try:
stat_info = os.stat(file_path)
creation_time = datetime.fromtimestamp(stat_info.st_ctime)
modification_time = datetime.fromtimestamp(stat_info.st_mtime)
return creation_time, modification_time
except Exception as e:
print(f"Errore durante il recupero dei timestamp del file: {e}")
return None, None
def print_timestamp_info(file_path):
creation_time, modification_time = get_file_timestamps(file_path)
if creation_time and modification_time:
print_header("Timestamp del File")
print_info("Data di Creazione", creation_time.strftime("%Y-%m-%d %H:%M:%S"),
"Data e ora in cui il file è stato creato")
print_info("Data di Modifica", modification_time.strftime("%Y-%m-%d %H:%M:%S"),
"Data e ora dell'ultima modifica del file")
def get_architecture_description(machine_type):
if machine_type == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_I386']:
return "x86 (32-bit)"
elif machine_type == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_AMD64']:
return "x64 (64-bit)"
else:
return "Sconosciuta"
def calculate_entropy(data):
if not data:
return 0
freq = [0] * 256
for byte in data:
freq[byte] += 1
entropy = 0
length = len(data)
for count in freq:
if count > 0:
p = count / length
entropy -= p * math.log2(p)
return entropy
def detect_packer(data):
packer_signatures = [
b'UPX', b'Themida', b'PECompact'
]
for sig in packer_signatures:
if sig in data:
return f"Il file è stato compresso con il packer: {sig.decode()}"
else:
print("Il file non è stato compresso con un Packer.")
def extract_pe_info(file_path):
try:
pe = pefile.PE(file_path)
except FileNotFoundError:
print(f"Errore: Il file '{file_path}' non è stato trovato.")
return
except pefile.PEFormatError as e:
print(f"Errore: Il formato del file PE non è valido. Dettagli: {e}")
return
print_header("Informazioni PE")
print_timestamp_info(file_path)
packer_detection_result = detect_packer(pe.__data__)
print(packer_detection_result)
print_header("Informazioni DOS")
print_info("e_magic", f"{pe.DOS_HEADER.e_magic}",
"Numero magico del DOS Header, deve essere 0x5A4D per i file PE32")
print_info("e_lfanew", f"{pe.DOS_HEADER.e_lfanew}",
"Offset dell'header PE dal DOS Header, indica dove inizia l'header PE")
print_header("Informazioni File")
architecture_description = get_architecture_description(pe.FILE_HEADER.Machine)
print_info("Machine", f"{pe.FILE_HEADER.Machine} ({architecture_description})",
"Tipo di macchina per la quale il file è compilato")
print_info("Number of Sections", f"{pe.FILE_HEADER.NumberOfSections}",
"Numero di sezioni nel file PE")
print_info("TimeDateStamp", f"{pe.FILE_HEADER.TimeDateStamp}",
"Timestamp della creazione del file (secondi dal 1 Gennaio 1970, Epoch)")
print_info("Pointer to Symbol Table", f"{pe.FILE_HEADER.PointerToSymbolTable}",
"Offset alla tabella dei simboli (per PE moderni questo valore è solitamente 0)")
print_header("Informazioni Opzionali")
print_info("AddressOfEntryPoint", f"{pe.OPTIONAL_HEADER.AddressOfEntryPoint}",
"Indirizzo di ingresso del programma relativo alla base dell'immagine")
print_info("ImageBase", f"{pe.OPTIONAL_HEADER.ImageBase}",
"Indirizzo base dell'immagine quando il file è caricato in memoria")
print_info("SectionAlignment", f"{pe.OPTIONAL_HEADER.SectionAlignment}",
"Allineamento delle sezioni in memoria (spazio riservato tra le sezioni)")
print_header("Sezioni")
for section in pe.sections:
name = section.Name.decode().strip('\x00')
section_data = section.get_data()
entropy = calculate_entropy(section_data)
print(f"\nSezione '{name}':")
print_info("Virtual Address", f"{section.VirtualAddress}",
"Indirizzo virtuale della sezione in memoria")
print_info("Size of Raw Data", f"{section.SizeOfRawData}",
"Dimensione dei dati raw della sezione (dimensione sul disco)")
print_info("Entropy", f"{entropy:.4f}",
"Entropia della sezione (misura della casualità dei dati)")
print_header("Tabella degli Import")
for entry in pe.DIRECTORY_ENTRY_IMPORT:
dll_name = entry.dll.decode()
print(f"\nDLL: {dll_name}")
for imp in entry.imports:
import_name = imp.name.decode() if imp.name else 'None'
print(f" {import_name} a RVA {hex(imp.address)}")
if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
print_header("Tabella delle Esportazioni")
for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
export_name = exp.name.decode() if exp.name else 'None'
print(f" {export_name} a RVA {hex(exp.address)} (Ordinal: {exp.ordinal})")
def extract_sections(file_path):
pe = pefile.PE(file_path)
output_dir = "sections"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
arch = CS_ARCH_X86
mode = CS_MODE_32 if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_I386'] else CS_MODE_64
for section in pe.sections:
section_name = section.Name.decode().strip('\x00') or "unnamed_section"
section_data = section.get_data()
with open(os.path.join(output_dir, f"{section_name}.bin"), "wb") as f:
f.write(section_data)
with open(os.path.join(output_dir, f"{section_name}.txt"), "w") as f:
f.write(hexdump.hexdump(section_data, result='return'))
strings = extract_strings(section_data)
if strings:
with open(os.path.join(output_dir, f"{section_name}_strings.txt"), "w") as f:
f.write("\n".join(strings))
section_start = section.VirtualAddress
disassemble(section_data, section_start, arch, mode, section_name, output_dir)
extract_imports(pe, output_dir)
extract_compiler_linker_info(pe, output_dir)
def extract_strings(data):
strings = []
for match in re.finditer(b'[ -~]{4,}', data):
string = match.group().decode(errors='ignore')
if len(string) > 4: #regola il valore a seconda delle tue esigenze
strings.append(string)
return strings
def disassemble(section_data, section_start, arch, mode, section_name, output_dir):
md = Cs(arch, mode)
disassembly = ""
if len(section_data) < 16: #regola il valore a seconda delle tue esigenze
warning_message = "Attenzione: I dati della sezione sono troppo brevi per la disassemblatura."
print(warning_message)
disassembly += warning_message + "\n"
try:
with open(os.path.join(output_dir, f"{section_name}_disassembly.txt"), "w") as f:
f.write(disassembly)
except IOError as io_error:
error_message = f"Errore IO durante la scrittura del file: {io_error}"
logging.error(error_message)
disassembly += error_message + "\n"
return
try:
for i in md.disasm(section_data, section_start):
try:
disassembly += (
f"{i.address:08x}:\t{i.mnemonic}\t{i.op_str}\n"
f" bytes: {' '.join(f'{b:02x}' for b in i.bytes)}\n"
f" length: {i.size} bytes\n"
f" RVA: {i.address - section_start + section_start:08x}\n"
)
except Exception as e:
error_message = f"Errore durante l'elaborazione dell'istruzione all'indirizzo {i.address:08x}: {e}"
logging.error(error_message)
disassembly += error_message + "\n"
except CsError as e:
error_message = f"Errore del motore Capstone: {e}"
logging.error(error_message)
disassembly += error_message + "\n"
except Exception as e:
error_message = f"Si è verificato un errore durante la disassemblatura della sezione: {e}"
logging.error(error_message)
disassembly += error_message + "\n"
try:
with open(os.path.join(output_dir, f"{section_name}_disassembly.txt"), "w") as f:
f.write(disassembly)
except IOError as io_error:
error_message = f"Errore IO durante la scrittura del file: {io_error}"
logging.error(error_message)
disassembly += error_message + "\n"
print(f"Disassemblatura della sezione '{section_name}' completata e salvata in '{section_name}_disassembly.txt'.")
def extract_imports(pe, output_dir):
import_info = ["Tabella degli Import:"]
for entry in pe.DIRECTORY_ENTRY_IMPORT:
dll_name = entry.dll.decode()
import_info.append(f"DLL: {dll_name}")
for imp in entry.imports:
import_name = imp.name.decode() if imp.name else 'None'
import_info.append(f" {import_name} a RVA {hex(imp.address)} (Hint: {imp.hint})")
with open(os.path.join(output_dir, "imports.txt"), "w") as f:
f.write("\n".join(import_info))
if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
export_info = ["Tabella delle Esportazioni:"]
for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
export_name = exp.name.decode() if exp.name else 'None'
export_info.append(f" {export_name} a RVA {hex(exp.address)} (Ordinal: {exp.ordinal})")
with open(os.path.join(output_dir, "exports.txt"), "w") as f:
f.write("\n".join(export_info))
else:
with open(os.path.join(output_dir, "exports.txt"), "w") as f:
f.write("Nessuna tabella di esportazione trovata.")
print("Informazioni su import ed esportazioni estratte e salvate.")
def extract_compiler_linker_info(pe, output_dir):
try:
with open(os.path.join(output_dir, "compiler_linker_info.txt"), "w") as f:
if hasattr(pe, 'VS_VERSIONINFO') and hasattr(pe.VS_VERSIONINFO, 'StringTable'):
for string_table in pe.VS_VERSIONINFO.StringTable:
if 'ProductVersion' in string_table.entries:
f.write(f"Compilatore: {string_table.entries['ProductVersion']}\n")
if 'FileVersion' in string_table.entries:
f.write(f"Linker: {string_table.entries['FileVersion']}\n")
else:
f.write("Informazioni sul compilatore e linker non disponibili.\n")
except Exception as e:
error_message = f"Errore durante l'estrazione delle informazioni sul compilatore e linker: {e}"
logging.error(error_message)
def print_header(header):
print(f"\n{'-'*len(header)}")
print(header)
print(f"{'-'*len(header)}")
def print_info(title, data, description):
print(f"{title}: {data}")
print(f" {description}")
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Uso: python extract_pe_info_and_sections.py <percorso_del_file_pe>")
sys.exit(1)
file_path = sys.argv[1]
if not os.path.isfile(file_path):
print(f"Errore: Il file '{file_path}' non esiste.")
sys.exit(1)
extract_pe_info(file_path)
extract_sections(file_path)