Discussione Monitor Di Stato e msg Telegram

mrcamarium

Utente Silver
7 Gennaio 2022
111
27
6
61
E' da parecchio che non posto più nulla perché sono sempre impegnato, stavo lavorando a un progetto con Pi-Hole che ho concluso in questi giorni, grazia ad una manciata di led e un codice adesso posso controllare lo stato di Pi-Hole senza usare il PC o il cellulare. Posto il codice:
Python:
"""
Versione 0.6

- Verifica il funzionamento dei LED con un effetto a cascata.
- Attiva un LED al superamento della temperatura di soglia e manda un messaggio su Telegram.
- Verifica lo stato di Pi-hole con un LED e un messaggio su Telegram.
- Invia statistiche su Telegram e attiva un LED per i siti bloccati.
- Verifica la connessione a Internet accendendo un LED.
- Verifica il carico della CPU accendendo un LED e inviando una segnalazione su Telegram.

Installazione dei pacchetti richiesti su Raspberry Pi:
------------------------------------------------------
1. sudo apt update && sudo apt install sqlite3 -y
2. sudo tail -n 20 /var/log/pihole-FTL.log
3. sudo usermod -aG pihole $USER
4. newgrp pihole
5. sudo apt update && sudo apt install python3-pip -y
6. pip3 --version
7. pip3 install psutil
"""
import os
import requests
import time
import subprocess
import psutil
from gpiozero import LED

# Configurazione
TOKEN = " "  # Sostituiscilo con il tuo token
CHAT_ID = " "  # Sostituiscilo con il tuo chat ID
THRESHOLD_TEMP = 35  # Soglia temperatura (°C)
GPIO_PINS = [17, 18, 22, 23, 24]  # GPIO da usare
PIHOLE_LED_PIN = 18  # Pin per il LED di stato di Pi-hole
LOG_FILE = "/var/log/pihole.log"  # Percorso del log di Pi-hole
BLOCKED_LED_PIN = 22  # LED per le query bloccate
CHECK_INTERVAL = 10  # Ogni quanti secondi controllare il log
THRESHOLD_SLOW = 5  # Soglia per lampeggio lento
THRESHOLD_FAST = 15  # Soglia per lampeggio veloce
RETE_PIN = 23  # LED per indicare lo stato della connessione Internet
CPU_LOAD_LED_PIN = 24  # LED per il carico CPU

# Inizializzazione LED
pihole_led = LED(PIHOLE_LED_PIN)
blocked_led = LED(BLOCKED_LED_PIN)
rete_led = LED(RETE_PIN)
cpu_led = LED(CPU_LOAD_LED_PIN)

last_pihole_status = None  # Memorizza l'ultimo stato di Pi-hole
last_blocked_state = None  # Memorizza l'ultimo stato delle query bloccate
last_cpu_state = None  # Memorizza l'ultimo stato della CPU

def get_cpu_temperature():
    """Legge la temperatura della CPU."""
    temp = os.popen("vcgencmd measure_temp").readline()
    temp = float(temp.replace("temp=", "").replace("'C\n", ""))
    print(f"🌡 Temperatura letta: {temp}°C")
    return temp

def send_telegram_message(message):
    """Invia un messaggio su Telegram."""
    url = f"https://api.telegram.org/bot{TOKEN}/sendMessage"
    data = {"chat_id": CHAT_ID, "text": message}
    requests.post(url, data=data)

def check_pihole_status():
    """Controlla lo stato del servizio Pi-hole."""
    try:
        status = subprocess.check_output(['systemctl', 'is-active', 'pihole-FTL']).strip().decode('utf-8')
        return status if status in ['active', 'inactive', 'activating', 'deactivating'] else 'unknown'
    except subprocess.CalledProcessError:
        return 'unknown'

def control_pihole_led(status):
    """Controlla il comportamento del LED in base allo stato di Pi-hole."""
    if status == 'active':
        pihole_led.on()
    elif status == 'inactive':
        pihole_led.off()
    elif status == 'restarting':
        for _ in range(10):
            pihole_led.toggle()
            time.sleep(0.5)
        pihole_led.on()

