Discussione PE debugger "statico" Debugger2.py

Netcat

Helper
17 Gennaio 2022
532
147
390
716
In preparazione di una CTF ho creato un debugger per studiare Assembly. Questo strumento prende un PE, e fa le seguenti cose:

  1. Rileva dinamicamente tutte le sezioni di memoria di un PE (.exe);
  2. Vede se è compresso con un Packer. Di default controlla solo se c'è UPX, Themida e PECompact;
  3. Analisi generale (timestamp di creazione/modifica dell'artefatto, architettura, ecc;
  4. Enumerazione di chiamate API ed Import/Export: a tal proposito i RAT usano spesso troppe referenze all'API Winsock (WSOCK32.dll) come socket, connect, send, recv eccetera - Quindi se state esaminando un .exe che sta idle nel task manager mentre fa troppe chiamate a quest'API consiglio di approfondire l'analisi;
  5. Prova ad estrarre informazioni su linker e compiler, in seguito calcola l'entropia di ciascuna sezione di memoria. Solitamente le sezioni con entropia molto elevata sono indici di attività sospetta;
  6. Logging degli errori: utilissima per debuggare lo script stesso, in caso gli sia passato come argomento un'exe con caratteristiche particolari. Alcuni PE usano ad esempio l'API "IsDebuggerPresent" per non farsi reversare, e il mio script non tiene in conto di questo. Quindi se analizzate un PE con caratteristiche evasive, come l'antisandbox o l'antireversing dovrete fare degli aggiustamenti;
Ora veniamo alla parte HOT:
Il nostro debugger disassembla ciascuna sezione, e tramite la libreria Capstone genera un "disassembly.txt" per ciascuna sezione del file, rivelando l'Assembly del PE. Questa libreria converte i dati binari del PE in istruzioni Assembly, in plain text.

La sintassi del tool è python debugger2.py iltuofile.exe
In questo thread vado a caricare (zippato, disassemblato e disarmato) un payload puzzo generato con metasploit (che ovviamente punta a 127.0.0.1) - Da notare come il peso del PE iniziale è 73 KB, dopo esser stato processato dal mio tool, estratto dalla ZIP pesa oltre 1 MB.

Python:
import pefile
import os
import hexdump
import re
import logging
import math
from capstone import Cs, CS_ARCH_X86, CS_MODE_32, CS_MODE_64, CsError
from datetime import datetime

logging.basicConfig(filename='disassembly_errors.log', level=logging.ERROR)

def get_file_timestamps(file_path):
    try:
        stat_info = os.stat(file_path)
        creation_time = datetime.fromtimestamp(stat_info.st_ctime)
        modification_time = datetime.fromtimestamp(stat_info.st_mtime)
        return creation_time, modification_time
    except Exception as e:
        print(f"Errore durante il recupero dei timestamp del file: {e}")
        return None, None

def print_timestamp_info(file_path):
    creation_time, modification_time = get_file_timestamps(file_path)
    if creation_time and modification_time:
        print_header("Timestamp del File")
        print_info("Data di Creazione", creation_time.strftime("%Y-%m-%d %H:%M:%S"),
                   "Data e ora in cui il file è stato creato")
        print_info("Data di Modifica", modification_time.strftime("%Y-%m-%d %H:%M:%S"),
                   "Data e ora dell'ultima modifica del file")

def get_architecture_description(machine_type):
    if machine_type == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_I386']:
        return "x86 (32-bit)"
    elif machine_type == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_AMD64']:
        return "x64 (64-bit)"
    else:
        return "Sconosciuta"

def calculate_entropy(data):
    if not data:
        return 0
    freq = [0] * 256
    for byte in data:
        freq[byte] += 1
    entropy = 0
    length = len(data)
    for count in freq:
        if count > 0:
            p = count / length
            entropy -= p * math.log2(p)
    return entropy

def detect_packer(data):
    packer_signatures = [
        b'UPX', b'Themida', b'PECompact'
    ]
    
    for sig in packer_signatures:
        if sig in data:
            return f"Il file è stato compresso con il packer: {sig.decode()}"
    
    else:
        print("Il file non è stato compresso con un Packer.")


def extract_pe_info(file_path):
    try:
        pe = pefile.PE(file_path)
    except FileNotFoundError:
        print(f"Errore: Il file '{file_path}' non è stato trovato.")
        return
    except pefile.PEFormatError as e:
        print(f"Errore: Il formato del file PE non è valido. Dettagli: {e}")
        return
    
    print_header("Informazioni PE")
    print_timestamp_info(file_path)
    
    packer_detection_result = detect_packer(pe.__data__)
    print(packer_detection_result)
    
    print_header("Informazioni DOS")
    print_info("e_magic", f"{pe.DOS_HEADER.e_magic}",
               "Numero magico del DOS Header, deve essere 0x5A4D per i file PE32")
    print_info("e_lfanew", f"{pe.DOS_HEADER.e_lfanew}",
               "Offset dell'header PE dal DOS Header, indica dove inizia l'header PE")

    print_header("Informazioni File")
    architecture_description = get_architecture_description(pe.FILE_HEADER.Machine)
    print_info("Machine", f"{pe.FILE_HEADER.Machine} ({architecture_description})",
               "Tipo di macchina per la quale il file è compilato")
    print_info("Number of Sections", f"{pe.FILE_HEADER.NumberOfSections}",
               "Numero di sezioni nel file PE")
    print_info("TimeDateStamp", f"{pe.FILE_HEADER.TimeDateStamp}",
               "Timestamp della creazione del file (secondi dal 1 Gennaio 1970, Epoch)")
    print_info("Pointer to Symbol Table", f"{pe.FILE_HEADER.PointerToSymbolTable}",
               "Offset alla tabella dei simboli (per PE moderni questo valore è solitamente 0)")

    print_header("Informazioni Opzionali")
    print_info("AddressOfEntryPoint", f"{pe.OPTIONAL_HEADER.AddressOfEntryPoint}",
               "Indirizzo di ingresso del programma relativo alla base dell'immagine")
    print_info("ImageBase", f"{pe.OPTIONAL_HEADER.ImageBase}",
               "Indirizzo base dell'immagine quando il file è caricato in memoria")
    print_info("SectionAlignment", f"{pe.OPTIONAL_HEADER.SectionAlignment}",
               "Allineamento delle sezioni in memoria (spazio riservato tra le sezioni)")

    print_header("Sezioni")
    for section in pe.sections:
        name = section.Name.decode().strip('\x00')
        section_data = section.get_data()
        
        entropy = calculate_entropy(section_data)
        print(f"\nSezione '{name}':")
        print_info("Virtual Address", f"{section.VirtualAddress}",
                   "Indirizzo virtuale della sezione in memoria")
        print_info("Size of Raw Data", f"{section.SizeOfRawData}",
                   "Dimensione dei dati raw della sezione (dimensione sul disco)")
        print_info("Entropy", f"{entropy:.4f}",
                   "Entropia della sezione (misura della casualità dei dati)")

    print_header("Tabella degli Import")
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        dll_name = entry.dll.decode()
        print(f"\nDLL: {dll_name}")
        for imp in entry.imports:
            import_name = imp.name.decode() if imp.name else 'None'
            print(f"  {import_name} a RVA {hex(imp.address)}")

    if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
        print_header("Tabella delle Esportazioni")
        for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
            export_name = exp.name.decode() if exp.name else 'None'
            print(f"  {export_name} a RVA {hex(exp.address)} (Ordinal: {exp.ordinal})")

def extract_sections(file_path):
    pe = pefile.PE(file_path)
    output_dir = "sections"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    arch = CS_ARCH_X86
    mode = CS_MODE_32 if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_I386'] else CS_MODE_64

    for section in pe.sections:
        section_name = section.Name.decode().strip('\x00') or "unnamed_section"
        section_data = section.get_data()

        with open(os.path.join(output_dir, f"{section_name}.bin"), "wb") as f:
            f.write(section_data)

        with open(os.path.join(output_dir, f"{section_name}.txt"), "w") as f:
            f.write(hexdump.hexdump(section_data, result='return'))

        strings = extract_strings(section_data)
        if strings:
            with open(os.path.join(output_dir, f"{section_name}_strings.txt"), "w") as f:
                f.write("\n".join(strings))

        section_start = section.VirtualAddress
        disassemble(section_data, section_start, arch, mode, section_name, output_dir)

    extract_imports(pe, output_dir)
    extract_compiler_linker_info(pe, output_dir)

def extract_strings(data):
    strings = []
    for match in re.finditer(b'[ -~]{4,}', data):
        string = match.group().decode(errors='ignore')
        if len(string) > 4: #regola il valore a seconda delle tue esigenze
            strings.append(string)
    return strings

def disassemble(section_data, section_start, arch, mode, section_name, output_dir):
    md = Cs(arch, mode)
    disassembly = ""

    if len(section_data) < 16: #regola il valore a seconda delle tue esigenze
        warning_message = "Attenzione: I dati della sezione sono troppo brevi per la disassemblatura."
        print(warning_message)
        disassembly += warning_message + "\n"
        try:
            with open(os.path.join(output_dir, f"{section_name}_disassembly.txt"), "w") as f:
                f.write(disassembly)
        except IOError as io_error:
            error_message = f"Errore IO durante la scrittura del file: {io_error}"
            logging.error(error_message)
            disassembly += error_message + "\n"
        return

    try:
        for i in md.disasm(section_data, section_start):
            try:
                disassembly += (
                    f"{i.address:08x}:\t{i.mnemonic}\t{i.op_str}\n"
                    f"    bytes: {' '.join(f'{b:02x}' for b in i.bytes)}\n"
                    f"    length: {i.size} bytes\n"
                    f"    RVA: {i.address - section_start + section_start:08x}\n"
                )
            except Exception as e:
                error_message = f"Errore durante l'elaborazione dell'istruzione all'indirizzo {i.address:08x}: {e}"
                logging.error(error_message)
                disassembly += error_message + "\n"

    except CsError as e:
        error_message = f"Errore del motore Capstone: {e}"
        logging.error(error_message)
        disassembly += error_message + "\n"
    except Exception as e:
        error_message = f"Si è verificato un errore durante la disassemblatura della sezione: {e}"
        logging.error(error_message)
        disassembly += error_message + "\n"

    try:
        with open(os.path.join(output_dir, f"{section_name}_disassembly.txt"), "w") as f:
            f.write(disassembly)
    except IOError as io_error:
        error_message = f"Errore IO durante la scrittura del file: {io_error}"
        logging.error(error_message)
        disassembly += error_message + "\n"

    print(f"Disassemblatura della sezione '{section_name}' completata e salvata in '{section_name}_disassembly.txt'.")

def extract_imports(pe, output_dir):
    import_info = ["Tabella degli Import:"]
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        dll_name = entry.dll.decode()
        import_info.append(f"DLL: {dll_name}")
        for imp in entry.imports:
            import_name = imp.name.decode() if imp.name else 'None'
            import_info.append(f"  {import_name} a RVA {hex(imp.address)} (Hint: {imp.hint})")
    with open(os.path.join(output_dir, "imports.txt"), "w") as f:
        f.write("\n".join(import_info))
    
    if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
        export_info = ["Tabella delle Esportazioni:"]
        for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
            export_name = exp.name.decode() if exp.name else 'None'
            export_info.append(f"  {export_name} a RVA {hex(exp.address)} (Ordinal: {exp.ordinal})")
        with open(os.path.join(output_dir, "exports.txt"), "w") as f:
            f.write("\n".join(export_info))
    else:
        with open(os.path.join(output_dir, "exports.txt"), "w") as f:
            f.write("Nessuna tabella di esportazione trovata.")

    print("Informazioni su import ed esportazioni estratte e salvate.")

def extract_compiler_linker_info(pe, output_dir):
    try:
        with open(os.path.join(output_dir, "compiler_linker_info.txt"), "w") as f:
            if hasattr(pe, 'VS_VERSIONINFO') and hasattr(pe.VS_VERSIONINFO, 'StringTable'):
                for string_table in pe.VS_VERSIONINFO.StringTable:
                    if 'ProductVersion' in string_table.entries:
                        f.write(f"Compilatore: {string_table.entries['ProductVersion']}\n")
                    if 'FileVersion' in string_table.entries:
                        f.write(f"Linker: {string_table.entries['FileVersion']}\n")
            else:
                f.write("Informazioni sul compilatore e linker non disponibili.\n")
    except Exception as e:
        error_message = f"Errore durante l'estrazione delle informazioni sul compilatore e linker: {e}"
        logging.error(error_message)


def print_header(header):
    print(f"\n{'-'*len(header)}")
    print(header)
    print(f"{'-'*len(header)}")

def print_info(title, data, description):
    print(f"{title}: {data}")
    print(f"  {description}")

if __name__ == "__main__":
    import sys
    if len(sys.argv) != 2:
        print("Uso: python extract_pe_info_and_sections.py <percorso_del_file_pe>")
        sys.exit(1)

    file_path = sys.argv[1]
    if not os.path.isfile(file_path):
        print(f"Errore: Il file '{file_path}' non esiste.")
        sys.exit(1)

    extract_pe_info(file_path)
    extract_sections(file_path)
 

Allegati

  • Example_Output.zip
    234.4 KB · Visualizzazioni: 1
  • Mi piace
Reazioni: NSteel