murmel-biblio / app.py
jgrivolla's picture
improve messages and error handling
6bbca28
#!/usr/bin/env python
# coding: utf-8
# # Bibliothek
#!pip install PyMySQL
import pprint
import json
import pymysql.cursors
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, HTMLResponse
import time
from dbutils.pooled_db import PooledDB
import os
import logging
from fastapi.templating import Jinja2Templates
# Page title and loan table name; both fall back to test values when the
# corresponding environment variable is not set.
PAGE_TITLE = os.environ.get('PAGE_TITLE', 'Murmel Bibliothek TEST')
AUSLEIHE_TABLE = os.environ.get('AUSLEIHE_TABLE', 'ausleihe_test')

# Connection settings for the pool. Host, user and password are mandatory:
# a missing variable raises KeyError at import time.
db_config = dict(
    host=os.environ['MURMEL_DB_HOST'],
    user=os.environ['MURMEL_DB_USER'],
    password=os.environ['MURMEL_DB_PASSWORD'],
    database='murmel',
    # DictCursor makes every fetched row a dict keyed by column name.
    cursorclass=pymysql.cursors.DictCursor,
)

# Shared connection pool, limited to five connections.
pool = PooledDB(pymysql, maxconnections=5, **db_config)

app = FastAPI()

# Log through uvicorn's error logger.
LOG = logging.getLogger('uvicorn.error')
def execute_query(sql, params=None, max_retries=3, retry_delay=1):
    """
    Runs one SQL statement on a connection taken from the pool, commits, and returns the fetched rows.

    A pymysql.OperationalError triggers another attempt with a newly requested
    connection, up to max_retries attempts in total. Any other exception propagates
    unchanged. With max_retries < 1 no attempt is made and None is returned.
    Parameters:
    - sql (str): The statement to execute, using %s placeholders for values.
    - params (sequence or None): Values bound to the placeholders (default: no values).
    - max_retries (int): Total number of attempts (default: 3).
    - retry_delay (int or float): Seconds to wait before the next attempt (default: 1).
    Returns:
    - The result of cursor.fetchall() for the statement.
    Raises:
    - HTTPException: Status 500 when the last attempt fails with an OperationalError.
    """
    LOG.debug("executing query %s with params %s", sql, params)
    for attempt in range(max_retries):
        # Reset per attempt so the cleanup below never touches a connection
        # that belongs to an earlier, already cleaned-up attempt.
        connection = None
        try:
            connection = pool.connection()
            with connection.cursor() as cursor:
                cursor.execute(sql, params or ())
                result = cursor.fetchall()
            connection.commit()
            return result
        except pymysql.OperationalError as e:
            if attempt == max_retries - 1:
                raise HTTPException(status_code=500, detail="Database connection error") from e
            LOG.warning(
                "query attempt %d of %d failed, retrying: %s",
                attempt + 1, max_retries, e,
            )
            time.sleep(retry_delay)
        finally:
            if connection is not None:
                connection.close()
# Template loader for the HTML page; templates are looked up in the directory ".".
templates = Jinja2Templates(directory=".")
@app.get("/groups")
def get_groups():
    """
    Lists the groups flagged as `aktuell`, ordered by their id.
    Returns:
    - The rows from execute_query, each with the columns `Gruppe` and `idGruppe`.
    """
    return execute_query(
        "SELECT Gruppe, idGruppe FROM `gruppe` WHERE aktuell is True ORDER BY idGruppe ASC;"
    )
@app.get("/students/{idGruppe}")
def get_students(idGruppe):
    """
    Lists children linked to a school year flagged as `aktuell`, sorted by name.
    Parameters:
    - idGruppe: Path value. 'all' selects every such child once; any other value
      is matched against both the group name (`Gruppe`) and the group id (`idGruppe`).
    Returns:
    - The rows from execute_query, each with the columns `Kind` (first and last name) and `idKind`.
    """
    if idGruppe != 'all':
        # Restrict to one group; the same value is bound to both placeholders.
        group_sql = """
SELECT concat(`ki`.`Vorname`,' ',`ki`.`Nachnamen`) AS `Kind`, `ki`.`idKind` AS `idKind`
FROM `kind` `ki`
JOIN `kind_x_gruppe_x_schuljahr` `kgs` ON `ki`.`idKind` = `kgs`.`x_kind`
JOIN `schuljahr` `sch` ON `sch`.`idschuljahr` = `kgs`.`x_schuljahr`
JOIN `gruppe` `gr` ON `gr`.`idGruppe` = `kgs`.`x_gruppe`
WHERE `sch`.`aktuell` = 1 AND (`gr`.`Gruppe` = %s OR `gr`.`idGruppe` = %s)
ORDER BY 1
"""
        return execute_query(group_sql, (idGruppe, idGruppe))

    # No group filter; DISTINCT collapses duplicate rows for the same child.
    everyone_sql = """
SELECT DISTINCT concat(`ki`.`Vorname`,' ',`ki`.`Nachnamen`) AS `Kind`, `ki`.`idKind` AS `idKind`
FROM `kind` `ki`
JOIN `kind_x_gruppe_x_schuljahr` `kgs` ON `ki`.`idKind` = `kgs`.`x_kind`
JOIN `schuljahr` `sch` ON `sch`.`idschuljahr` = `kgs`.`x_schuljahr`
WHERE `sch`.`aktuell` = 1
ORDER BY 1
"""
    return execute_query(everyone_sql)
