refacto: move ReadingRepository where it logically belongs

This commit is contained in:
Alexis Fourmaux 2026-05-10 15:06:36 +02:00
parent aa72971627
commit 017092040d
4 changed files with 26 additions and 21 deletions

View file

@ -0,0 +1,91 @@
from datetime import datetime
from dateutil.relativedelta import relativedelta
import psycopg2
from domain.entities import ConsumptionPoint
from domain.exceptions import DatabaseError
from domain.value_objects import Granularity
from ports.reading_query_repository import ReadingQueryRepository
from ports.reading_repository import ReadingRepository
from infrastructure.db import get_conn
_GRANULARITY_DELTA = {
"hour": relativedelta(hours=1),
"day": relativedelta(days=1),
"week": relativedelta(weeks=1),
"month": relativedelta(months=1),
}
class PgReadingQueryRepository(ReadingQueryRepository):
def get_consumption(
self,
dev_eui: str,
start: datetime,
end: datetime,
granularity: Granularity,
) -> list[ConsumptionPoint]:
if start == end:
end = start + _GRANULARITY_DELTA[granularity]
adjusted_start = start - _GRANULARITY_DELTA[granularity]
date_trunc = granularity
query = """
WITH periods AS (
SELECT
DATE_TRUNC(%s, r.date) AS period,
MAX(r.pulses) AS pulse_end
FROM reading r
JOIN device d ON d.device_id = r.device_id
WHERE d.device_eui = %s
AND r.date >= %s
AND r.date < %s
GROUP BY period
)
SELECT
period,
LAG(pulse_end) OVER (ORDER BY period) AS pulse_start,
pulse_end,
pulse_end - LAG(pulse_end) OVER (ORDER BY period) AS delta_pulses
FROM periods
ORDER BY period ASC
"""
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (date_trunc, dev_eui, adjusted_start, end))
rows = cur.fetchall()
except psycopg2.DatabaseError as e:
raise DatabaseError(f"Erreur requête consumption : {e}") from e
return [
ConsumptionPoint(
period=row[0],
pulse_count_start=row[1],
pulse_count_end=row[2],
delta_pulses=row[3],
delta_m3=round(row[3] * 0.010, 3),
)
for row in rows
if row[1] is not None
]
class PgReadingRepository(ReadingRepository):
def insert_reading(self, device_id: str, pulse_count: int) -> None:
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reading (device_id, date, pulses)
VALUES (%s, NOW(), %s)
""",
(device_id, pulse_count),
)
except psycopg2.DatabaseError as e:
raise DatabaseError(
f"Erreur d'enregistrement de la télérelève sur le device {device_id}"
) from e