agreg-server/server/backend/adapters/postgres/reading_repository.py

91 lines
3 KiB
Python
Raw Normal View History

2026-05-10 12:36:38 +02:00
from datetime import datetime
from dateutil.relativedelta import relativedelta
2026-05-10 12:36:38 +02:00
import psycopg2
from domain.entities import ConsumptionPoint
from domain.exceptions import DatabaseError
from domain.value_objects import Granularity
from ports.reading_query_repository import ReadingQueryRepository
from ports.reading_repository import ReadingRepository
from infrastructure.db import get_conn
2026-05-10 12:36:38 +02:00
# Length of one aggregation bucket, keyed by granularity name.
# Each entry is relativedelta(<unit>s=1), e.g. "hour" -> relativedelta(hours=1).
_GRANULARITY_DELTA = {
    unit: relativedelta(**{f"{unit}s": 1})
    for unit in ("hour", "day", "week", "month")
}

# Multiplier applied to a pulse delta to obtain the reported delta_m3 value.
PULSE_CONVERSION_FACTOR = 0.01
2026-05-10 12:36:38 +02:00
class PgReadingQueryRepository(ReadingQueryRepository):
    """PostgreSQL implementation of ``ReadingQueryRepository``."""

    def get_consumption(
        self,
        dev_eui: str,
        start: datetime,
        end: datetime,
        granularity: Granularity,
    ) -> list[ConsumptionPoint]:
        """Return the pulse consumption of one device, bucketed by period.

        Readings are grouped with ``DATE_TRUNC(granularity, date)`` and the
        highest pulse counter of each bucket is compared with the one of the
        preceding bucket (``LAG``) to obtain the consumption of that bucket.

        Args:
            dev_eui: Value matched against ``device.device_eui``.
            start: Lower bound of the requested window.
            end: Exclusive upper bound of the requested window. When equal
                to ``start`` it is replaced by ``start`` plus one
                granularity step.
            granularity: Bucket size; used both as key into
                ``_GRANULARITY_DELTA`` and as the ``DATE_TRUNC`` field.

        Returns:
            One ``ConsumptionPoint`` per bucket, ordered by period, skipping
            buckets that have no preceding bucket to diff against.

        Raises:
            KeyError: If ``granularity`` is not a key of
                ``_GRANULARITY_DELTA``.
            DatabaseError: If psycopg2 reports a database error.
        """
        step = _GRANULARITY_DELTA[granularity]
        if start == end:
            end = start + step
        # Query one extra step before ``start`` so that the first requested
        # bucket can have a predecessor for the LAG() difference.
        window_start = start - step

        sql = """
            WITH periods AS (
                SELECT
                    DATE_TRUNC(%s, r.date) AS period,
                    MAX(r.pulses) AS pulse_end
                FROM reading r
                JOIN device d ON d.device_id = r.device_id
                WHERE d.device_eui = %s
                  AND r.date >= %s
                  AND r.date < %s
                GROUP BY period
            )

            SELECT
                period,
                LAG(pulse_end) OVER (ORDER BY period) AS pulse_start,
                pulse_end,
                pulse_end - LAG(pulse_end) OVER (ORDER BY period) AS delta_pulses
            FROM periods

            ORDER BY period ASC
        """
        params = (granularity, dev_eui, window_start, end)

        try:
            with get_conn() as conn, conn.cursor() as cur:
                cur.execute(sql, params)
                records = cur.fetchall()
        except psycopg2.DatabaseError as e:
            raise DatabaseError(f"Erreur requête consumption : {e}") from e

        points = []
        for period, pulse_start, pulse_end, delta_pulses in records:
            # The earliest bucket has no predecessor: LAG() yields NULL and
            # no delta can be computed for it, so it is left out.
            if pulse_start is None:
                continue
            points.append(
                ConsumptionPoint(
                    period=period,
                    pulse_count_start=pulse_start,
                    pulse_count_end=pulse_end,
                    delta_pulses=delta_pulses,
                    delta_m3=round(delta_pulses * PULSE_CONVERSION_FACTOR, 3),
                )
            )
        return points
class PgReadingRepository(ReadingRepository):
    """PostgreSQL implementation of ``ReadingRepository``."""

    def insert_reading(self, device_id: str, pulse_count: int) -> None:
        """Store one reading for a device, timestamped by the database.

        Args:
            device_id: Value written to ``reading.device_id``.
            pulse_count: Value written to ``reading.pulses``.

        Raises:
            DatabaseError: If psycopg2 reports a database error; the
                original exception is attached as the cause.
        """
        statement = """
            INSERT INTO reading (device_id, date, pulses)
            VALUES (%s, NOW(), %s)
        """
        params = (device_id, pulse_count)

        # NOTE(review): no explicit commit here; presumably the context
        # manager returned by get_conn() takes care of it - confirm.
        try:
            with get_conn() as conn, conn.cursor() as cur:
                cur.execute(statement, params)
        except psycopg2.DatabaseError as e:
            message = (
                f"Erreur d'enregistrement de la télérelève sur le device {device_id}"
            )
            raise DatabaseError(message) from e