From 9dd53b1b24ecf079b96a50bc36931fa0774d68c5 Mon Sep 17 00:00:00 2001 From: Alexis Fourmaux Date: Sat, 9 May 2026 15:36:31 +0200 Subject: [PATCH] feat: improve draft of consumer app to write in db --- .vscode/settings.json | 3 ++ app/consumer/main.py | 89 +++++++++++++++++++++++++++++++---- app/consumer/requirements.txt | 3 +- 3 files changed, 86 insertions(+), 9 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a7a7852 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "editor.defaultFormatter": "charliermarsh.ruff" +} \ No newline at end of file diff --git a/app/consumer/main.py b/app/consumer/main.py index 53fb907..ddc7199 100644 --- a/app/consumer/main.py +++ b/app/consumer/main.py @@ -1,30 +1,103 @@ import paho.mqtt.client as mqtt +import psycopg2 import logging import sys +import json +import os +import time + +MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto") +MQTT_PORT = int(os.getenv("MQTT_PORT", 1883)) +PG_DSN = os.getenv("DATABASE_URL", "postgresql://simugaz:simugaz@db/simugaz") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S", - stream=sys.stdout, # Docker lit stdout par défaut - force=True + stream=sys.stdout, + force=True, ) - log = logging.getLogger(__name__) +def get_db(): + for _ in range(10): + try: + conn = psycopg2.connect(PG_DSN) + conn.autocommit = True + log.info("PostgreSQL connecté") + return conn + except Exception as e: + log.warning(f"Attente PostgreSQL... ({e})") + time.sleep(3) + raise RuntimeError("Impossible de se connecter à PostgreSQL") + + +def get_device_id(conn, dev_eui: str) -> str: + """ + Insère le device si inconnu + Retourne le device_id correspondant au dev_eui + """ + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO device (device_eui) + VALUES (%s) + ON CONFLICT (device_eui) DO NOTHING + """, + (dev_eui,), + ) + + cur.execute("SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,)) + row = cur.fetchone() + return str(row[0]) + + +def insert_reading(conn, device_id: str, pulses: int): + """Insère une télérelève liée au device""" + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO reading (device_id, date, pulses) + VALUES (%s, NOW(), %s) + """, + (device_id, pulses), + ) + + +db_conn = None + + def on_connect(client, userdata, flags, reason_code, properties): - log.info(f"Connected with result code {reason_code}") - client.subscribe("application/+/device/+/event/up") + log.info(f"MQTT connecté (code={reason_code})") + client.subscribe("application/+/device/+/event/up", qos=1) + def on_message(client, userdata, msg): - log.info(msg.topic+" "+str(msg.payload)) + try: + body = json.loads(msg.payload) + dev_eui = body["deviceInfo"]["devEui"] + pulses = int(body["object"]["pulse_count"]) + + device_id = get_device_id(db_conn, dev_eui) + insert_reading(db_conn, device_id, pulses) + + log.info( + f"[UP] dev_eui={dev_eui} | device_id={device_id} | pulses={pulses}| relevé inséré" + ) + + except Exception as e: + log.exception(f"Erreur traitement message : {e}") +# ── Main ───────────────────────────────────────────────────────── if __name__ == "__main__": + db_conn = get_db() + mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) mqttc.on_connect = on_connect mqttc.on_message = on_message + mqttc.reconnect_delay_set(min_delay=1, max_delay=30) - mqttc.connect("mosquitto", 1883, 60) - mqttc.loop_forever() \ No newline at end of file + mqttc.connect(MQTT_HOST, MQTT_PORT, 60) + mqttc.loop_forever() diff --git a/app/consumer/requirements.txt b/app/consumer/requirements.txt index 79791a2..8227b0a 100644 --- a/app/consumer/requirements.txt +++ b/app/consumer/requirements.txt @@ -1 +1,2 @@ -paho-mqtt==v2.1.0 \ No newline at end of file +paho-mqtt==v2.1.0 +psycopg2-binary \ No newline at end of file