Mini anti-virus + scrittura regole YARA (no VirusTotal) - Rilevatore di backdoor/trojan statico

Netcat

Helper
17 Gennaio 2022
526
145
383
716
Ultima modifica:
La faccio breve, sennò qualcuno si annoia e chiude il thread.
Regole:
  • Entropia di almeno una sezione superiore a 7.0? VERDETTO: sospetto (bollino arancione) 🟠
  • Almeno 5 API calls sospette + Entropia alta? VERDETTO: Possibly a backdoor (bollino rosso) 🔴
  • Entropia bassa e meno di 5 API calls? VERDETTO: pulito/o infetto ma offuscato, potrebbe richiedere ulteriore analisi runtime (bollino giallo) 🟡
File PE testati (per testare l'accuratezza del mio strumento):
  • pktriot.exe = questo EXE è il client di Packetriot, un noto servizio di port forwarding. L'artefatto è intatto, ed è stato scaricato così com'è dal sito. Dato che è un client, condivide alcune funzioni importanti con le backdoor. VERDETTO: 🟡 (entropia bassa e meno di 5 API calls sospette rilevate)
  • 185.exe = payload 'raw' o puzzo (come piace definirlo a me) fatto con msfvenom. VERDETTO: 🔴 (Entropia a 7.02 + classiche API syscalls)
Da fare: implementare una sandbox per la scansione runtime, che permette di identificare informazioni molto più appetibili per generare regole Yara.
Lo script è inteso per Windows CMD/Powershell e richiede ssdeep.exe e strings.exe o strings64.exe nella working directory dello script. Per ovvie ragioni, non può lavorare con un ELF.

Lo script dopo aver terminato l'analisi, genera una regola Yara con i dati rilevati.

Python:
import os
import hashlib
import pefile
import subprocess
import math
import re
from datetime import datetime, timezone

suspect_api_calls = {
    'WSOCK32.dll': [
        'socket', 'connect', 'send', 'recv'  
    ],
    'WS2_32.dll': [
        'WSASocket', 'WSAConnect', 'WSASend', 'WSARecv'
    ],
    'Wininet.dll': [
        'InternetOpen', 'InternetConnect', 'HttpOpenRequest', 'HttpSendRequest'
    ],
    'Winhttp.dll': [
        'WinHttpOpen', 'WinHttpConnect', 'WinHttpSendRequest', 'WinHttpReceiveResponse'
    ],
    'KERNEL32.dll': [
        'CreateFileMapping', 'MapViewOfFile', 'UnmapViewOfFile',
        'CreateProcess', 'CreateProcessA', 'CreateProcessW',
        'VirtualAlloc', 'VirtualFree', 'VirtualProtect', 'GetProcAddress',
        'OpenFileMapping'
    ],
    'Shell32.dll': [
        'WinExec', 'ShellExecute', 'ShellExecuteEx'
    ],
    'Advapi32.dll': [
        'RegOpenKeyEx', 'RegSetValueEx', 'RegQueryValueEx',
        'CreateService', 'StartService', 'OpenService'
    ],
    'IpHlpApi.dll': [
        'GetAdaptersInfo', 'GetAdaptersAddresses'
    ]
}

def calculate_entropy(data):
    if not data:
        return 0.0
    entropy = 0
    freq = {}
    for byte in data:
        freq[byte] = freq.get(byte, 0) + 1
    for byte, count in freq.items():
        p_x = count / len(data)
        entropy -= p_x * math.log2(p_x)
    return entropy

def calculate_hashes(file_path):
    hashes = {}
 
    hash_md5 = hashlib.md5()
    hash_sha1 = hashlib.sha1()
    hash_sha256 = hashlib.sha256()

    with open(file_path, 'rb') as f:
        while chunk := f.read(8192):
            hash_md5.update(chunk)
            hash_sha1.update(chunk)
            hash_sha256.update(chunk)
 
    hashes['MD5'] = hash_md5.hexdigest()
    hashes['SHA-1'] = hash_sha1.hexdigest()
    hashes['SHA-256'] = hash_sha256.hexdigest()

    try:
        result = subprocess.run(['ssdeep', file_path], capture_output=True, text=True, check=True)
        output_lines = result.stdout.strip().split('\n')
     
        ssdeep_hash = None
        for line in output_lines:
            if line.startswith('ssdeep,1.1--'):
                continue
            if any(char.isdigit() for char in line.split(':', 1)[0]):
                parts = line.split(':', 2)
                if len(parts) > 2:
                    blocksize = parts[0].strip()
                    hash1 = parts[1].strip()
                    hash2 = parts[2].split(',')[0].strip()
                    ssdeep_hash = f"{blocksize}:{hash1}:{hash2}"
                    break
     
        hashes['SSDEEP'] = ssdeep_hash if ssdeep_hash else 'Not found'
    except subprocess.CalledProcessError as e:
        print(f"Errore durante l'esecuzione di ssdeep: {e}")
        hashes['SSDEEP'] = 'Error'

    return hashes

def analyze_sections(pe):
    suspicious_threshold = 7.0
    section_info = []
    for section in pe.sections:
        name = section.Name.decode().rstrip('\x00')
        entropy = calculate_entropy(section.get_data())
        suspicious = "Suspicious entropy" if entropy >= suspicious_threshold else "No suspicious entropy"
        section_info.append({
            'name': name,
            'entropy': entropy,
            'suspicious': suspicious
        })
    return section_info

def extract_strings(file_path, is_64bit, min_length=10):
    extracted_strings = []
    command = 'strings64' if is_64bit else 'strings'
 
    try:
        result = subprocess.run([command, '-n', str(min_length), file_path], capture_output=True, text=True, check=True)
        extracted_strings = result.stdout.strip().split('\n')
    except subprocess.CalledProcessError as e:
        print(f"Errore durante l'esecuzione di {command}: {e}")
 
    return extracted_strings

def parse_version_string(value):
    match = re.match(r'\d+(\.\d+){1,3}', value)
    return match.group(0) if match else value

def parse_comments_string(value):
    return value.strip()

def filter_metadata_strings(strings):
    metadata_fields = {
        "CompanyName": None,
        "FileDescription": None,
        "FileVersion": None,
        "InternalName": None,
        "LegalCopyright": None,
        "OriginalFilename": None,
        "ProductName": None,
        "ProductVersion": None,
        "VarFileInfo": None
    }
 
    for i, s in enumerate(strings):
        for field in metadata_fields.keys():
            if s.startswith(field):
                if i + 1 < len(strings):
                    value = strings[i + 1].strip()
                    if field in ["FileVersion", "ProductVersion"]:
                        value = parse_version_string(value)
                    elif field == "Comments":
                        value = parse_comments_string(value)
                    if value and not value.isdigit():
                        metadata_fields[field] = value
                    break

    return {k: v for k, v in metadata_fields.items() if v is not None}

def search_language_patterns(strings):
    language_fields = {
        "Language": "No Language",
        "Locale": "No Locale",
        "Lang": "No Lang",
        "Translation": "No Translation"
    }
 
    for s in strings:
        for field in language_fields.keys():
            if field in s:
                index = strings.index(s)
                if index + 1 < len(strings):
                    value = strings[index + 1].strip()
                    language_fields[field] = value
                    break

    return language_fields

def extract_static_urls(strings):
    urls = []
    url_pattern = re.compile(r'\b(?:http|https)://\S+\b')
    for s in strings:
        urls.extend(url_pattern.findall(s))
    return urls

def extract_imports(file_path):
    pe = pefile.PE(file_path)
    imports = {}
    suspect_count = 0

    if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in pe.DIRECTORY_ENTRY_IMPORT:
            dll_name = entry.dll.decode('utf-8')
            functions = []
            for imp in entry.imports:
                if imp.name:
                    func_name = imp.name.decode('utf-8')
                    functions.append(func_name)
                    if func_name in suspect_api_calls.get(dll_name, []):
                        suspect_count += 1
            imports[dll_name] = sorted(functions)
 
    return imports, suspect_count

def generate_yara_rule(file_path):
    pe = pefile.PE(file_path)
    hashes = calculate_hashes(file_path)
    file_size = os.path.getsize(file_path)
    timestamp = pe.FILE_HEADER.TimeDateStamp
    timestamp_formatted = datetime.fromtimestamp(timestamp, timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    file_type = "EXE" if pe.is_exe() else "DLL" if pe.is_dll() else "Other"
    architecture = "32-bit" if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_I386'] else "64-bit"
    sections = analyze_sections(pe)
    extracted_strings = extract_strings(file_path, is_64bit=(architecture == "64-bit"))
    metadata_strings = filter_metadata_strings(extracted_strings)
    language_strings = search_language_patterns(extracted_strings)
    static_urls = extract_static_urls(extracted_strings)
    imports, suspect_count = extract_imports(file_path)
    high_entropy_detected = any(section['entropy'] >= 7.0 for section in sections)
    file_status = "File is clean or maybe using obfuscation. Dynamic analysis is required"
 
    if suspect_count >= 5:
        threat_status = "POSSIBLY A BACKDOOR"
    elif high_entropy_detected:
        threat_status = "Possible threat. High entropy detected"
    else:
        threat_status = file_status

    yara_rule = f"""
rule PEFileRule
{{
    meta:
        description = "YARA rule for {os.path.basename(file_path)} - File type: {file_type}, Architecture: {architecture}"
        author = "Netcat"
        md5 = "{hashes.get('MD5')}"
        sha1 = "{hashes.get('SHA-1')}"
        sha256 = "{hashes.get('SHA-256')}"
        ssdeep = "{hashes.get('SSDEEP')}"
        file_size = {file_size} bytes
        compiled at = "{timestamp_formatted}"
        type = "{threat_status}"
    strings:
"""

    for section in sorted(sections, key=lambda x: x['name']):
        yara_rule += f'        ${section["name"]}    = "{section["name"]}" // Entropy: {section["entropy"]:.2f}; {section["suspicious"]}\n'
 
    for key in sorted(metadata_strings):
        yara_rule += f'        ${key} = "{metadata_strings[key]}"\n'
 
    for key in sorted(language_strings):
        if language_strings[key] != "No Language":
            yara_rule += f'        ${key} = "{language_strings[key]}"\n'
 
    for i, url in enumerate(static_urls):
        yara_rule += f'        $StaticURL_{i} = "{url}"\n'
 
    for dll_name, funcs in sorted(imports.items()):
        yara_rule += f'        // Imports from {dll_name}\n'
        for func in funcs:
            comment = 'Suspect syscall detected' if func in suspect_api_calls.get(dll_name, []) else ''
            yara_rule += f'        ${func} = "{func}" {f"// {comment}" if comment else ""}\n'
 
    yara_rule += "    condition:\n"
    yara_rule += "        all of them\n"
    yara_rule += "}\n"
 
    return yara_rule

def main():
    file_path = input("Enter the path to the PE file: ")
    yara_rule = generate_yara_rule(file_path)
    print(yara_rule)

if __name__ == "__main__":
    main()
 
  • Mi piace
Reazioni: Reb0rned