17 Gennaio 2022
In preparazione di una CTF ho creato un debugger per studiare Assembly. Questo strumento prende un PE, e fa le seguenti cose:

  1. Rileva dinamicamente tutte le sezioni di memoria di un PE (.exe);
  2. Vede se è compresso con un Packer. Di default controlla solo se c'è UPX, Themida e PECompact;
  3. Analisi generale (timestamp di creazione/modifica dell'artefatto, architettura, ecc;
  4. Enumerazione di chiamate API ed Import/Export: a tal proposito i RAT usano spesso troppe referenze all'API Winsock (WSOCK32.dll) come socket, connect, send, recv eccetera - Quindi se state esaminando un .exe che sta idle nel task manager mentre fa troppe chiamate a quest'API consiglio di approfondire l'analisi;
  5. Prova ad estrarre informazioni su linker e compiler, in seguito calcola l'entropia di ciascuna sezione di memoria. Solitamente le sezioni con entropia molto elevata sono indici di attività sospetta;
  6. Logging degli errori: utilissima per debuggare lo script stesso, in caso gli sia passato come argomento un'exe con caratteristiche particolari. Alcuni PE usano ad esempio l'API "IsDebuggerPresent" per non farsi reversare, e il mio script non tiene in conto di questo. Quindi se analizzate un PE con caratteristiche evasive, come l'antisandbox o l'antireversing dovrete fare degli aggiustamenti;
Ora veniamo alla parte HOT:
Il nostro debugger disassembla ciascuna sezione, e tramite la libreria Capstone genera un "disassembly.txt" per ciascuna sezione del file, rivelando l'Assembly del PE. Questa libreria converte i dati binari del PE in istruzioni Assembly, in plain text.

La sintassi del tool è python debugger2.py iltuofile.exe
In questo thread vado a caricare (zippato, disassemblato e disarmato) un payload puzzo generato con metasploit (che ovviamente punta a - Da notare come il peso del PE iniziale è 73 KB, dopo esser stato processato dal mio tool, estratto dalla ZIP pesa oltre 1 MB.

import pefile
import os
import hexdump
import re
import logging
import math
from capstone import Cs, CS_ARCH_X86, CS_MODE_32, CS_MODE_64, CsError
from datetime import datetime

logging.basicConfig(filename='disassembly_errors.log', level=logging.ERROR)

def get_file_timestamps(file_path):
        stat_info = os.stat(file_path)
        creation_time = datetime.fromtimestamp(stat_info.st_ctime)
        modification_time = datetime.fromtimestamp(stat_info.st_mtime)
        return creation_time, modification_time
    except Exception as e:
        print(f"Errore durante il recupero dei timestamp del file: {e}")
        return None, None

def print_timestamp_info(file_path):
    creation_time, modification_time = get_file_timestamps(file_path)
    if creation_time and modification_time:
        print_header("Timestamp del File")
        print_info("Data di Creazione", creation_time.strftime("%Y-%m-%d %H:%M:%S"),
                   "Data e ora in cui il file è stato creato")
        print_info("Data di Modifica", modification_time.strftime("%Y-%m-%d %H:%M:%S"),
                   "Data e ora dell'ultima modifica del file")

def get_architecture_description(machine_type):
    if machine_type == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_I386']:
        return "x86 (32-bit)"
    elif machine_type == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_AMD64']:
        return "x64 (64-bit)"
        return "Sconosciuta"

def calculate_entropy(data):
    if not data:
        return 0
    freq = [0] * 256
    for byte in data:
        freq[byte] += 1
    entropy = 0
    length = len(data)
    for count in freq:
        if count > 0:
            p = count / length
            entropy -= p * math.log2(p)
    return entropy

def detect_packer(data):
    packer_signatures = [
        b'UPX', b'Themida', b'PECompact'
    for sig in packer_signatures:
        if sig in data:
            return f"Il file è stato compresso con il packer: {sig.decode()}"
        print("Il file non è stato compresso con un Packer.")

def extract_pe_info(file_path):
        pe = pefile.PE(file_path)
    except FileNotFoundError:
        print(f"Errore: Il file '{file_path}' non è stato trovato.")
    except pefile.PEFormatError as e:
        print(f"Errore: Il formato del file PE non è valido. Dettagli: {e}")
    print_header("Informazioni PE")
    packer_detection_result = detect_packer(pe.__data__)
    print_header("Informazioni DOS")
    print_info("e_magic", f"{pe.DOS_HEADER.e_magic}",
               "Numero magico del DOS Header, deve essere 0x5A4D per i file PE32")
    print_info("e_lfanew", f"{pe.DOS_HEADER.e_lfanew}",
               "Offset dell'header PE dal DOS Header, indica dove inizia l'header PE")

    print_header("Informazioni File")
    architecture_description = get_architecture_description(pe.FILE_HEADER.Machine)
    print_info("Machine", f"{pe.FILE_HEADER.Machine} ({architecture_description})",
               "Tipo di macchina per la quale il file è compilato")
    print_info("Number of Sections", f"{pe.FILE_HEADER.NumberOfSections}",
               "Numero di sezioni nel file PE")
    print_info("TimeDateStamp", f"{pe.FILE_HEADER.TimeDateStamp}",
               "Timestamp della creazione del file (secondi dal 1 Gennaio 1970, Epoch)")
    print_info("Pointer to Symbol Table", f"{pe.FILE_HEADER.PointerToSymbolTable}",
               "Offset alla tabella dei simboli (per PE moderni questo valore è solitamente 0)")

    print_header("Informazioni Opzionali")
    print_info("AddressOfEntryPoint", f"{pe.OPTIONAL_HEADER.AddressOfEntryPoint}",
               "Indirizzo di ingresso del programma relativo alla base dell'immagine")
    print_info("ImageBase", f"{pe.OPTIONAL_HEADER.ImageBase}",
               "Indirizzo base dell'immagine quando il file è caricato in memoria")
    print_info("SectionAlignment", f"{pe.OPTIONAL_HEADER.SectionAlignment}",
               "Allineamento delle sezioni in memoria (spazio riservato tra le sezioni)")

    for section in pe.sections:
        name = section.Name.decode().strip('\x00')
        section_data = section.get_data()
        entropy = calculate_entropy(section_data)
        print(f"\nSezione '{name}':")
        print_info("Virtual Address", f"{section.VirtualAddress}",
                   "Indirizzo virtuale della sezione in memoria")
        print_info("Size of Raw Data", f"{section.SizeOfRawData}",
                   "Dimensione dei dati raw della sezione (dimensione sul disco)")
        print_info("Entropy", f"{entropy:.4f}",
                   "Entropia della sezione (misura della casualità dei dati)")

    print_header("Tabella degli Import")
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        dll_name = entry.dll.decode()
        print(f"\nDLL: {dll_name}")
        for imp in entry.imports:
            import_name = imp.name.decode() if imp.name else 'None'
            print(f"  {import_name} a RVA {hex(imp.address)}")

    if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
        print_header("Tabella delle Esportazioni")
        for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
            export_name = exp.name.decode() if exp.name else 'None'
            print(f"  {export_name} a RVA {hex(exp.address)} (Ordinal: {exp.ordinal})")

def extract_sections(file_path):
    pe = pefile.PE(file_path)
    output_dir = "sections"
    if not os.path.exists(output_dir):

    arch = CS_ARCH_X86
    mode = CS_MODE_32 if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_I386'] else CS_MODE_64

    for section in pe.sections:
        section_name = section.Name.decode().strip('\x00') or "unnamed_section"
        section_data = section.get_data()

        with open(os.path.join(output_dir, f"{section_name}.bin"), "wb") as f:

        with open(os.path.join(output_dir, f"{section_name}.txt"), "w") as f:
            f.write(hexdump.hexdump(section_data, result='return'))

        strings = extract_strings(section_data)
        if strings:
            with open(os.path.join(output_dir, f"{section_name}_strings.txt"), "w") as f:

        section_start = section.VirtualAddress
        disassemble(section_data, section_start, arch, mode, section_name, output_dir)

    extract_imports(pe, output_dir)
    extract_compiler_linker_info(pe, output_dir)

def extract_strings(data):
    strings = []
    for match in re.finditer(b'[ -~]{4,}', data):
        string = match.group().decode(errors='ignore')
        if len(string) > 4: #regola il valore a seconda delle tue esigenze
    return strings

def disassemble(section_data, section_start, arch, mode, section_name, output_dir):
    md = Cs(arch, mode)
    disassembly = ""

    if len(section_data) < 16: #regola il valore a seconda delle tue esigenze
        warning_message = "Attenzione: I dati della sezione sono troppo brevi per la disassemblatura."
        disassembly += warning_message + "\n"
            with open(os.path.join(output_dir, f"{section_name}_disassembly.txt"), "w") as f:
        except IOError as io_error:
            error_message = f"Errore IO durante la scrittura del file: {io_error}"
            disassembly += error_message + "\n"

        for i in md.disasm(section_data, section_start):
                disassembly += (
                    f"    bytes: {' '.join(f'{b:02x}' for b in i.bytes)}\n"
                    f"    length: {i.size} bytes\n"
                    f"    RVA: {i.address - section_start + section_start:08x}\n"
            except Exception as e:
                error_message = f"Errore durante l'elaborazione dell'istruzione all'indirizzo {i.address:08x}: {e}"
                disassembly += error_message + "\n"

    except CsError as e:
        error_message = f"Errore del motore Capstone: {e}"
        disassembly += error_message + "\n"
    except Exception as e:
        error_message = f"Si è verificato un errore durante la disassemblatura della sezione: {e}"
        disassembly += error_message + "\n"

        with open(os.path.join(output_dir, f"{section_name}_disassembly.txt"), "w") as f:
    except IOError as io_error:
        error_message = f"Errore IO durante la scrittura del file: {io_error}"
        disassembly += error_message + "\n"

    print(f"Disassemblatura della sezione '{section_name}' completata e salvata in '{section_name}_disassembly.txt'.")

def extract_imports(pe, output_dir):
    import_info = ["Tabella degli Import:"]
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        dll_name = entry.dll.decode()
        import_info.append(f"DLL: {dll_name}")
        for imp in entry.imports:
            import_name = imp.name.decode() if imp.name else 'None'
            import_info.append(f"  {import_name} a RVA {hex(imp.address)} (Hint: {imp.hint})")
    with open(os.path.join(output_dir, "imports.txt"), "w") as f:
    if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
        export_info = ["Tabella delle Esportazioni:"]
        for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
            export_name = exp.name.decode() if exp.name else 'None'
            export_info.append(f"  {export_name} a RVA {hex(exp.address)} (Ordinal: {exp.ordinal})")
        with open(os.path.join(output_dir, "exports.txt"), "w") as f:
        with open(os.path.join(output_dir, "exports.txt"), "w") as f:
            f.write("Nessuna tabella di esportazione trovata.")

    print("Informazioni su import ed esportazioni estratte e salvate.")

def extract_compiler_linker_info(pe, output_dir):
        with open(os.path.join(output_dir, "compiler_linker_info.txt"), "w") as f:
            if hasattr(pe, 'VS_VERSIONINFO') and hasattr(pe.VS_VERSIONINFO, 'StringTable'):
                for string_table in pe.VS_VERSIONINFO.StringTable:
                    if 'ProductVersion' in string_table.entries:
                        f.write(f"Compilatore: {string_table.entries['ProductVersion']}\n")
                    if 'FileVersion' in string_table.entries:
                        f.write(f"Linker: {string_table.entries['FileVersion']}\n")
                f.write("Informazioni sul compilatore e linker non disponibili.\n")
    except Exception as e:
        error_message = f"Errore durante l'estrazione delle informazioni sul compilatore e linker: {e}"

def print_header(header):

def print_info(title, data, description):
    print(f"{title}: {data}")
    print(f"  {description}")

if __name__ == "__main__":
    import sys
    if len(sys.argv) != 2:
        print("Uso: python extract_pe_info_and_sections.py <percorso_del_file_pe>")

    file_path = sys.argv[1]
    if not os.path.isfile(file_path):
        print(f"Errore: Il file '{file_path}' non esiste.")



  • Example_Output.zip
    234.4 KB · Visualizzazioni: 1