def count_blocked_queries():
    """Conta il numero di query bloccate nel log di Pi-hole."""
    try:
        with open(LOG_FILE, "r") as log:
            lines = log.readlines()[-100:]
        return sum(1 for line in lines if "blocked" in line.lower())
    except Exception as e:
        print(f"Errore nella lettura del log: {e}")
        return 0

def update_blocked_led_status(blocked_queries):
    """Aggiorna lo stato del LED in base al numero di query bloccate."""
    if blocked_queries == 0:
        blocked_led.off()
    elif blocked_queries < THRESHOLD_SLOW:  # (Meno di 5 query)
        blocked_led.blink(on_time=1, off_time=1)  # Lampeggio lento
    elif blocked_queries < THRESHOLD_FAST:  # (Da 5 a 14 query)
        blocked_led.blink(on_time=0.5, off_time=0.5)  # Lampeggio medio
    elif blocked_queries < 50:  # (Da 15 a 49 query)
        blocked_led.blink(on_time=0.3, off_time=0.3)  # Lampeggio veloce
    else:  # (50 o più query)
        blocked_led.blink(on_time=0.1, off_time=0.1)  # Lampeggio rapidissimo

def check_connection():
    """Verifica la connessione a Internet pingando Google DNS."""
    try:
        subprocess.run(["ping", "-c", "1", "8.8.8.8"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2)
        return True
    except subprocess.TimeoutExpired:
        return False

def get_cpu_load():
    """Restituisce l'uso della CPU."""
    return psutil.cpu_percent(interval=1)

def control_led(cpu_load):
    """Gestisce lo stato del LED in base al carico della CPU e invia avvisi solo se cambia stato."""
    global last_cpu_state
    if cpu_load < 30:
        new_state = "Basso Carico"
        cpu_led.off()
    elif 30 <= cpu_load < 70:
        new_state = "Carico Medio"
        cpu_led.on()
    else:
        new_state = "Carico Elevato"
        for _ in range(5):
            cpu_led.on()
            time.sleep(0.2)
            cpu_led.off()
            time.sleep(0.2)

    # Invia il messaggio solo se cambia lo stato
    if new_state != last_cpu_state:
        print(f"🔔 Stato CPU: {new_state}")
        send_telegram_message(f"🔔 Stato CPU: {new_state}")
        last_cpu_state = new_state  # Aggiorna lo stato precedente

# TEST INIZIALE: Verifica accensione LED
print("🔄 Test LED all'avvio...")
for pin in GPIO_PINS:
    os.system(f"raspi-gpio set {pin} op")
    os.system(f"raspi-gpio set {pin} dh")
time.sleep(3)
for pin in GPIO_PINS:
    os.system(f"raspi-gpio set {pin} dl")
    time.sleep(1)
print("✅ Test LED completato. Avvio monitoraggio...")

while True:
    temp = get_cpu_temperature()
    if temp > THRESHOLD_TEMP:
        print(f"⚠️ ATTENZIONE! Temperatura sopra i {THRESHOLD_TEMP}°C: {temp}°C")
        send_telegram_message(f"⚠️ Attenzione! CPU sopra i {THRESHOLD_TEMP}°C: {temp}°C")

    pihole_status = check_pihole_status()
    if pihole_status != last_pihole_status:
        control_pihole_led(pihole_status)
        send_telegram_message(f"✅ Stato Pi-hole: {pihole_status}")
        last_pihole_status = pihole_status

    blocked_queries = count_blocked_queries()
    if blocked_queries != last_blocked_state:
        update_blocked_led_status(blocked_queries)
        send_telegram_message(f"📊 Siti Bloccati: {blocked_queries}")
        last_blocked_state = blocked_queries

    if check_connection():
        print("✅ Connessione OK")
        rete_led.on()
    else:
        print("❌ Nessuna connessione")
        rete_led.off()

    cpu_load = get_cpu_load()
    control_led(cpu_load)

    time.sleep(60)