refacto: reorganize POC into clean architecture for modularity

This commit is contained in:
Alexis Fourmaux 2026-05-09 17:06:57 +02:00
parent 9dd53b1b24
commit 0830c2f182
14 changed files with 194 additions and 104 deletions

View file

@ -2,5 +2,5 @@ FROM python:3.13-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
COPY ./src/. /app/
CMD ["python", "main.py"]

View file

@ -1,103 +0,0 @@
import paho.mqtt.client as mqtt
import psycopg2
import logging
import sys
import json
import os
import time
MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto")
MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
PG_DSN = os.getenv("DATABASE_URL", "postgresql://simugaz:simugaz@db/simugaz")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
stream=sys.stdout,
force=True,
)
log = logging.getLogger(__name__)
def get_db():
for _ in range(10):
try:
conn = psycopg2.connect(PG_DSN)
conn.autocommit = True
log.info("PostgreSQL connecté")
return conn
except Exception as e:
log.warning(f"Attente PostgreSQL... ({e})")
time.sleep(3)
raise RuntimeError("Impossible de se connecter à PostgreSQL")
def get_device_id(conn, dev_eui: str) -> str:
"""
Insère le device si inconnu
Retourne le device_id correspondant au dev_eui
"""
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO device (device_eui)
VALUES (%s)
ON CONFLICT (device_eui) DO NOTHING
""",
(dev_eui,),
)
cur.execute("SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,))
row = cur.fetchone()
return str(row[0])
def insert_reading(conn, device_id: str, pulses: int):
"""Insère une télérelève liée au device"""
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reading (device_id, date, pulses)
VALUES (%s, NOW(), %s)
""",
(device_id, pulses),
)
db_conn = None
def on_connect(client, userdata, flags, reason_code, properties):
log.info(f"MQTT connecté (code={reason_code})")
client.subscribe("application/+/device/+/event/up", qos=1)
def on_message(client, userdata, msg):
try:
body = json.loads(msg.payload)
dev_eui = body["deviceInfo"]["devEui"]
pulses = int(body["object"]["pulse_count"])
device_id = get_device_id(db_conn, dev_eui)
insert_reading(db_conn, device_id, pulses)
log.info(
f"[UP] dev_eui={dev_eui} | device_id={device_id} | pulses={pulses}| relevé inséré"
)
except Exception as e:
log.exception(f"Erreur traitement message : {e}")
# ── Main ─────────────────────────────────────────────────────────
if __name__ == "__main__":
db_conn = get_db()
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.reconnect_delay_set(min_delay=1, max_delay=30)
mqttc.connect(MQTT_HOST, MQTT_PORT, 60)
mqttc.loop_forever()

View file

View file

@ -0,0 +1,50 @@
import json
import logging
from typing import Callable
import paho.mqtt.client as mqtt
from paho.mqtt.client import ConnectFlags
from paho.mqtt.enums import CallbackAPIVersion
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCode
from entities import UplinkEvent
from ports import MessageBroker
log = logging.getLogger(__name__)
class PahoMqttBroker(MessageBroker):
def __init__(self, host: str, port: int, topic: str):
self._host = host
self._port = port
self._topic = topic
def start(self, on_uplink: Callable[[UplinkEvent], None]) -> None:
_client = mqtt.Client(CallbackAPIVersion.VERSION2)
def _on_connect(
client: mqtt.Client,
userdata: None,
flags: ConnectFlags,
reason_code: ReasonCode,
props: Properties | None,
) -> None:
log.info("MQTT connecté (code=%s)", reason_code)
client.subscribe(self._topic, qos=1)
def _on_message(client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage) -> None:
try:
body = json.loads(msg.payload)
dev_eui = body["deviceInfo"]["devEui"]
pulse_count = int(body["object"]["pulse_count"])
on_uplink(UplinkEvent(dev_eui=dev_eui, pulse_count=pulse_count))
except Exception as e:
log.exception("Erreur parsing message MQTT : %s", e)
_client.on_connect = _on_connect
_client.on_message = _on_message
_client.reconnect_delay_set(min_delay=1, max_delay=30)
_client.connect(self._host, self._port, keepalive=60)
_client.loop_forever()

View file

