feat: improve draft of consumer app to write in db
This commit is contained in:
parent
d210a2651c
commit
9dd53b1b24
3 changed files with 86 additions and 9 deletions
3
.vscode/settings.json
vendored
Normal file
3
.vscode/settings.json
vendored
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
{
|
||||
"editor.defaultFormatter": "charliermarsh.ruff"
|
||||
}
|
||||
|
|
@ -1,30 +1,103 @@
|
|||
import paho.mqtt.client as mqtt
|
||||
import psycopg2
|
||||
import logging
|
||||
import sys
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto")
|
||||
MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
|
||||
PG_DSN = os.getenv("DATABASE_URL", "postgresql://simugaz:simugaz@db/simugaz")
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||
stream=sys.stdout, # Docker lit stdout par défaut
|
||||
force=True
|
||||
stream=sys.stdout,
|
||||
force=True,
|
||||
)
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_db():
|
||||
for _ in range(10):
|
||||
try:
|
||||
conn = psycopg2.connect(PG_DSN)
|
||||
conn.autocommit = True
|
||||
log.info("PostgreSQL connecté")
|
||||
return conn
|
||||
except Exception as e:
|
||||
log.warning(f"Attente PostgreSQL... ({e})")
|
||||
time.sleep(3)
|
||||
raise RuntimeError("Impossible de se connecter à PostgreSQL")
|
||||
|
||||
|
||||
def get_device_id(conn, dev_eui: str) -> str:
|
||||
"""
|
||||
Insère le device si inconnu
|
||||
Retourne le device_id correspondant au dev_eui
|
||||
"""
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO device (device_eui)
|
||||
VALUES (%s)
|
||||
ON CONFLICT (device_eui) DO NOTHING
|
||||
""",
|
||||
(dev_eui,),
|
||||
)
|
||||
|
||||
cur.execute("SELECT device_id FROM device WHERE device_eui = %s", (dev_eui,))
|
||||
row = cur.fetchone()
|
||||
return str(row[0])
|
||||
|
||||
|
||||
def insert_reading(conn, device_id: str, pulses: int):
|
||||
"""Insère une télérelève liée au device"""
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO reading (device_id, date, pulses)
|
||||
VALUES (%s, NOW(), %s)
|
||||
""",
|
||||
(device_id, pulses),
|
||||
)
|
||||
|
||||
|
||||
db_conn = None
|
||||
|
||||
|
||||
def on_connect(client, userdata, flags, reason_code, properties):
|
||||
log.info(f"Connected with result code {reason_code}")
|
||||
client.subscribe("application/+/device/+/event/up")
|
||||
log.info(f"MQTT connecté (code={reason_code})")
|
||||
client.subscribe("application/+/device/+/event/up", qos=1)
|
||||
|
||||
|
||||
def on_message(client, userdata, msg):
|
||||
log.info(msg.topic+" "+str(msg.payload))
|
||||
try:
|
||||
body = json.loads(msg.payload)
|
||||
dev_eui = body["deviceInfo"]["devEui"]
|
||||
pulses = int(body["object"]["pulse_count"])
|
||||
|
||||
device_id = get_device_id(db_conn, dev_eui)
|
||||
insert_reading(db_conn, device_id, pulses)
|
||||
|
||||
log.info(
|
||||
f"[UP] dev_eui={dev_eui} | device_id={device_id} | pulses={pulses}| relevé inséré"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
log.exception(f"Erreur traitement message : {e}")
|
||||
|
||||
|
||||
# ── Main ─────────────────────────────────────────────────────────
|
||||
if __name__ == "__main__":
|
||||
db_conn = get_db()
|
||||
|
||||
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
||||
mqttc.on_connect = on_connect
|
||||
mqttc.on_message = on_message
|
||||
mqttc.reconnect_delay_set(min_delay=1, max_delay=30)
|
||||
|
||||
mqttc.connect("mosquitto", 1883, 60)
|
||||
mqttc.connect(MQTT_HOST, MQTT_PORT, 60)
|
||||
mqttc.loop_forever()
|
||||
|
|
@ -1 +1,2 @@
|
|||
paho-mqtt==v2.1.0
|
||||
psycopg2-binary
|
||||
Loading…
Add table
Add a link
Reference in a new issue