Issue
Code backup
This commit is contained in:
@@ -0,0 +1,180 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import serial
|
||||
import time
|
||||
import json
|
||||
import base64
|
||||
import tarfile
|
||||
import paho.mqtt.client as mqtt
|
||||
import tempfile
|
||||
import os
|
||||
import threading
|
||||
import socket
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
import sys
|
||||
import struct
|
||||
|
||||
# === CONFIG LOGGING ===
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(levelname)s | %(message)s",
|
||||
stream=sys.stdout
|
||||
)
|
||||
|
||||
# === CONFIG ===
|
||||
SERIAL_PORTS = ["/dev/esp_1", "/dev/esp_2", "/dev/esp_3", "/dev/esp_4"] # aggiungi altre se servono
|
||||
BAUDRATE = 2000000
|
||||
DURATION = 300
|
||||
CLIENT_ID = socket.gethostname()
|
||||
BROKER = "kiot-mqtts.pal.it"
|
||||
PORT = 8883
|
||||
TOPIC = "paliot/devs"
|
||||
MAX_RETRIES = 30
|
||||
USERNAME = "svcEMQXPortal"
|
||||
PASSWORD = "n$IYAnkYCG7M#7zcJod23OS#"
|
||||
STARTDATETIME = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
# === BIN FRAME CONFIG ===
|
||||
FRAME_FMT = "<H I 6h 6h H"
|
||||
FRAME_SIZE = struct.calcsize(FRAME_FMT)
|
||||
MAGIC = 0xABCD
|
||||
|
||||
# === UTILS ===
|
||||
def crc16_xor(data: bytes) -> int:
|
||||
c = 0
|
||||
for b in data:
|
||||
c ^= b
|
||||
return c & 0xFFFF
|
||||
|
||||
def reboot_machine():
|
||||
logging.critical(">> Riavvio macchina per mancanza connessione")
|
||||
os.system("reboot")
|
||||
|
||||
# === SERIAL CAPTURE (BINARIO → LOG TESTO) ===
|
||||
def capture_serial(port, output_dir):
|
||||
log_path = os.path.join(output_dir, f"{os.path.basename(port)}.log")
|
||||
buffer = bytearray()
|
||||
|
||||
start_wall_time = None
|
||||
start_ts_us = None
|
||||
|
||||
try:
|
||||
with serial.Serial(port, BAUDRATE, timeout=1) as ser, \
|
||||
open(log_path, "w", encoding="utf-8") as f:
|
||||
|
||||
start = time.time()
|
||||
|
||||
while time.time() - start < DURATION:
|
||||
buffer += ser.read(1024)
|
||||
|
||||
while len(buffer) >= FRAME_SIZE:
|
||||
if buffer[0] != 0xCD or buffer[1] != 0xAB:
|
||||
buffer.pop(0)
|
||||
continue
|
||||
|
||||
frame = buffer[:FRAME_SIZE]
|
||||
buffer = buffer[FRAME_SIZE:]
|
||||
|
||||
magic, ts_us, *imu, crc = struct.unpack(FRAME_FMT, frame)
|
||||
|
||||
if crc16_xor(frame[:-2]) != crc:
|
||||
continue
|
||||
|
||||
if start_wall_time is None:
|
||||
start_wall_time = datetime.utcnow()
|
||||
start_ts_us = ts_us
|
||||
|
||||
delta_us = ts_us - start_ts_us
|
||||
ts = start_wall_time + timedelta(microseconds=delta_us)
|
||||
|
||||
imu1 = imu[:6]
|
||||
imu2 = imu[6:]
|
||||
|
||||
line = (
|
||||
f"{ts.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}"
|
||||
f"|1,{','.join(map(str, imu1))}"
|
||||
f"|2,{','.join(map(str, imu2))}\n"
|
||||
)
|
||||
|
||||
f.write(line)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"[{port}] Errore seriale: {e}")
|
||||
|
||||
return log_path
|
||||
|
||||
# === FILE UTILS ===
|
||||
def compress_directory(directory):
|
||||
tar_path = os.path.join(directory, "logs.tar.gz")
|
||||
with tarfile.open(tar_path, "w:gz") as tar:
|
||||
for filename in os.listdir(directory):
|
||||
if filename.endswith(".log"):
|
||||
tar.add(os.path.join(directory, filename), arcname=filename)
|
||||
return tar_path
|
||||
|
||||
def file_to_base64(file_path):
|
||||
with open(file_path, "rb") as f:
|
||||
return base64.b64encode(f.read()).decode("utf-8")
|
||||
|
||||
def build_payload(encoded_log):
|
||||
return {
|
||||
"clientId": CLIENT_ID,
|
||||
"dateTime": STARTDATETIME,
|
||||
"log": encoded_log
|
||||
}
|
||||
|
||||
# === MQTT SENDER ===
|
||||
def send_mqtt(payload):
|
||||
retries = 0
|
||||
while retries < MAX_RETRIES:
|
||||
try:
|
||||
client = mqtt.Client(client_id=CLIENT_ID)
|
||||
client.username_pw_set(USERNAME, PASSWORD)
|
||||
client.tls_set()
|
||||
client.connect(BROKER, PORT, 60)
|
||||
client.loop_start()
|
||||
client.publish(TOPIC, json.dumps(payload))
|
||||
client.loop_stop()
|
||||
client.disconnect()
|
||||
logging.info(">> Payload MQTT inviato con successo.")
|
||||
return True
|
||||
except Exception as e:
|
||||
retries += 1
|
||||
logging.warning(f"MQTT publish failed ({retries}/{MAX_RETRIES}): {e}")
|
||||
|
||||
if retries == MAX_RETRIES:
|
||||
reboot_machine()
|
||||
return False
|
||||
|
||||
time.sleep(5)
|
||||
return False
|
||||
|
||||
# === MAIN ===
|
||||
def main():
|
||||
temp_dir = tempfile.mkdtemp(prefix="serial_logs_")
|
||||
threads = []
|
||||
|
||||
for port in SERIAL_PORTS:
|
||||
t = threading.Thread(target=capture_serial, args=(port, temp_dir))
|
||||
t.start()
|
||||
threads.append(t)
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
logging.info(">> Tutte le seriali hanno completato la registrazione.")
|
||||
|
||||
tar_file = compress_directory(temp_dir)
|
||||
encoded = file_to_base64(tar_file)
|
||||
payload = build_payload(encoded)
|
||||
send_mqtt(payload)
|
||||
|
||||
for filename in os.listdir(temp_dir):
|
||||
os.remove(os.path.join(temp_dir, filename))
|
||||
os.rmdir(temp_dir)
|
||||
|
||||
logging.info(">> Operazione completata.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user