agreg-server/server/app/adapters/postgres.py
Alexis Fourmaux aa72971627 feat: use connection pool instead of a single shared connection
This is useful for multitask environments (like the API), to avoid
conflicts
2026-05-11 21:46:21 +02:00

48 lines
1.7 KiB
Python

import logging
import psycopg2
from ports import DeviceRepository, ReadingRepository
from domain.exceptions import DatabaseError
from infrastructure.db import get_conn
log = logging.getLogger(__name__)
class PgDeviceRepository(DeviceRepository):
def get_or_create_device_id(self, dev_eui: str) -> str:
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO device (device_eui)
VALUES (%s)
ON CONFLICT (device_eui) DO NOTHING
""",
(dev_eui,),
)
cur.execute(
"SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,)
)
return str(cur.fetchone()[0]) # type: ignore
except psycopg2.DatabaseError as e:
raise DatabaseError(f"Erreur de création du device {dev_eui}") from e
class PgReadingRepository(ReadingRepository):
def insert_reading(self, device_id: str, pulse_count: int) -> None:
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reading (device_id, date, pulses)
VALUES (%s, NOW(), %s)
""",
(device_id, pulse_count),
)
except psycopg2.DatabaseError as e:
raise DatabaseError(
f"Erreur d'enregistrement de la télérelève sur le device {device_id}"
) from e