import json import logging from typing import Callable import paho.mqtt.client as mqtt from paho.mqtt.client import ConnectFlags from paho.mqtt.enums import CallbackAPIVersion from paho.mqtt.properties import Properties from paho.mqtt.reasoncodes import ReasonCode from entities import UplinkEvent from ports import MessageBroker log = logging.getLogger(__name__) class PahoMqttBroker(MessageBroker): def __init__(self, host: str, port: int, topic: str): self._host = host self._port = port self._topic = topic def start(self, on_uplink: Callable[[UplinkEvent], None]) -> None: _client = mqtt.Client(CallbackAPIVersion.VERSION2) def _on_connect( client: mqtt.Client, userdata: None, flags: ConnectFlags, reason_code: ReasonCode, props: Properties | None, ) -> None: log.info("MQTT connecté (code=%s)", reason_code) client.subscribe(self._topic, qos=1) def _on_message(client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage) -> None: try: body = json.loads(msg.payload) dev_eui = body["deviceInfo"]["devEui"] pulse_count = int(body["object"]["pulse_count"]) on_uplink(UplinkEvent(dev_eui=dev_eui, pulse_count=pulse_count)) except Exception as e: log.exception("Erreur parsing message MQTT : %s", e) _client.on_connect = _on_connect _client.on_message = _on_message _client.reconnect_delay_set(min_delay=1, max_delay=30) _client.connect(self._host, self._port, keepalive=60) _client.loop_forever()