From 5c480db4104bf6fdcd1e47ce3bd11759088ece03 Mon Sep 17 00:00:00 2001 From: Alexis Fourmaux Date: Sat, 9 May 2026 17:47:37 +0200 Subject: [PATCH] fix: improve error management when decoding uplink messages --- app/consumer/src/adapters/mqtt.py | 23 +++++++++--- app/consumer/src/adapters/postgres.py | 50 +++++++++++++++------------ app/consumer/src/exceptions.py | 3 ++ 3 files changed, 50 insertions(+), 26 deletions(-) diff --git a/app/consumer/src/adapters/mqtt.py b/app/consumer/src/adapters/mqtt.py index f7a19f5..4dd66ef 100644 --- a/app/consumer/src/adapters/mqtt.py +++ b/app/consumer/src/adapters/mqtt.py @@ -8,6 +8,7 @@ from paho.mqtt.enums import CallbackAPIVersion from paho.mqtt.properties import Properties from paho.mqtt.reasoncodes import ReasonCode +from exceptions import MessageBrokerError, InfrastructureError from entities import UplinkEvent from ports import MessageBroker @@ -36,15 +37,29 @@ class PahoMqttBroker(MessageBroker): def _on_message(client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage) -> None: try: body = json.loads(msg.payload) + except json.JSONDecodeError as e: + log.error("Payload JSON invalide sur %s : %s", msg.topic, e) + return + + try: dev_eui = body["deviceInfo"]["devEui"] pulse_count = int(body["object"]["pulse_count"]) - + except (KeyError, ValueError, TypeError) as e: + log.error("Champs manquants ou invalides : %s | body=%s", e, body) + return + + try: on_uplink(UplinkEvent(dev_eui=dev_eui, pulse_count=pulse_count)) - except Exception as e: - log.exception("Erreur parsing message MQTT : %s", e) + except InfrastructureError as e: + log.error("Erreur infrastructure pour %s : %s", dev_eui, e) _client.on_connect = _on_connect _client.on_message = _on_message _client.reconnect_delay_set(min_delay=1, max_delay=30) - _client.connect(self._host, self._port, keepalive=60) + + try: + _client.connect(self._host, self._port, keepalive=60) + except OSError as e: + raise MessageBrokerError(f"Impossible de joindre {self._host}:{self._port} : {e}") from e + _client.loop_forever() diff --git a/app/consumer/src/adapters/postgres.py b/app/consumer/src/adapters/postgres.py index e90c189..73b4f29 100644 --- a/app/consumer/src/adapters/postgres.py +++ b/app/consumer/src/adapters/postgres.py @@ -4,7 +4,7 @@ import time import psycopg2 from psycopg2.extensions import connection from ports import DeviceRepository, ReadingRepository -from exceptions import DatabaseConnectionError +from exceptions import DatabaseConnectionError, DatabaseError log = logging.getLogger(__name__) @@ -27,19 +27,22 @@ class PgDeviceRepository(DeviceRepository): self._conn = conn def get_or_create_device_id(self, dev_eui: str) -> str: - with self._conn.cursor() as cur: - cur.execute( - """ - INSERT INTO device (device_eui) - VALUES (%s) - ON CONFLICT (device_eui) DO NOTHING - """, - (dev_eui,), - ) - cur.execute( - "SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,) - ) - return str(cur.fetchone()[0]) + try: + with self._conn.cursor() as cur: + cur.execute( + """ + INSERT INTO device (device_eui) + VALUES (%s) + ON CONFLICT (device_eui) DO NOTHING + """, + (dev_eui,), + ) + cur.execute( + "SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,) + ) + return str(cur.fetchone()[0]) + except psycopg2.DatabaseError as e: + raise DatabaseError(f"Erreur de création du device {dev_eui}") from e class PgReadingRepository(ReadingRepository): @@ -47,11 +50,14 @@ class PgReadingRepository(ReadingRepository): self._conn = conn def insert_reading(self, device_id: str, pulse_count: int) -> None: - with self._conn.cursor() as cur: - cur.execute( - """ - INSERT INTO reading (device_id, date, pulses) - VALUES (%s, NOW(), %s) - """, - (device_id, pulse_count), - ) + try: + with self._conn.cursor() as cur: + cur.execute( + """ + INSERT INTO reading (device_id, date, pulses) + VALUES (%s, NOW(), %s) + """, + (device_id, pulse_count), + ) + except psycopg2.DatabaseError as e: + raise DatabaseError(f"Erreur d'enregistrement de la télérelève sur le device {device_id}") from e diff --git a/app/consumer/src/exceptions.py b/app/consumer/src/exceptions.py index 7e78c34..fd60569 100644 --- a/app/consumer/src/exceptions.py +++ b/app/consumer/src/exceptions.py @@ -6,5 +6,8 @@ class InfrastructureError(Exception): class DatabaseConnectionError(InfrastructureError): """Impossible de se connecter à la db""" +class DatabaseError(InfrastructureError): + """Erreur lors d'une opération en base de données.""" + class MessageBrokerError(InfrastructureError): """Impossible de se connecter au broker MQTT""" \ No newline at end of file