Spaces:
Running
Running
#!/usr/bin/env python
# coding: utf-8
# Bibliothek
#!pip install PyMySQL
import pprint | |
import json | |
import pymysql.cursors | |
from fastapi import FastAPI, HTTPException, Request | |
from fastapi.responses import JSONResponse, HTMLResponse | |
import time | |
from dbutils.pooled_db import PooledDB | |
import os | |
import logging | |
from fastapi.templating import Jinja2Templates | |
# Deployment-specific settings; the fallbacks select the test table and title.
AUSLEIHE_TABLE = os.getenv('AUSLEIHE_TABLE', 'ausleihe_test')
PAGE_TITLE = os.getenv('PAGE_TITLE', 'Murmel Bibliothek TEST')

# Connection settings for the pool. The three MURMEL_DB_* variables are
# mandatory: a missing one raises KeyError at import time.
db_config = dict(
    host=os.environ['MURMEL_DB_HOST'],
    user=os.environ['MURMEL_DB_USER'],
    password=os.environ['MURMEL_DB_PASSWORD'],
    database='murmel',
    # Rows come back as dicts keyed by column name.
    cursorclass=pymysql.cursors.DictCursor,
)

# Shared pool of at most five connections.
pool = PooledDB(pymysql, maxconnections=5, **db_config)

app = FastAPI()

# Log through uvicorn's error logger.
LOG = logging.getLogger('uvicorn.error')
def execute_query(sql, params=None, max_retries=3, retry_delay=1):
    """Run one SQL statement on a pooled connection and return all rows.

    The statement is executed with ``params`` bound by the driver, the
    transaction is committed and the fetched rows are returned. When the
    driver raises ``pymysql.OperationalError`` the statement is retried on a
    newly requested pooled connection, sleeping ``retry_delay`` seconds
    between attempts.

    Args:
        sql: SQL text using ``%s`` placeholders for ``params``.
        params: Values to bind to the placeholders; ``None`` means none.
        max_retries: Total number of attempts before giving up.
        retry_delay: Seconds to sleep between two attempts.

    Returns:
        The result of ``cursor.fetchall()``. ``None`` if ``max_retries`` is
        not positive, because the statement is then never executed.

    Raises:
        HTTPException: Status 500 when the last attempt fails with an
            ``OperationalError``. Any other exception propagates unchanged.
    """
    # Lazy %-formatting: the message is only built when DEBUG is enabled.
    LOG.debug("executing query %s with params %s", sql, params)
    for attempt in range(max_retries):
        # Reset per attempt so the cleanup below never touches a connection
        # that belongs to (and was already closed by) a previous attempt.
        connection = None
        try:
            connection = pool.connection()
            with connection.cursor() as cursor:
                cursor.execute(sql, params or ())
                result = cursor.fetchall()
            connection.commit()
            return result
        except pymysql.OperationalError as e:
            if attempt == max_retries - 1:
                LOG.error("query failed after %d attempts: %s", max_retries, e)
                # Keep the driver error as the cause for server-side debugging.
                raise HTTPException(status_code=500, detail="Database connection error") from e
            LOG.warning(
                "query attempt %d of %d failed, retrying: %s",
                attempt + 1, max_retries, e,
            )
            time.sleep(retry_delay)
        finally:
            # Closing a pooled connection hands it back to the pool.
            if connection is not None:
                connection.close()
# Jinja2 templates are loaded from the current working directory.
templates = Jinja2Templates(directory=".")
def get_groups():
    """Return name and id of every group flagged as current, ordered by id."""
    return execute_query(
        "SELECT Gruppe, idGruppe FROM `gruppe` WHERE aktuell is True ORDER BY idGruppe ASC;"
    )
def get_students(idGruppe):
    """List the children enrolled in the current school year.

    Args:
        idGruppe: The literal ``'all'`` for every child, otherwise a value
            that is matched against both the group name and the group id.

    Returns:
        Rows with the columns ``Kind`` (first and last name joined by a
        space) and ``idKind``, sorted by name.
    """
    if idGruppe != 'all':
        # The same value is bound twice: once for the name, once for the id.
        group_sql = """
        SELECT concat(`ki`.`Vorname`,' ',`ki`.`Nachnamen`) AS `Kind`, `ki`.`idKind` AS `idKind`
        FROM `kind` `ki`
        JOIN `kind_x_gruppe_x_schuljahr` `kgs` ON `ki`.`idKind` = `kgs`.`x_kind`
        JOIN `schuljahr` `sch` ON `sch`.`idschuljahr` = `kgs`.`x_schuljahr`
        JOIN `gruppe` `gr` ON `gr`.`idGruppe` = `kgs`.`x_gruppe`
        WHERE `sch`.`aktuell` = 1 AND (`gr`.`Gruppe` = %s OR `gr`.`idGruppe` = %s)
        ORDER BY 1
        """
        return execute_query(group_sql, (idGruppe, idGruppe))

    # DISTINCT collapses the duplicate rows a child with several
    # enrolment records in the current year would otherwise produce.
    everyone_sql = """
    SELECT DISTINCT concat(`ki`.`Vorname`,' ',`ki`.`Nachnamen`) AS `Kind`, `ki`.`idKind` AS `idKind`
    FROM `kind` `ki`
    JOIN `kind_x_gruppe_x_schuljahr` `kgs` ON `ki`.`idKind` = `kgs`.`x_kind`
    JOIN `schuljahr` `sch` ON `sch`.`idschuljahr` = `kgs`.`x_schuljahr`
    WHERE `sch`.`aktuell` = 1
    ORDER BY 1
    """
    return execute_query(everyone_sql)
