2026-05-09 17:06:57 +02:00
|
|
|
import json
|
|
|
|
|
import logging
|
|
|
|
|
from typing import Callable
|
|
|
|
|
|
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
|
from paho.mqtt.client import ConnectFlags
|
|
|
|
|
from paho.mqtt.enums import CallbackAPIVersion
|
|
|
|
|
from paho.mqtt.properties import Properties
|
|
|
|
|
from paho.mqtt.reasoncodes import ReasonCode
|
|
|
|
|
|
2026-05-09 18:21:45 +02:00
|
|
|
from domain.exceptions import MessageBrokerError, InfrastructureError
|
|
|
|
|
from domain.entities import UplinkEvent
|
2026-05-09 17:06:57 +02:00
|
|
|
from ports import MessageBroker
|
|
|
|
|
|
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PahoMqttBroker(MessageBroker):
    """`MessageBroker` implementation backed by a paho-mqtt client.

    Subscribes to a single topic and turns each JSON message received on it
    into an `UplinkEvent` handed to a caller-supplied callback.
    """

    def __init__(self, host: str, port: int, topic: str):
        """Store the connection settings; no network activity happens here.

        Args:
            host: Broker host passed to the paho client's ``connect``.
            port: Broker port passed to the paho client's ``connect``.
            topic: Topic subscribed to (QoS 1) once the connection succeeds.
        """
        self._host = host
        self._port = port
        self._topic = topic

    def start(self, on_uplink: Callable[[UplinkEvent], None]) -> None:
        """Connect to the broker and dispatch uplinks until the loop ends.

        The call blocks inside the client's ``loop_forever()``.

        Each message payload is parsed as JSON; ``deviceInfo.devEui`` and
        ``object.pulse_count`` (converted with ``int``) are extracted and
        passed to ``on_uplink`` as an `UplinkEvent`. Messages that cannot be
        decoded or that lack those fields are logged and dropped.

        Args:
            on_uplink: Callback invoked with one `UplinkEvent` per valid
                message. An `InfrastructureError` raised by it is logged and
                swallowed; any other exception propagates out of the paho
                callback.

        Raises:
            MessageBrokerError: If the initial connection attempt raises
                `OSError`.
        """
        _client = mqtt.Client(CallbackAPIVersion.VERSION2)

        def _on_connect(
            client: mqtt.Client,
            userdata: None,
            flags: ConnectFlags,
            reason_code: ReasonCode,
            props: Properties | None,
        ) -> None:
            """Subscribe to the configured topic once the broker accepts us."""
            # The callback also fires when the broker refuses the connection;
            # in that case there is nothing to subscribe to, and reporting a
            # successful connection would be misleading.
            if reason_code.is_failure:
                log.error("Connexion MQTT refusée (code=%s)", reason_code)
                return

            log.info("MQTT connecté (code=%s)", reason_code)
            client.subscribe(self._topic, qos=1)

        def _on_message(
            client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage
        ) -> None:
            """Decode one message and forward it to ``on_uplink``."""
            try:
                body = json.loads(msg.payload)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                # json.loads() decodes bytes payloads itself, so a payload
                # that is not valid text raises UnicodeDecodeError rather
                # than JSONDecodeError. Both mean "unusable payload".
                log.error("Payload JSON invalide sur %s : %s", msg.topic, e)
                return

            try:
                dev_eui = body["deviceInfo"]["devEui"]
                pulse_count = int(body["object"]["pulse_count"])
            except (KeyError, ValueError, TypeError) as e:
                # TypeError also covers a body (or nested value) that is not
                # a mapping, e.g. a JSON list or scalar.
                log.error("Champs manquants ou invalides : %s | body=%s", e, body)
                return

            try:
                on_uplink(UplinkEvent(dev_eui=dev_eui, pulse_count=pulse_count))
            except InfrastructureError as e:
                # Best effort: an infrastructure failure on one uplink must
                # not stop the processing of subsequent messages.
                log.error("Erreur infrastructure pour %s : %s", dev_eui, e)

        _client.on_connect = _on_connect
        _client.on_message = _on_message
        # Bounds for the delay paho waits between reconnection attempts.
        _client.reconnect_delay_set(min_delay=1, max_delay=30)

        try:
            _client.connect(self._host, self._port, keepalive=60)
        except OSError as e:
            raise MessageBrokerError(
                f"Impossible de joindre {self._host}:{self._port} : {e}"
            ) from e

        _client.loop_forever()
|