@ -0,0 +1,56 @@
import logging
import time
import psycopg2
from psycopg2.extensions import connection
from ports import DeviceRepository, ReadingRepository
log = logging.getLogger(__name__)
def connect(uri: str) -> connection:
for _ in range(10):
try:
conn = psycopg2.connect(uri)
conn.autocommit = True
log.info("PostgreSQL connecté")
return conn
except Exception as e:
log.warning("Attente PostgreSQL... (%s)", e)
time.sleep(3)
raise RuntimeError("Impossible de se connecter à PostgreSQL")
class PgDeviceRepository(DeviceRepository):
def __init__(self, conn: connection):
self._conn = conn
def get_or_create_device_id(self, dev_eui: str) -> str:
with self._conn.cursor() as cur:
cur.execute(
"""
INSERT INTO device (device_eui)
VALUES (%s)
ON CONFLICT (device_eui) DO NOTHING
""",
(dev_eui,),
)
cur.execute(
"SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,)
)
return str(cur.fetchone()[0])
class PgReadingRepository(ReadingRepository):
def __init__(self, conn: connection):
self._conn = conn
def insert_reading(self, device_id: str, pulse_count: int) -> None:
with self._conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reading (device_id, date, pulses)
VALUES (%s, NOW(), %s)
""",
(device_id, pulse_count),
)

View file

@ -0,0 +1,3 @@
from .uplink_event import UplinkEvent
__all__ = ["UplinkEvent"]

View file

@ -0,0 +1,6 @@
from dataclasses import dataclass
@dataclass
class UplinkEvent:
dev_eui: str
pulse_count: int

31
app/consumer/src/main.py Normal file
View file

@ -0,0 +1,31 @@
import logging
import sys
import os
from adapters.postgres import connect, PgDeviceRepository, PgReadingRepository
from adapters.mqtt import PahoMqttBroker
from services.uplink_service import UplinkService
MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto")
MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
MQTT_TOPIC = os.getenv("MQTT_TOPIC", "application/+/device/+/event/up")
DB_URI = os.getenv("DATABASE_URL", "postgresql://simugaz:simugaz@db/simugaz")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
stream=sys.stdout,
force=True,
)
if __name__ == "__main__":
conn = connect(DB_URI)
broker = PahoMqttBroker(MQTT_HOST, MQTT_PORT, MQTT_TOPIC)
devices = PgDeviceRepository(conn)
readings = PgReadingRepository(conn)
uplink = UplinkService(devices, readings)
broker.start(on_uplink=uplink.handle)

View file

@ -0,0 +1,5 @@
from .device_repository import DeviceRepository
from .reading_repository import ReadingRepository
from .message_broker import MessageBroker
__all__ = ["DeviceRepository", "ReadingRepository", "MessageBroker"]

View file

@ -0,0 +1,7 @@
from abc import ABC, abstractmethod
class DeviceRepository(ABC):
@abstractmethod
def get_or_create_device_id(self, dev_eui: str) -> str:
"""Retourne le device_id, crée le device s'il est inconnu"""
...

View file

@ -0,0 +1,10 @@
from abc import ABC, abstractmethod
from typing import Callable
from entities import UplinkEvent
class MessageBroker(ABC):
@abstractmethod
def start(self, on_uplink: Callable[[UplinkEvent], None]) -> None:
"""Démarre l'écoute et appelle on_uplink(UplinkEvent) à chaque message"""
...

View file

@ -0,0 +1,7 @@
from abc import ABC, abstractmethod
class ReadingRepository(ABC):
@abstractmethod
def insert_reading(self, device_id: str, pulse_count: int) -> None:
"""Persiste un relevé"""
...

View file

View file

@ -0,0 +1,18 @@
import logging
from ports import DeviceRepository, ReadingRepository
from entities import UplinkEvent
log = logging.getLogger(__name__)
class UplinkService:
def __init__(self, devices: DeviceRepository, readings: ReadingRepository):
self._devices = devices
self._readings = readings
def handle(self, event: UplinkEvent) -> None:
device_id = self._devices.get_or_create_device_id(event.dev_eui)
self._readings.insert_reading(device_id, event.pulse_count)
log.info(
"[UP] dev_eui=%s | device_id=%s | pulses=%d",
event.dev_eui, device_id, event.pulse_count
)