from datetime import datetime import psycopg2 from psycopg2.extensions import connection from domain.entities import ConsumptionPoint from domain.exceptions import DatabaseError from domain.value_objects import Granularity from ports.reading_query_repository import ReadingQueryRepository class PgReadingQueryRepository(ReadingQueryRepository): def __init__(self, conn: connection) -> None: self._conn = conn def get_consumption( self, dev_eui: str, start: datetime, end: datetime, granularity: Granularity, ) -> list[ConsumptionPoint]: date_trunc = granularity query = """ SELECT DATE_TRUNC(%s, r.date) AS period, MIN(r.pulses) AS pulse_start, MAX(r.pulses) AS pulse_end, MAX(r.pulses) - MIN(r.pulses) AS delta_pulses FROM reading r JOIN device d ON d.device_id = r.device_id WHERE d.device_eui = %s AND r.date >= %s AND r.date < %s GROUP BY period ORDER BY period ASC """ try: with self._conn.cursor() as cur: cur.execute(query, (date_trunc, dev_eui, start, end)) rows = cur.fetchall() except psycopg2.DatabaseError as e: raise DatabaseError(f"Erreur requĂȘte consumption : {e}") from e return [ ConsumptionPoint( period=row[0], pulse_count_start=row[1], pulse_count_end=row[2], delta_pulses=row[3], delta_m3=round(row[3] * 0.010, 3), ) for row in rows ]