def rueckgabe(idBuch, grund='rueckgabe'):
    """Close every open loan of a book and report who had borrowed it.

    Args:
        idBuch: Id of the book being returned.
        grund: Reason stored in ``rueckGrund`` (default ``'rueckgabe'``).

    Returns:
        dict: ``success`` and ``message``; on success additionally
        ``student_names``, the names of the children whose loans were closed.
        ``success`` is ``False`` when the book has no open loan.
    """
    open_loans_sql = f"""
    SELECT a.`idBuch`, a.`idKind`, a.`ausleihe`, a.`rueckgabe`, a.`rueckGrund`,
    CONCAT(k.`Vorname`, ' ', k.`Nachnamen`) AS student_name
    FROM `{AUSLEIHE_TABLE}` a
    JOIN `kind` k ON a.`idKind` = k.`idKind`
    WHERE a.`idBuch` = %s AND a.`rueckgabe` is NULL;
    """
    open_loans = execute_query(open_loans_sql, (idBuch,))
    if not open_loans:
        return {"success": False, "message": "Buch nicht gefunden oder bereits zurückgegeben"}

    # Stamp the return time and reason on all loans that are still open.
    close_sql = (
        f"UPDATE `{AUSLEIHE_TABLE}` SET `rueckgabe` = NOW(), `rueckGrund` = %s "
        "WHERE `idBuch` = %s AND `rueckgabe` is NULL;"
    )
    execute_query(close_sql, (grund, idBuch))

    borrowers = [loan['student_name'] for loan in open_loans]
    joined = ", ".join(borrowers)
    return {
        "success": True,
        "message": f"Buch zurückgegeben von: {joined}",
        "student_names": borrowers,
    }
def ausleihe(idBuch, idKind):
    """Lend a book to a child, first closing any loan still open for it.

    An open loan of the same book is closed through ``rueckgabe`` with the
    reason ``"neu-ausleihe"``. A new row is then inserted into the table
    named by ``AUSLEIHE_TABLE``.

    Args:
        idBuch: Id of the book being lent.
        idKind: Id of the child borrowing the book.

    Returns:
        dict: ``success`` and ``message``. On success the message also names
        the previous borrower(s) if the book was still on loan. Any exception
        is logged and reported as ``success: False`` instead of being raised.
    """
    try:
        rueckgabe_result = rueckgabe(idBuch, grund="neu-ausleihe")
        message = "Buch erfolgreich ausgeliehen"

        # ``student_names`` already holds the borrowers' full names, so they
        # are used directly. Treating the first entry as an ``idKind`` and
        # looking it up in ``kind`` (as before) would fail after the old loan
        # was already closed and before the new one was recorded.
        previous_borrowers = rueckgabe_result.get("student_names") or []
        if rueckgabe_result.get("success", False) and previous_borrowers:
            previous_str = ", ".join(previous_borrowers)
            message += f". Zuvor ausgeliehen von {previous_str}"

        # Insert new borrowing record
        sql = f"INSERT INTO `{AUSLEIHE_TABLE}` (`idBuch`, `idKind`, `ausleihe`) VALUES (%s, %s, NOW());"
        execute_query(sql, (idBuch, idKind))
        return {"success": True, "message": message}
    except Exception as e:
        # Boundary handler: callers expect a result dict, never an exception.
        LOG.exception("Error in ausleihe: %s", e)
        return {"success": False, "message": f"Fehler beim Ausleihen des Buches: {str(e)}"}
def ausgeliehen(idKind):
    """Return the loans of a child that have not been returned yet.

    Args:
        idKind: Id of the child.

    Returns:
        One row per open loan with the columns ``idBuch`` and ``ausleihe``
        (dict rows, given the ``DictCursor`` configured for the pool).
    """
    open_loans_sql = (
        f"SELECT `idBuch`, `ausleihe` FROM `{AUSLEIHE_TABLE}` "
        "WHERE `idKind` = %s AND `rueckgabe` IS NULL;"
    )
    return execute_query(open_loans_sql, (idKind,))
# Without a route decorator this handler is never registered with ``app``.
# NOTE(review): "/" is inferred from the handler name - confirm the path.
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render ``index.html`` with the configured page title.

    Args:
        request: The incoming request, passed on to the template context.

    Returns:
        The template response for ``index.html``.
    """
    return templates.TemplateResponse("index.html", {"request": request, "page_title": PAGE_TITLE})
# Start a local server on port 5000 when the file is executed as a script.
if __name__ == "__main__":
    from uvicorn import run as serve

    serve(app, host="localhost", port=5000)
# %% | |