32 lines
1.2 KiB
Python
32 lines
1.2 KiB
Python
from datetime import datetime
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, Query, HTTPException, Path
|
|
from pydantic import constr
|
|
|
|
from domain.value_objects import Granularity
|
|
from domain.exceptions import ValidationError, DatabaseError
|
|
from services.consumption_service import ConsumptionService
|
|
from dependencies import get_consumption_service
|
|
|
|
from ._readings_schemas import ConsumptionResponseSchema
|
|
|
|
readings_router = APIRouter(prefix="/readings", tags=["readings"])
|
|
|
|
DevEUI = Annotated[constr(max_length=64, pattern=r'^[0-9A-Fa-f]+$'), Path()]
|
|
|
|
@readings_router.get("/{dev_eui}", response_model=ConsumptionResponseSchema)
|
|
def get_consumption(
|
|
dev_eui: DevEUI,
|
|
start: Annotated[datetime, Query()],
|
|
end: Annotated[datetime, Query()],
|
|
granularity: Annotated[Granularity, Query()] = "day",
|
|
service: ConsumptionService = Depends(get_consumption_service),
|
|
):
|
|
try:
|
|
result = service.get_consumption(dev_eui, start, end, granularity)
|
|
return ConsumptionResponseSchema.from_domain(result)
|
|
except ValidationError as e:
|
|
raise HTTPException(status_code=422, detail=str(e))
|
|
except DatabaseError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|