diff --git a/app/consumer/Dockerfile b/app/consumer/Dockerfile index b8c8907..757c3ee 100644 --- a/app/consumer/Dockerfile +++ b/app/consumer/Dockerfile @@ -2,5 +2,5 @@ FROM python:3.13-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -COPY main.py . +COPY ./src/. /app/ CMD ["python", "main.py"] \ No newline at end of file diff --git a/app/consumer/main.py b/app/consumer/main.py deleted file mode 100644 index ddc7199..0000000 --- a/app/consumer/main.py +++ /dev/null @@ -1,103 +0,0 @@ -import paho.mqtt.client as mqtt -import psycopg2 -import logging -import sys -import json -import os -import time - -MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto") -MQTT_PORT = int(os.getenv("MQTT_PORT", 1883)) -PG_DSN = os.getenv("DATABASE_URL", "postgresql://simugaz:simugaz@db/simugaz") - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - datefmt="%Y-%m-%dT%H:%M:%S", - stream=sys.stdout, - force=True, -) -log = logging.getLogger(__name__) - - -def get_db(): - for _ in range(10): - try: - conn = psycopg2.connect(PG_DSN) - conn.autocommit = True - log.info("PostgreSQL connecté") - return conn - except Exception as e: - log.warning(f"Attente PostgreSQL... ({e})") - time.sleep(3) - raise RuntimeError("Impossible de se connecter à PostgreSQL") - - -def get_device_id(conn, dev_eui: str) -> str: - """ - Insère le device si inconnu - Retourne le device_id correspondant au dev_eui - """ - with conn.cursor() as cur: - cur.execute( - """ - INSERT INTO device (device_eui) - VALUES (%s) - ON CONFLICT (device_eui) DO NOTHING - """, - (dev_eui,), - ) - - cur.execute("SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,)) - row = cur.fetchone() - return str(row[0]) - - -def insert_reading(conn, device_id: str, pulses: int): - """Insère une télérelève liée au device""" - with conn.cursor() as cur: - cur.execute( - """ - INSERT INTO reading (device_id, date, pulses) - VALUES (%s, NOW(), %s) - """, - (device_id, pulses), - ) - - -db_conn = None - - -def on_connect(client, userdata, flags, reason_code, properties): - log.info(f"MQTT connecté (code={reason_code})") - client.subscribe("application/+/device/+/event/up", qos=1) - - -def on_message(client, userdata, msg): - try: - body = json.loads(msg.payload) - dev_eui = body["deviceInfo"]["devEui"] - pulses = int(body["object"]["pulse_count"]) - - device_id = get_device_id(db_conn, dev_eui) - insert_reading(db_conn, device_id, pulses) - - log.info( - f"[UP] dev_eui={dev_eui} | device_id={device_id} | pulses={pulses}| relevé inséré" - ) - - except Exception as e: - log.exception(f"Erreur traitement message : {e}") - - -# ── Main ───────────────────────────────────────────────────────── -if __name__ == "__main__": - db_conn = get_db() - - mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) - mqttc.on_connect = on_connect - mqttc.on_message = on_message - mqttc.reconnect_delay_set(min_delay=1, max_delay=30) - - mqttc.connect(MQTT_HOST, MQTT_PORT, 60) - mqttc.loop_forever() diff --git a/app/consumer/src/adapters/__init__.py b/app/consumer/src/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/consumer/src/adapters/mqtt.py b/app/consumer/src/adapters/mqtt.py new file mode 100644 index 0000000..f7a19f5 --- /dev/null +++ b/app/consumer/src/adapters/mqtt.py @@ -0,0 +1,50 @@ +import json +import logging +from typing import Callable + +import paho.mqtt.client as mqtt +from paho.mqtt.client import ConnectFlags +from paho.mqtt.enums import CallbackAPIVersion +from paho.mqtt.properties import Properties +from paho.mqtt.reasoncodes import ReasonCode + +from entities import UplinkEvent +from ports import MessageBroker + +log = logging.getLogger(__name__) + + +class PahoMqttBroker(MessageBroker): + def __init__(self, host: str, port: int, topic: str): + self._host = host + self._port = port + self._topic = topic + + def start(self, on_uplink: Callable[[UplinkEvent], None]) -> None: + _client = mqtt.Client(CallbackAPIVersion.VERSION2) + + def _on_connect( + client: mqtt.Client, + userdata: None, + flags: ConnectFlags, + reason_code: ReasonCode, + props: Properties | None, + ) -> None: + log.info("MQTT connecté (code=%s)", reason_code) + client.subscribe(self._topic, qos=1) + + def _on_message(client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage) -> None: + try: + body = json.loads(msg.payload) + dev_eui = body["deviceInfo"]["devEui"] + pulse_count = int(body["object"]["pulse_count"]) + + on_uplink(UplinkEvent(dev_eui=dev_eui, pulse_count=pulse_count)) + except Exception as e: + log.exception("Erreur parsing message MQTT : %s", e) + + _client.on_connect = _on_connect + _client.on_message = _on_message + _client.reconnect_delay_set(min_delay=1, max_delay=30) + _client.connect(self._host, self._port, keepalive=60) + _client.loop_forever() diff --git a/app/consumer/src/adapters/postgres.py b/app/consumer/src/adapters/postgres.py new file mode 100644 index 0000000..53aa3ae --- /dev/null +++ b/app/consumer/src/adapters/postgres.py @@ -0,0 +1,56 @@ +import logging +import time + +import psycopg2 +from psycopg2.extensions import connection +from ports import DeviceRepository, ReadingRepository + +log = logging.getLogger(__name__) + + +def connect(uri: str) -> connection: + for _ in range(10): + try: + conn = psycopg2.connect(uri) + conn.autocommit = True + log.info("PostgreSQL connecté") + return conn + except Exception as e: + log.warning("Attente PostgreSQL... (%s)", e) + time.sleep(3) + raise RuntimeError("Impossible de se connecter à PostgreSQL") + + +class PgDeviceRepository(DeviceRepository): + def __init__(self, conn: connection): + self._conn = conn + + def get_or_create_device_id(self, dev_eui: str) -> str: + with self._conn.cursor() as cur: + cur.execute( + """ + INSERT INTO device (device_eui) + VALUES (%s) + ON CONFLICT (device_eui) DO NOTHING + """, + (dev_eui,), + ) + cur.execute( + "SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,) + ) + return str(cur.fetchone()[0]) + + +class PgReadingRepository(ReadingRepository): + def __init__(self, conn: connection): + self._conn = conn + + def insert_reading(self, device_id: str, pulse_count: int) -> None: + with self._conn.cursor() as cur: + cur.execute( + """ + INSERT INTO reading (device_id, date, pulses) + VALUES (%s, NOW(), %s) + """, + (device_id, pulse_count), + ) diff --git a/app/consumer/src/entities/__init__.py b/app/consumer/src/entities/__init__.py new file mode 100644 index 0000000..9b2115f --- /dev/null +++ b/app/consumer/src/entities/__init__.py @@ -0,0 +1,3 @@ +from .uplink_event import UplinkEvent + +__all__ = ["UplinkEvent"] \ No newline at end of file diff --git a/app/consumer/src/entities/uplink_event.py b/app/consumer/src/entities/uplink_event.py new file mode 100644 index 0000000..793d481 --- /dev/null +++ b/app/consumer/src/entities/uplink_event.py @@ -0,0 +1,6 @@ +from dataclasses import dataclass + +@dataclass +class UplinkEvent: + dev_eui: str + pulse_count: int \ No newline at end of file diff --git a/app/consumer/src/main.py b/app/consumer/src/main.py new file mode 100644 index 0000000..47d95c3 --- /dev/null +++ b/app/consumer/src/main.py @@ -0,0 +1,31 @@ +import logging +import sys +import os + +from adapters.postgres import connect, PgDeviceRepository, PgReadingRepository +from adapters.mqtt import PahoMqttBroker +from services.uplink_service import UplinkService + +MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto") +MQTT_PORT = int(os.getenv("MQTT_PORT", 1883)) +MQTT_TOPIC = os.getenv("MQTT_TOPIC", "application/+/device/+/event/up") +DB_URI = os.getenv("DATABASE_URL", "postgresql://simugaz:simugaz@db/simugaz") + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", + stream=sys.stdout, + force=True, +) + +if __name__ == "__main__": + conn = connect(DB_URI) + broker = PahoMqttBroker(MQTT_HOST, MQTT_PORT, MQTT_TOPIC) + + devices = PgDeviceRepository(conn) + readings = PgReadingRepository(conn) + + uplink = UplinkService(devices, readings) + + broker.start(on_uplink=uplink.handle) diff --git a/app/consumer/src/ports/__init__.py b/app/consumer/src/ports/__init__.py new file mode 100644 index 0000000..641a619 --- /dev/null +++ b/app/consumer/src/ports/__init__.py @@ -0,0 +1,5 @@ +from .device_repository import DeviceRepository +from .reading_repository import ReadingRepository +from .message_broker import MessageBroker + +__all__ = ["DeviceRepository", "ReadingRepository", "MessageBroker"] diff --git a/app/consumer/src/ports/device_repository.py b/app/consumer/src/ports/device_repository.py new file mode 100644 index 0000000..e5c2d34 --- /dev/null +++ b/app/consumer/src/ports/device_repository.py @@ -0,0 +1,7 @@ +from abc import ABC, abstractmethod + +class DeviceRepository(ABC): + @abstractmethod + def get_or_create_device_id(self, dev_eui: str) -> str: + """Retourne le device_id, crée le device s'il est inconnu""" + ... \ No newline at end of file diff --git a/app/consumer/src/ports/message_broker.py b/app/consumer/src/ports/message_broker.py new file mode 100644 index 0000000..af42035 --- /dev/null +++ b/app/consumer/src/ports/message_broker.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod +from typing import Callable + +from entities import UplinkEvent + +class MessageBroker(ABC): + @abstractmethod + def start(self, on_uplink: Callable[[UplinkEvent], None]) -> None: + """Démarre l'écoute et appelle on_uplink(UplinkEvent) à chaque message""" + ... diff --git a/app/consumer/src/ports/reading_repository.py b/app/consumer/src/ports/reading_repository.py new file mode 100644 index 0000000..0b03cde --- /dev/null +++ b/app/consumer/src/ports/reading_repository.py @@ -0,0 +1,7 @@ +from abc import ABC, abstractmethod + +class ReadingRepository(ABC): + @abstractmethod + def insert_reading(self, device_id: str, pulse_count: int) -> None: + """Persiste un relevé""" + ... \ No newline at end of file diff --git a/app/consumer/src/services/__init__.py b/app/consumer/src/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/consumer/src/services/uplink_service.py b/app/consumer/src/services/uplink_service.py new file mode 100644 index 0000000..40ace3a --- /dev/null +++ b/app/consumer/src/services/uplink_service.py @@ -0,0 +1,18 @@ +import logging +from ports import DeviceRepository, ReadingRepository +from entities import UplinkEvent + +log = logging.getLogger(__name__) + +class UplinkService: + def __init__(self, devices: DeviceRepository, readings: ReadingRepository): + self._devices = devices + self._readings = readings + + def handle(self, event: UplinkEvent) -> None: + device_id = self._devices.get_or_create_device_id(event.dev_eui) + self._readings.insert_reading(device_id, event.pulse_count) + log.info( + "[UP] dev_eui=%s | device_id=%s | pulses=%d", + event.dev_eui, device_id, event.pulse_count + )