Ultima modifica:
La faccio breve, sennò qualcuno si annoia e chiude il thread.
Regole:
Lo script è inteso per Windows CMD/PowerShell e richiede ssdeep.exe e strings.exe (o strings64.exe) nella working directory dello script.
Lo script, dopo aver terminato l'analisi, genera una regola YARA con i dati rilevati.
Regole:
- Entropia di almeno una sezione superiore a 7.0? VERDETTO: sospetto (bollino arancione)
- Almeno 5 API calls sospette + Entropia alta? VERDETTO: Possibly a backdoor (bollino rosso)
- Entropia bassa e meno di 5 API calls? VERDETTO: pulito, oppure infetto ma offuscato; potrebbe richiedere un'ulteriore analisi a runtime (bollino giallo)
pktriot.exe
= questo EXE è il client di Packetriot, un noto servizio di port forwarding. L'artefatto è intatto, ed è stato scaricato così com'è dal sito. Dato che è un client, condivide alcune funzioni importanti con le backdoor. VERDETTO: bollino giallo (entropia bassa e meno di 5 API calls sospette rilevate)
185.exe
= payload 'raw' o puzzo (come piace definirlo a me) fatto con msfvenom. VERDETTO: bollino rosso (entropia a 7.02 + classiche API calls sospette)
Lo script è inteso per Windows CMD/Powershell e richiede
ssdeep.exe
e strings.exe
o strings64.exe
nella working directory dello script. Per ovvie ragioni, non può lavorare con un ELF. Lo script, dopo aver terminato l'analisi, genera una regola YARA con i dati rilevati.
Python:
import os
import hashlib
import pefile
import subprocess
import math
import re
from datetime import datetime, timezone
# Imported functions treated as indicators of backdoor-like behaviour, grouped
# by the DLL that exports them.  The import scan looks up the list registered
# for each imported DLL and counts how many of its imports appear in it.
suspect_api_calls = {
    # Legacy Winsock socket calls.
    'WSOCK32.dll': [
        'socket', 'connect', 'send', 'recv',
    ],
    # Winsock 2 socket calls.
    'WS2_32.dll': [
        'WSASocket', 'WSAConnect', 'WSASend', 'WSARecv',
    ],
    # WinINet HTTP client.
    'Wininet.dll': [
        'InternetOpen', 'InternetConnect', 'HttpOpenRequest', 'HttpSendRequest',
    ],
    # WinHTTP client.
    'Winhttp.dll': [
        'WinHttpOpen', 'WinHttpConnect', 'WinHttpSendRequest', 'WinHttpReceiveResponse',
    ],
    # File mappings, process creation and memory management.
    'KERNEL32.dll': [
        'CreateFileMapping', 'MapViewOfFile', 'UnmapViewOfFile',
        'CreateProcess', 'CreateProcessA', 'CreateProcessW',
        'VirtualAlloc', 'VirtualFree', 'VirtualProtect', 'GetProcAddress',
        'OpenFileMapping',
        # WinExec is exported by kernel32.dll, not shell32.dll; listing it
        # under Shell32.dll meant it could never be matched.
        'WinExec',
    ],
    # Shell-based program launching.
    'Shell32.dll': [
        'ShellExecute', 'ShellExecuteEx',
    ],
    # Registry access and service control.
    'Advapi32.dll': [
        'RegOpenKeyEx', 'RegSetValueEx', 'RegQueryValueEx',
        'CreateService', 'StartService', 'OpenService',
    ],
    # Network adapter enumeration.
    'IpHlpApi.dll': [
        'GetAdaptersInfo', 'GetAdaptersAddresses',
    ],
}
def calculate_entropy(data):
    """Return the Shannon entropy of *data*, in bits per symbol.

    *data* is any sized iterable of hashable symbols (typically ``bytes``).
    Empty or falsy input yields ``0.0``.
    """
    if not data:
        return 0.0

    total = len(data)

    # Histogram of symbol occurrences, in first-seen order.
    occurrences = {}
    for symbol in data:
        if symbol in occurrences:
            occurrences[symbol] += 1
        else:
            occurrences[symbol] = 1

    # H = -sum(p * log2(p)), accumulated by repeated subtraction.
    result = 0
    for count in occurrences.values():
        probability = count / total
        result -= probability * math.log2(probability)
    return result
