fix: improve error management when decoding uplink messages
This commit is contained in:
parent
f77cf3dd75
commit
5c480db410
3 changed files with 50 additions and 26 deletions
|
|
@ -8,6 +8,7 @@ from paho.mqtt.enums import CallbackAPIVersion
|
||||||
from paho.mqtt.properties import Properties
|
from paho.mqtt.properties import Properties
|
||||||
from paho.mqtt.reasoncodes import ReasonCode
|
from paho.mqtt.reasoncodes import ReasonCode
|
||||||
|
|
||||||
|
from exceptions import MessageBrokerError, InfrastructureError
|
||||||
from entities import UplinkEvent
|
from entities import UplinkEvent
|
||||||
from ports import MessageBroker
|
from ports import MessageBroker
|
||||||
|
|
||||||
|
|
@ -36,15 +37,29 @@ class PahoMqttBroker(MessageBroker):
|
||||||
def _on_message(client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage) -> None:
|
def _on_message(client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage) -> None:
|
||||||
try:
|
try:
|
||||||
body = json.loads(msg.payload)
|
body = json.loads(msg.payload)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
log.error("Payload JSON invalide sur %s : %s", msg.topic, e)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
dev_eui = body["deviceInfo"]["devEui"]
|
dev_eui = body["deviceInfo"]["devEui"]
|
||||||
pulse_count = int(body["object"]["pulse_count"])
|
pulse_count = int(body["object"]["pulse_count"])
|
||||||
|
except (KeyError, ValueError, TypeError) as e:
|
||||||
|
log.error("Champs manquants ou invalides : %s | body=%s", e, body)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
on_uplink(UplinkEvent(dev_eui=dev_eui, pulse_count=pulse_count))
|
on_uplink(UplinkEvent(dev_eui=dev_eui, pulse_count=pulse_count))
|
||||||
except Exception as e:
|
except InfrastructureError as e:
|
||||||
log.exception("Erreur parsing message MQTT : %s", e)
|
log.error("Erreur infrastructure pour %s : %s", dev_eui, e)
|
||||||
|
|
||||||
_client.on_connect = _on_connect
|
_client.on_connect = _on_connect
|
||||||
_client.on_message = _on_message
|
_client.on_message = _on_message
|
||||||
_client.reconnect_delay_set(min_delay=1, max_delay=30)
|
_client.reconnect_delay_set(min_delay=1, max_delay=30)
|
||||||
|
|
||||||
|
try:
|
||||||
_client.connect(self._host, self._port, keepalive=60)
|
_client.connect(self._host, self._port, keepalive=60)
|
||||||
|
except OSError as e:
|
||||||
|
raise MessageBrokerError(f"Impossible de joindre {self._host}:{self._port} : {e}") from e
|
||||||
|
|
||||||
_client.loop_forever()
|
_client.loop_forever()
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ import time
|
||||||
import psycopg2
|
import psycopg2
|
||||||
from psycopg2.extensions import connection
|
from psycopg2.extensions import connection
|
||||||
from ports import DeviceRepository, ReadingRepository
|
from ports import DeviceRepository, ReadingRepository
|
||||||
from exceptions import DatabaseConnectionError
|
from exceptions import DatabaseConnectionError, DatabaseError
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -27,6 +27,7 @@ class PgDeviceRepository(DeviceRepository):
|
||||||
self._conn = conn
|
self._conn = conn
|
||||||
|
|
||||||
def get_or_create_device_id(self, dev_eui: str) -> str:
|
def get_or_create_device_id(self, dev_eui: str) -> str:
|
||||||
|
try:
|
||||||
with self._conn.cursor() as cur:
|
with self._conn.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""
|
"""
|
||||||
|
|
@ -40,6 +41,8 @@ class PgDeviceRepository(DeviceRepository):
|
||||||
"SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,)
|
"SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,)
|
||||||
)
|
)
|
||||||
return str(cur.fetchone()[0])
|
return str(cur.fetchone()[0])
|
||||||
|
except psycopg2.DatabaseError as e:
|
||||||
|
raise DatabaseError(f"Erreur de création du device {dev_eui}") from e
|
||||||
|
|
||||||
|
|
||||||
class PgReadingRepository(ReadingRepository):
|
class PgReadingRepository(ReadingRepository):
|
||||||
|
|
@ -47,6 +50,7 @@ class PgReadingRepository(ReadingRepository):
|
||||||
self._conn = conn
|
self._conn = conn
|
||||||
|
|
||||||
def insert_reading(self, device_id: str, pulse_count: int) -> None:
|
def insert_reading(self, device_id: str, pulse_count: int) -> None:
|
||||||
|
try:
|
||||||
with self._conn.cursor() as cur:
|
with self._conn.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""
|
"""
|
||||||
|
|
@ -55,3 +59,5 @@ class PgReadingRepository(ReadingRepository):
|
||||||
""",
|
""",
|
||||||
(device_id, pulse_count),
|
(device_id, pulse_count),
|
||||||
)
|
)
|
||||||
|
except psycopg2.DatabaseError as e:
|
||||||
|
raise DatabaseError(f"Erreur d'enregistrement de la télérelève sur le device {device_id}") from e
|
||||||
|
|
|
||||||
|
|
@ -6,5 +6,8 @@ class InfrastructureError(Exception):
|
||||||
class DatabaseConnectionError(InfrastructureError):
|
class DatabaseConnectionError(InfrastructureError):
|
||||||
"""Impossible de se connecter à la db"""
|
"""Impossible de se connecter à la db"""
|
||||||
|
|
||||||
|
class DatabaseError(InfrastructureError):
|
||||||
|
"""Erreur lors d'une opération en base de données."""
|
||||||
|
|
||||||
class MessageBrokerError(InfrastructureError):
|
class MessageBrokerError(InfrastructureError):
|
||||||
"""Impossible de se connecter au broker MQTT"""
|
"""Impossible de se connecter au broker MQTT"""
|
||||||
Loading…
Add table
Add a link
Reference in a new issue