@app.get("/return/{idBuch}")
def rueckgabe(idBuch, grund='rueckgabe'):
    """
    Closes every open loan of a book and reports who had it.
    Parameters:
    - idBuch: The ID of the book being returned.
    - grund (str): The reason stored in `rueckGrund` (default: 'rueckgabe').
    Returns:
    - dict: `success` and `message`; on success also `student_names`, the full
      names of the children whose loans were closed.
    """
    # Find the loans of this book that have no return date yet.
    open_loans_sql = f"""
SELECT a.`idBuch`, a.`idKind`, a.`ausleihe`, a.`rueckgabe`, a.`rueckGrund`,
CONCAT(k.`Vorname`, ' ', k.`Nachnamen`) AS student_name
FROM `{AUSLEIHE_TABLE}` a
JOIN `kind` k ON a.`idKind` = k.`idKind`
WHERE a.`idBuch` = %s AND a.`rueckgabe` is NULL;
"""
    open_loans = execute_query(open_loans_sql, (idBuch,))
    if not len(open_loans):
        return {"success": False, "message": "Buch nicht gefunden oder bereits zurückgegeben"}

    # Stamp the open loans with the current time and the given reason.
    close_sql = f"UPDATE `{AUSLEIHE_TABLE}` SET `rueckgabe` = NOW(), `rueckGrund` = %s WHERE `idBuch` = %s AND `rueckgabe` is NULL;"
    execute_query(close_sql, (grund, idBuch))

    names = [loan['student_name'] for loan in open_loans]
    joined_names = ", ".join(names)
    response = {"success": True}
    response["message"] = f"Buch zurückgegeben von: {joined_names}"
    response["student_names"] = names
    return response
@app.get("/borrow/{idBuch}/{idKind}")
def ausleihe(idBuch, idKind):
    """
    Lends a book to a child by inserting a new record into the table named by AUSLEIHE_TABLE.

    Any open loan of the same book is first closed through rueckgabe() with the
    reason 'neu-ausleihe'; the names of the previous borrowers are appended to
    the success message.
    Parameters:
    - idBuch: The ID of the book being loaned.
    - idKind: The ID of the child borrowing the book.
    Returns:
    - dict: `success` (bool) and `message` (str). Any exception is logged and
      reported as `success: False` instead of being raised.
    """
    try:
        rueckgabe_result = rueckgabe(idBuch, grund="neu-ausleihe")
        message = "Buch erfolgreich ausgeliehen"
        if rueckgabe_result.get("success", False):
            # rueckgabe() already returns full names, not ids, so they are used
            # directly; looking them up as `idKind` finds no row and used to
            # abort the loan after the previous one had been closed.
            previous_borrowers = ", ".join(rueckgabe_result.get("student_names", []))
            if previous_borrowers:
                message += f". Zuvor ausgeliehen von {previous_borrowers}"
        # Insert new borrowing record
        sql = f"INSERT INTO `{AUSLEIHE_TABLE}` (`idBuch`, `idKind`, `ausleihe`) VALUES (%s, %s, NOW());"
        execute_query(sql, (idBuch, idKind))
        return {"success": True, "message": message}
    except Exception as e:
        # Endpoint boundary: keep the traceback in the log, answer with a plain message.
        LOG.exception("Error in ausleihe: %s", e)
        return {"success": False, "message": f"Fehler beim Ausleihen des Buches: {str(e)}"}
@app.get("/borrowed/{idKind}")
def ausgeliehen(idKind):
    """
    Lists the loans of one child that have no return date yet.
    Args:
        idKind: The ID of the child.
    Returns:
        The rows from execute_query, each with the columns `idBuch` and `ausleihe`.
    """
    open_loans_sql = f"SELECT `idBuch`, `ausleihe` FROM `{AUSLEIHE_TABLE}` WHERE `idKind` = %s AND `rueckgabe` IS NULL;"
    return execute_query(open_loans_sql, (idKind,))
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Renders index.html, passing the request and the configured page title."""
    context = {"request": request, "page_title": PAGE_TITLE}
    return templates.TemplateResponse("index.html", context)
# Serve the app with uvicorn on localhost:5000 when this file is run as a script.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        host="localhost",
        port=5000,
    )
# %%