#!/usr/bin/python3
"""Record binary IMU frames from several serial ports and publish them via MQTT.

Each port in SERIAL_PORTS is read on its own thread for DURATION seconds.
Valid frames are written as text lines to one ``.log`` file per port; the logs
are then packed into a gzip tarball, base64-encoded and published as a JSON
payload on TOPIC. If publishing fails MAX_RETRIES times the machine is
rebooted.
"""
import base64
import json
import logging
import os
import socket
import struct
import sys
import tarfile
import tempfile
import threading
import time
from datetime import datetime, timedelta, timezone

import paho.mqtt.client as mqtt
import serial


def _utcnow():
    """Return the current UTC time as a naive datetime.

    Same value as the deprecated ``datetime.utcnow()``, so formatted
    timestamps are unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# === CONFIG LOGGING ===
logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s | %(message)s",
    stream=sys.stdout,
)

# === CONFIG ===
SERIAL_PORTS = ["/dev/esp_1", "/dev/esp_2", "/dev/esp_3", "/dev/esp_4"]  # add more if needed
BAUDRATE = 2000000
DURATION = 300  # seconds each port is recorded
CLIENT_ID = socket.gethostname()
BROKER = "kiot-mqtts.pal.it"
PORT = 8883
TOPIC = "paliot/devs"
MAX_RETRIES = 30
# NOTE(security): credentials are committed in source. They can now be
# overridden through the environment; the literals are kept only as a
# backward-compatible fallback and should be removed (and rotated).
USERNAME = os.environ.get("MQTT_USERNAME", "svcEMQXPortal")
PASSWORD = os.environ.get("MQTT_PASSWORD", "n$IYAnkYCG7M#7zcJod23OS#")
STARTDATETIME = _utcnow().isoformat() + "Z"

# === BIN FRAME CONFIG ===
# NOTE(review): the format string (and the FRAME_SIZE definition) were
# truncated in the reviewed copy of this file. The layout below is
# reconstructed from how the unpacked fields are used: a 2-byte magic sent as
# 0xCD 0xAB, a microsecond timestamp, 12 IMU values (6 per IMU) and a 2-byte
# trailing checksum. The timestamp width (32 vs 64 bit) and the IMU value type
# are assumptions -- confirm against the firmware before deploying.
FRAME_FMT = "<HI12hH"
FRAME_SIZE = struct.calcsize(FRAME_FMT)
FRAME_MAGIC = b"\xCD\xAB"  # the two sync bytes checked at the start of every frame


# === UTILS ===
def crc16_xor(data: bytes) -> int:
    """Return the XOR of every byte in *data*, masked to 16 bits."""
    c = 0
    for b in data:
        c ^= b
    return c & 0xFFFF


def reboot_machine():
    """Log a critical message and run the system ``reboot`` command."""
    logging.critical(">> Riavvio macchina per mancanza connessione")
    os.system("reboot")


# === SERIAL CAPTURE (BINARY -> TEXT LOG) ===
def _format_sample(ts, imu):
    """Render one decoded frame as a ``timestamp|1,<imu1>|2,<imu2>`` log line."""
    imu1 = imu[:6]
    imu2 = imu[6:]
    return (
        f"{ts.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}"
        f"|1,{','.join(map(str, imu1))}"
        f"|2,{','.join(map(str, imu2))}\n"
    )


def capture_serial(port, output_dir):
    """Decode frames from *port* for DURATION seconds into a text log.

    The wall-clock time of the first valid frame is taken as the origin; every
    later line is stamped with that origin plus the difference between the
    frame's device timestamp and the first frame's device timestamp.

    Args:
        port: Serial device path to open at BAUDRATE.
        output_dir: Directory in which ``<basename of port>.log`` is written.

    Returns:
        The path of the log file. Errors are logged, not raised, so the path
        is returned even when the capture failed.
    """
    log_path = os.path.join(output_dir, f"{os.path.basename(port)}.log")
    buffer = bytearray()
    start_wall_time = None
    start_ts_us = None

    try:
        with serial.Serial(port, BAUDRATE, timeout=1) as ser, \
                open(log_path, "w", encoding="utf-8") as f:
            # Monotonic clock: the capture window must not shrink or stretch
            # when the system clock is stepped (e.g. by NTP) mid-recording.
            deadline = time.monotonic() + DURATION

            while time.monotonic() < deadline:
                buffer += ser.read(1024)

                while len(buffer) >= FRAME_SIZE:
                    sync = buffer.find(FRAME_MAGIC)
                    if sync < 0:
                        # No magic anywhere; keep the last byte because it may
                        # be the first half of a magic split across reads.
                        del buffer[:-1]
                        break
                    if sync > 0:
                        # Drop the garbage in one step instead of byte by byte.
                        del buffer[:sync]
                        continue

                    frame = bytes(buffer[:FRAME_SIZE])
                    _magic, ts_us, *imu, crc = struct.unpack(FRAME_FMT, frame)

                    if crc16_xor(frame[:-2]) != crc:
                        # Possibly a false magic inside payload bytes: skip a
                        # single byte and rescan so a real frame starting
                        # inside this span is not thrown away with it.
                        del buffer[:1]
                        continue

                    del buffer[:FRAME_SIZE]

                    if start_wall_time is None:
                        start_wall_time = _utcnow()
                        start_ts_us = ts_us

                    delta_us = ts_us - start_ts_us
                    ts = start_wall_time + timedelta(microseconds=delta_us)
                    f.write(_format_sample(ts, imu))

    except Exception as e:
        # Thread boundary: report the failure and let the other ports go on.
        logging.error(f"[{port}] Errore seriale: {e}")

    return log_path


# === FILE UTILS ===
def compress_directory(directory):
    """Pack every ``.log`` file in *directory* into ``logs.tar.gz`` there.

    Returns:
        The path of the created tarball.
    """
    tar_path = os.path.join(directory, "logs.tar.gz")
    with tarfile.open(tar_path, "w:gz") as tar:
        for filename in os.listdir(directory):
            if filename.endswith(".log"):
                tar.add(os.path.join(directory, filename), arcname=filename)
    return tar_path


def file_to_base64(file_path):
    """Return the content of *file_path* as a base64 text string."""
    with open(file_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


def build_payload(encoded_log):
    """Build the message dict: client id, script start time and the log."""
    return {
        "clientId": CLIENT_ID,
        "dateTime": STARTDATETIME,
        "log": encoded_log,
    }


# === MQTT SENDER ===
def _close_mqtt_client(client, loop_running):
    """Stop the network loop if still running and disconnect; never raises."""
    if client is None:
        return
    try:
        if loop_running:
            client.loop_stop()
        client.disconnect()
    except Exception as e:
        logging.debug(f"MQTT cleanup failed: {e}")


def send_mqtt(payload):
    """Publish *payload* as JSON on TOPIC, retrying up to MAX_RETRIES times.

    A fresh TLS client is created for every attempt. An attempt only counts as
    successful when the client library accepted the message and reports it as
    published once the network loop has been stopped. After the last failed
    attempt the machine is rebooted.

    Args:
        payload: JSON-serialisable object to publish.

    Returns:
        True when the payload was published, False otherwise.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        client = None
        loop_running = False
        try:
            client = mqtt.Client(client_id=CLIENT_ID)
            client.username_pw_set(USERNAME, PASSWORD)
            client.tls_set()
            client.connect(BROKER, PORT, 60)
            client.loop_start()
            loop_running = True

            info = client.publish(TOPIC, json.dumps(payload))
            if info.rc != mqtt.MQTT_ERR_SUCCESS:
                raise RuntimeError(f"publish not accepted (rc={info.rc})")

            # Same call order as before (stop the loop, then disconnect), but
            # verify the message really left before reporting success.
            client.loop_stop()
            loop_running = False
            if not info.is_published():
                raise RuntimeError("payload was not sent before the loop stopped")

            logging.info(">> Payload MQTT inviato con successo.")
            return True

        except Exception as e:
            logging.warning(f"MQTT publish failed ({attempt}/{MAX_RETRIES}): {e}")
        finally:
            # Runs on success and failure, so a failed attempt does not leave
            # a network thread or socket behind for the next retry.
            _close_mqtt_client(client, loop_running)

        if attempt == MAX_RETRIES:
            reboot_machine()
            return False
        time.sleep(5)

    return False


# === MAIN ===
def main():
    """Record all ports in parallel, then compress, encode and publish logs."""
    # The context manager removes the directory even if a later step raises.
    with tempfile.TemporaryDirectory(prefix="serial_logs_") as temp_dir:
        threads = [
            threading.Thread(target=capture_serial, args=(port, temp_dir))
            for port in SERIAL_PORTS
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        logging.info(">> Tutte le seriali hanno completato la registrazione.")

        tar_file = compress_directory(temp_dir)
        encoded = file_to_base64(tar_file)
        payload = build_payload(encoded)
        send_mqtt(payload)

    logging.info(">> Operazione completata.")


if __name__ == "__main__":
    main()