# database_functions.py import psycopg2 import random import string from urllib.parse import urlparse from datetime import datetime import json from dotenv import load_dotenv import os # Load environment variables from .env file load_dotenv() # Get the database URL from the environment variables url = os.getenv("DATABASE_URL") if not url: raise ValueError("DATABASE_URL is not set in the environment variables") parsed_url = urlparse(url) # Extract connection parameters db_config = { 'user': parsed_url.username, 'password': parsed_url.password, 'host': parsed_url.hostname, 'port': parsed_url.port, 'database': parsed_url.path.lstrip('/') } # Since we define password in schema, we will just generate password def generate_password(): characters = string.ascii_letters + string.digits + string.punctuation password = ''.join(random.choice(characters) for _ in range(8)) return password # add student method (privacy with only class & indexNo) def add_user_privacy(class_name, index_no): connection = psycopg2.connect(**db_config) cursor = connection.cursor() password = generate_password() dbMsg = "" try: # Check if user with the same email already exists cursor.execute("SELECT id FROM oc_students WHERE class = %s and index_no = %s", (class_name,index_no)) existing_user = cursor.fetchone() if existing_user: user_id = existing_user[0] dbMsg = "User already exists" else: # If user doesn't exist, insert a new user cursor.execute("INSERT INTO oc_students (index_no, class, hashPassword) VALUES (%s, %s, %s) RETURNING id", (index_no, class_name, password)) user_id = cursor.fetchone()[0] # Fetch the ID of the newly inserted user connection.commit() # without this, data is not persist on db! dbMsg = "User Created" return user_id, dbMsg except psycopg2.Error as e: return "Error adding user:" + str(e) def add_submission(userid, transcribed_text, ai_responses, scores, feedback, questionNo): connection = psycopg2.connect(**db_config) cursor = connection.cursor() dbMsg = "" try: current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S") cursor.execute("INSERT INTO oc_submissions (userid, datetime, Transcribed_text, AI_conversation_responses, Scores, Feedback, questionNo) " "VALUES (%s, %s, %s, %s, %s, %s, %s)", (userid, current_datetime, transcribed_text, ai_responses, scores, feedback, questionNo)) connection.commit() dbMsg = "Submission added" except psycopg2.Error as e: print("Error adding submission:", e) finally: if connection: cursor.close() connection.close() print("PostgreSQL connection is closed") def get_submissions_by_date_and_class(from_date, to_date, class_name, display_ai_feedback): # Connect to the database conn = psycopg2.connect(**db_config) cursor = conn.cursor() try: print(f"From Date: {from_date}") print(f"To Date: {to_date}") print(f"Class Name: {class_name}") # Swap from_date and to_date if from_date is later than to_date if from_date > to_date: from_date, to_date = to_date, from_date query = """ SELECT s.index_no, s.class, sub.datetime, sub.questionNo, sub.transcribed_text, CASE WHEN %s THEN sub.ai_conversation_responses ELSE NULL END AS ai_conversation_responses, sub.userid FROM oc_students AS s JOIN oc_submissions AS sub ON s.id = sub.userid WHERE TO_DATE(sub.datetime::text, 'YYYY-MM-DD') BETWEEN TO_DATE(%s, 'YYYY-MM-DD') AND TO_DATE(%s, 'YYYY-MM-DD') AND s.class = %s ORDER BY sub.userid, sub.questionNo, sub.datetime DESC """ cursor.execute(query, (display_ai_feedback, from_date, to_date, class_name)) results = cursor.fetchall() if results: return generate_report_as_json(results, display_ai_feedback) else: return [{"Email": "No data found for the selected date range and class", "Name": "", "Class": "", "Datetime": "", "Transcribed Text": "", "AI Conversation Responses": ""}] except Exception as e: print(f"An error occurred: {e}") return [{"Email": "Error occurred while fetching data", "Name": "", "Class": "", "Datetime": "", "Transcribed Text": "", "AI Conversation Responses": ""}] finally: cursor.close() conn.close() def generate_report_as_json(results, display_ai_feedback): user_ids_info = [] # To store tuples of (UserID, Name, Class) user_question_map = {} # To map UserID to answered questions if results: for result in results: user_id = result[6] # Assuming UserID is at index 6 # Storing tuples of (UserID, Name, Class) user_info = (user_id, result[0], result[1]) # (UserID, Name, Class) if user_info not in user_ids_info: user_ids_info.append(user_info) # Creating a map of UserIDs to answered questions question = result[3] # Assuming Question number is at index 3 details = { "Datetime": result[2].strftime("%Y-%m-%d %H:%M:%S") if result[2] else "", "Question": question, "Student Response": result[4], "AI Feedback": result[5] if display_ai_feedback else "Not displayed" } if user_id in user_question_map: user_question_map[user_id].append(details) else: user_question_map[user_id] = [details] report_data = [] for user_info in user_ids_info: user_id, name, class_ = user_info user_dict = { "Index No": name, "Class": class_, "Questions": [] } question_numbers = [1, 2, 3] # List of required question numbers if user_id in user_question_map: user_questions = user_question_map[user_id] for question_details in user_questions: question_data = { "Question": question_details["Question"], "Datetime": question_details["Datetime"], "Student Response": question_details["Student Response"], "AI Feedback": question_details["AI Feedback"] } user_dict["Questions"].append(question_data) # Remove answered question number from the (fixed list) if question_data["Question"] in question_numbers: question_numbers.remove(question_data["Question"]) # Add NA entries for unanswered questions for missing_question in question_numbers: missing_question_data = { "Question": missing_question, "Datetime": "NA", "Student Response": "NA", "AI Feedback": "NA" if display_ai_feedback else "Not displayed" } user_dict["Questions"].append(missing_question_data) # Sort the user's questions by question number before appending to report user_dict["Questions"] = sorted(user_dict["Questions"], key=lambda x: x['Question']) report_data.append(user_dict) return json.dumps(report_data, indent=4) def getUniqueSubmitDate(): # Connect to the database conn = psycopg2.connect(**db_config) cursor = conn.cursor() try: # Fetch all submissions on the provided date cursor.execute(""" SELECT DISTINCT DATE(datetime) AS unique_date FROM public.oc_submissions ORDER BY unique_date desc LIMIT 14; """) dates = [str(row[0]) for row in cursor.fetchall()] return dates except Exception as e: print(f"An error occurred: {e}") return [{"Error": "Error occurred while fetching data"}] finally: cursor.close() conn.close() def getUniqueClass(): # Connect to the database conn = psycopg2.connect(**db_config) cursor = conn.cursor() try: # Fetch all submissions on the provided date cursor.execute(""" SELECT DISTINCT s.class FROM oc_students AS s JOIN oc_submissions AS sub ON s.id = sub.userid ORDER BY s.class """) listClass = [str(row[0]) for row in cursor.fetchall()] return listClass except Exception as e: print(f"An error occurred: {e}") return [{"Error": "Error occurred while fetching data"}] finally: cursor.close() conn.close()