import json
import logging
from typing import Callable

import paho.mqtt.client as mqtt
from paho.mqtt.client import ConnectFlags
from paho.mqtt.enums import CallbackAPIVersion
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCode

from domain.exceptions import MessageBrokerError, InfrastructureError
from domain.entities import UplinkEvent
from ports import MessageBroker

log = logging.getLogger(__name__)


class PahoMqttBroker(MessageBroker):
    """``MessageBroker`` implementation backed by a paho-mqtt client.

    Subscribes to a single topic and turns each JSON message into an
    ``UplinkEvent`` that is handed to a caller-supplied callback.
    """

    def __init__(self, host: str, port: int, topic: str):
        """Store the connection settings; no network activity happens here.

        Args:
            host: Broker host passed to ``Client.connect``.
            port: Broker port passed to ``Client.connect``.
            topic: Topic subscribed to (QoS 1) on every successful connect.
        """
        self._host = host
        self._port = port
        self._topic = topic

    def start(self, on_uplink: Callable[[UplinkEvent], None]) -> None:
        """Connect to the broker and dispatch uplinks until the loop exits.

        This call blocks inside ``loop_forever()``.

        Each message is expected to be a JSON object carrying
        ``deviceInfo.devEui`` and ``object.pulse_count``; messages that
        cannot be decoded or lack these fields are logged and dropped.

        Args:
            on_uplink: Called once per valid message. An
                ``InfrastructureError`` raised by it is logged and
                swallowed; any other exception is not handled here.

        Raises:
            MessageBrokerError: If the initial connection fails with
                ``OSError``.
        """
        _client = mqtt.Client(CallbackAPIVersion.VERSION2)

        def _on_connect(
            client: mqtt.Client,
            userdata: None,
            flags: ConnectFlags,
            reason_code: ReasonCode,
            props: Properties | None,
        ) -> None:
            # The callback also fires when the broker refuses the
            # connection; subscribing then is pointless and the "connected"
            # log line would be misleading.
            if reason_code.is_failure:
                log.error("Connexion MQTT refusée (code=%s)", reason_code)
                return
            log.info("MQTT connecté (code=%s)", reason_code)
            # Subscribing from on_connect renews the subscription on every
            # (re)connection.
            client.subscribe(self._topic, qos=1)

        def _on_message(
            client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage
        ) -> None:
            try:
                body = json.loads(msg.payload)
            except ValueError as e:
                # ValueError covers both json.JSONDecodeError and the
                # UnicodeDecodeError raised for payload bytes that are not
                # valid UTF-8; a single bad payload must not escape the
                # callback.
                log.error("Payload JSON invalide sur %s : %s", msg.topic, e)
                return

            try:
                dev_eui = body["deviceInfo"]["devEui"]
                pulse_count = int(body["object"]["pulse_count"])
            except (KeyError, ValueError, TypeError) as e:
                # TypeError also covers a body that is valid JSON but not an
                # object (list, string, number, null).
                log.error("Champs manquants ou invalides : %s | body=%s", e, body)
                return

            try:
                on_uplink(UplinkEvent(dev_eui=dev_eui, pulse_count=pulse_count))
            except InfrastructureError as e:
                log.error("Erreur infrastructure pour %s : %s", dev_eui, e)

        _client.on_connect = _on_connect
        _client.on_message = _on_message
        _client.reconnect_delay_set(min_delay=1, max_delay=30)

        try:
            _client.connect(self._host, self._port, keepalive=60)
        except OSError as e:
            raise MessageBrokerError(
                f"Impossible de joindre {self._host}:{self._port} : {e}"
            ) from e

        _client.loop_forever()