def _parse_ssdeep_output(output):
    """Return the ``blocksize:hash1:hash2`` triple from ssdeep's output, or None.

    The CSV header line (``ssdeep,1.1--...``) is skipped, and the trailing
    ``,filename`` column is dropped from the first well-formed result line.
    """
    for line in output.strip().split('\n'):
        if line.startswith('ssdeep,1.1--'):
            continue
        # A result line starts with the numeric block size.
        if not any(char.isdigit() for char in line.split(':', 1)[0]):
            continue
        parts = line.split(':', 2)
        if len(parts) > 2:
            blocksize = parts[0].strip()
            hash1 = parts[1].strip()
            hash2 = parts[2].split(',')[0].strip()
            return f"{blocksize}:{hash1}:{hash2}"
    return None


def calculate_hashes(file_path):
    """Compute MD5, SHA-1, SHA-256 and the ssdeep fuzzy hash of a file.

    Args:
        file_path: Path of the file to hash.

    Returns:
        A dict with the keys ``'MD5'``, ``'SHA-1'``, ``'SHA-256'`` (hex
        digests) and ``'SSDEEP'``.  ``'SSDEEP'`` is ``'Not found'`` when the
        tool ran but no hash line could be parsed, and ``'Error'`` when the
        ``ssdeep`` executable is missing, cannot be started, or exits with a
        non-zero status.

    Raises:
        OSError: If *file_path* itself cannot be opened or read.
    """
    digests = {
        'MD5': hashlib.md5(),
        'SHA-1': hashlib.sha1(),
        'SHA-256': hashlib.sha256(),
    }
    # Single pass over the file, feeding every digest from the same chunk.
    with open(file_path, 'rb') as f:
        while chunk := f.read(8192):
            for digest in digests.values():
                digest.update(chunk)
    hashes = {name: digest.hexdigest() for name, digest in digests.items()}

    try:
        result = subprocess.run(['ssdeep', file_path], capture_output=True, text=True, check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError covers a missing ssdeep binary (FileNotFoundError), which
        # previously escaped and discarded the digests computed above.
        print(f"Errore durante l'esecuzione di ssdeep: {e}")
        hashes['SSDEEP'] = 'Error'
        return hashes

    hashes['SSDEEP'] = _parse_ssdeep_output(result.stdout) or 'Not found'
    return hashes
def analyze_sections(pe):
    """Compute the entropy of every section of a parsed PE file.

    Args:
        pe: A parsed PE object exposing ``sections``; each section must
            provide a ``Name`` bytes field and a ``get_data()`` method.

    Returns:
        A list with one dict per section, in file order, holding ``'name'``
        (str), ``'entropy'`` (float) and ``'suspicious'`` (a human-readable
        label that depends on whether the entropy reaches 7.0).
    """
    suspicious_threshold = 7.0
    section_info = []
    for section in pe.sections:
        # The name is a raw, NUL-padded byte field.  Packed or hand-crafted
        # binaries may store non-UTF-8 bytes there, so decode leniently
        # instead of aborting the whole analysis with UnicodeDecodeError.
        name = section.Name.decode('utf-8', errors='replace').rstrip('\x00')
        entropy = calculate_entropy(section.get_data())
        if entropy >= suspicious_threshold:
            suspicious = "Suspicious entropy"
        else:
            suspicious = "No suspicious entropy"
        section_info.append({
            'name': name,
            'entropy': entropy,
            'suspicious': suspicious,
        })
    return section_info
def extract_strings(file_path, is_64bit, min_length=10):
    """Run the external ``strings`` tool on a file and return its output lines.

    Args:
        file_path: Path of the file to scan.
        is_64bit: When true, ``strings64`` is invoked instead of ``strings``.
        min_length: Minimum string length, passed to the tool as ``-n``.

    Returns:
        The list of extracted strings, one per output line.  An empty list is
        returned when the tool produces no output, is missing, cannot be
        started, or exits with a non-zero status (the error is printed).
    """
    command = 'strings64' if is_64bit else 'strings'
    try:
        result = subprocess.run([command, '-n', str(min_length), file_path], capture_output=True, text=True, check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError covers a missing executable (FileNotFoundError), which
        # previously propagated and aborted the analysis.
        print(f"Errore durante l'esecuzione di {command}: {e}")
        return []

    output = result.stdout.strip()
    # ''.split('\n') would give [''], i.e. one bogus empty "string".
    return output.split('\n') if output else []
def parse_version_string(value):
    """Return the leading dotted version number of *value*.

    A version is two to four dot-separated groups of digits at the very start
    of the string (e.g. ``1.2`` or ``1.2.3.4``).  When *value* does not start
    with such a number it is returned unchanged.
    """
    found = re.match(r'\d+(\.\d+){1,3}', value)
    if found is None:
        return value
    return found.group(0)
def parse_comments_string(value):
    """Return *value* with leading and trailing whitespace removed."""
    cleaned = value.strip()
    return cleaned
def filter_metadata_strings(strings):
    """Pick version-resource style metadata out of a list of extracted strings.

    For every string that starts with one of the tracked field names, the
    string that immediately follows it in *strings* is taken as that field's
    value.  Values that are empty or purely numeric are ignored, and a later
    occurrence of a field overwrites an earlier one.

    Args:
        strings: Extracted strings, in the order they were found.

    Returns:
        A dict mapping each field that received a value to that value.
        ``FileVersion`` / ``ProductVersion`` values are reduced to their
        leading dotted version number via ``parse_version_string``.
    """
    version_fields = ("FileVersion", "ProductVersion")
    metadata_fields = {
        # "Comments" was handled below but never listed here, which made its
        # branch unreachable.
        "Comments": None,
        "CompanyName": None,
        "FileDescription": None,
        "FileVersion": None,
        "InternalName": None,
        "LegalCopyright": None,
        "OriginalFilename": None,
        "ProductName": None,
        "ProductVersion": None,
        "VarFileInfo": None,
    }
    for i, s in enumerate(strings):
        # No field name is a prefix of another, so at most one can match.
        field = next((name for name in metadata_fields if s.startswith(name)), None)
        if field is None or i + 1 >= len(strings):
            continue
        value = strings[i + 1].strip()
        if field in version_fields:
            value = parse_version_string(value)
        elif field == "Comments":
            value = parse_comments_string(value)
        if value and not value.isdigit():
            metadata_fields[field] = value
    return {k: v for k, v in metadata_fields.items() if v is not None}
def search_language_patterns(strings):
    """Look for language-related markers in a list of extracted strings.

    Each string is checked against the markers ``Language``, ``Locale``,
    ``Lang`` and ``Translation`` (in that order, first match wins).  When a
    marker is found, the string that immediately follows it in *strings* is
    stored, stripped, as that marker's value; later occurrences overwrite
    earlier ones.

    Args:
        strings: Extracted strings, in the order they were found.

    Returns:
        A dict with all four markers as keys.  Markers that were never seen
        (or were only seen as the very last string) keep their
        ``"No <marker>"`` placeholder.
    """
    language_fields = {
        "Language": "No Language",
        "Locale": "No Locale",
        "Lang": "No Lang",
        "Translation": "No Translation",
    }
    # enumerate() gives the real position of each string.  The previous
    # strings.index(s) always returned the FIRST occurrence, so a repeated
    # marker picked up the wrong neighbour, and the scan was quadratic.
    for index, s in enumerate(strings):
        field = next((name for name in language_fields if name in s), None)
        if field is not None and index + 1 < len(strings):
            language_fields[field] = strings[index + 1].strip()
    return language_fields
def extract_static_urls(strings):
    """Return every http/https URL found in *strings*, in encounter order.

    Duplicates are kept.  A URL runs from the scheme up to the last word
    character before the next whitespace.
    """
    url_pattern = re.compile(r'\b(?:http|https)://\S+\b')
    return [url for text in strings for url in url_pattern.findall(text)]
def _is_suspect_api(dll_name, func_name):
    """Return True when *func_name*, imported from *dll_name*, is watch-listed.

    DLL names are compared case-insensitively because import tables spell
    them inconsistently (``KERNEL32.dll``, ``kernel32.dll``, ``ADVAPI32.dll``).
    A trailing ``A``/``W`` on the function name is ignored as well, so that
    a table entry such as ``RegOpenKeyEx`` matches the real exports
    ``RegOpenKeyExA`` / ``RegOpenKeyExW``.
    """
    wanted_dll = dll_name.lower()
    for listed_dll, listed_funcs in suspect_api_calls.items():
        if listed_dll.lower() != wanted_dll:
            continue
        if func_name in listed_funcs:
            return True
        return func_name[-1:] in ('A', 'W') and func_name[:-1] in listed_funcs
    return False


def extract_imports(file_path):
    """List the named imports of a PE file and count the watch-listed ones.

    Args:
        file_path: Path of the PE file to parse.

    Returns:
        A tuple ``(imports, suspect_count)``.  ``imports`` maps each imported
        DLL name to the sorted list of function names imported from it
        (imports by ordinal have no name and are skipped).  ``suspect_count``
        is the number of imported functions found in ``suspect_api_calls``.
    """
    pe = pefile.PE(file_path)
    imports = {}
    suspect_count = 0
    for entry in getattr(pe, 'DIRECTORY_ENTRY_IMPORT', []):
        # Decode leniently: malformed binaries may carry non-UTF-8 bytes.
        dll_name = entry.dll.decode('utf-8', errors='replace')
        functions = []
        for imp in entry.imports:
            if not imp.name:
                continue
            func_name = imp.name.decode('utf-8', errors='replace')
            functions.append(func_name)
            if _is_suspect_api(dll_name, func_name):
                suspect_count += 1
        # A DLL may appear in more than one import descriptor; merge instead
        # of letting the last descriptor overwrite the earlier ones.
        imports[dll_name] = sorted(imports.get(dll_name, []) + functions)
    return imports, suspect_count


def _yara_identifier(raw, used):
    """Derive a unique, syntactically valid YARA string identifier from *raw*.

    Every character that is not an ASCII letter, digit or underscore becomes
    ``_`` (so ``.text`` turns into ``_text``).  The result is truncated and,
    if already present in the set *used*, given a numeric suffix.  The chosen
    identifier is added to *used* and returned without the leading ``$``.
    """
    cleaned = ''.join(
        ch if ch.isascii() and (ch.isalnum() or ch == '_') else '_'
        for ch in raw
    )[:100] or '_'
    candidate = cleaned
    serial = 1
    while candidate in used:
        serial += 1
        candidate = f'{cleaned}_{serial}'
    used.add(candidate)
    return candidate


def _yara_escape(value):
    """Escape *value* so it can sit inside a double-quoted YARA text string."""
    return (
        value.replace('\\', '\\\\')
        .replace('"', '\\"')
        .replace('\r', '\\r')
        .replace('\n', '\\n')
        .replace('\t', '\\t')
    )


def _yara_string_line(name, value, used, modifiers='', comment=''):
    """Format one ``$name = "value"`` line, or return None for an empty value."""
    if not value:
        return None
    line = f'        ${_yara_identifier(name, used)} = "{_yara_escape(value)}"'
    if modifiers:
        line += f' {modifiers}'
    if comment:
        line += f' // {comment}'
    return line


def generate_yara_rule(file_path):
    """Analyse a PE file and return the text of a YARA rule describing it.

    The rule's ``meta`` section records hashes, size, link timestamp (UTC)
    and a verdict; its ``strings`` section lists section names, version
    metadata, language markers, static URLs and imported function names.

    Args:
        file_path: Path of the PE file to analyse.

    Returns:
        The rule as a string, starting with a blank line and ending with a
        newline.
    """
    pe = pefile.PE(file_path)
    hashes = calculate_hashes(file_path)
    file_size = os.path.getsize(file_path)
    timestamp = pe.FILE_HEADER.TimeDateStamp
    timestamp_formatted = datetime.fromtimestamp(timestamp, timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    file_type = "EXE" if pe.is_exe() else "DLL" if pe.is_dll() else "Other"
    # Anything that is not i386 is labelled 64-bit.
    architecture = "32-bit" if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE['IMAGE_FILE_MACHINE_I386'] else "64-bit"
    sections = analyze_sections(pe)
    extracted_strings = extract_strings(file_path, is_64bit=(architecture == "64-bit"))
    metadata_strings = filter_metadata_strings(extracted_strings)
    language_strings = search_language_patterns(extracted_strings)
    static_urls = extract_static_urls(extracted_strings)
    imports, suspect_count = extract_imports(file_path)

    # NOTE(review): the write-up accompanying this script describes the red
    # verdict as "at least 5 suspect API calls + high entropy", but the code
    # has always ignored entropy once five suspect imports are found.  The
    # behaviour is kept as is; confirm which one is intended.
    high_entropy_detected = any(section['entropy'] >= 7.0 for section in sections)
    file_status = "File is clean or maybe using obfuscation. Dynamic analysis is required"
    if suspect_count >= 5:
        threat_status = "POSSIBLY A BACKDOOR"
    elif high_entropy_detected:
        threat_status = "Possible threat. High entropy detected"
    else:
        threat_status = file_status

    used_identifiers = set()
    entries = []        # every line that goes under "strings:"
    string_count = 0    # how many of those lines define a string

    def add_string(name, value, modifiers='', comment=''):
        nonlocal string_count
        line = _yara_string_line(name, value, used_identifiers, modifiers, comment)
        if line is not None:
            entries.append(line)
            string_count += 1

    for section in sorted(sections, key=lambda x: x['name']):
        add_string(
            section['name'], section['name'],
            comment=f'Entropy: {section["entropy"]:.2f}; {section["suspicious"]}',
        )
    # Strings taken from the strings tool may be stored as UTF-16 in the
    # file, hence "ascii wide" so the rule can match either encoding.
    for key in sorted(metadata_strings):
        add_string(key, metadata_strings[key], modifiers='ascii wide')
    for key in sorted(language_strings):
        # Skip the "No <marker>" placeholders: they are not in the file, and
        # requiring them would make "all of them" impossible to satisfy.
        if language_strings[key] != f"No {key}":
            add_string(key, language_strings[key], modifiers='ascii wide')
    for i, url in enumerate(static_urls):
        add_string(f'StaticURL_{i}', url, modifiers='ascii wide')
    for dll_name, funcs in sorted(imports.items()):
        entries.append(f'        // Imports from {dll_name}')
        for func in funcs:
            comment = 'Suspect syscall detected' if _is_suspect_api(dll_name, func) else ''
            add_string(func, func, comment=comment)

    description = (
        f"YARA rule for {os.path.basename(file_path)} - "
        f"File type: {file_type}, Architecture: {architecture}"
    )
    lines = [
        '',
        'rule PEFileRule',
        '{',
        '    meta:',
        f'        description = "{_yara_escape(description)}"',
        '        author = "Netcat"',
        f'        md5 = "{hashes.get("MD5")}"',
        f'        sha1 = "{hashes.get("SHA-1")}"',
        f'        sha256 = "{hashes.get("SHA-256")}"',
        f'        ssdeep = "{hashes.get("SSDEEP")}"',
        # Meta values must be a string, an integer or a boolean, and meta
        # identifiers cannot contain spaces.
        f'        file_size = {file_size}',
        f'        compiled_at = "{timestamp_formatted}"',
        f'        type = "{threat_status}"',
    ]
    if string_count:
        lines.append('    strings:')
        lines.extend(entries)
        condition = 'all of them'
    else:
        # An empty strings section is a syntax error; emit a rule that
        # compiles but never matches.
        condition = 'false'
    lines.extend(['    condition:', f'        {condition}', '}'])
    return '\n'.join(lines) + '\n'
def main():
    """Prompt for a PE file path, then print the generated YARA rule."""
    raw_path = input("Enter the path to the PE file: ")
    # Paths pasted via Explorer's "Copy as path", or dragged into the console,
    # arrive wrapped in double quotes; '"' is not a legal Windows filename
    # character, so stripping it (and surrounding whitespace) is safe.
    file_path = raw_path.strip().strip('"')
    yara_rule = generate_yara_rule(file_path)
    print(yara_rule)


if __name__ == "__main__":
    main()