from typing import List, Optional, Type

from fastapi.responses import JSONResponse
from sqlalchemy import delete, select, update
from sqlalchemy.exc import SQLAlchemyError

from utils.error_handlers import (
    handle_error,
    handle_exception,
    no_entries_found,
    not_found_error,
)


class BaseQuery:
    """Generic create/read/update/delete helper around a SQLAlchemy session.

    Every method receives the session (``db``) explicitly. Failures are not
    raised to the caller: they are converted into the value returned by
    ``handle_error`` / ``handle_exception`` (methods in this class treat a
    ``JSONResponse`` as the error signal). Successful write operations
    return ``None``.
    """

    def __init__(self, user):
        """Store ``user`` and cache ``user.get("id")`` as ``user_id``."""
        self.user = user
        self.user_id = user.get("id")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _build_conditions(model, id=None, filter_conditions=None):
        """Return the WHERE criteria for an optional id plus extra filters."""
        conditions = []
        # ``is not None`` (rather than truthiness) so that id == 0 still filters.
        if id is not None:
            conditions.append(model.id == id)
        if filter_conditions:
            conditions.extend(filter_conditions)
        return conditions

    @staticmethod
    def _where(statement, conditions):
        """Apply ``conditions`` to ``statement``; leave it untouched if empty."""
        return statement.where(*conditions) if conditions else statement

    def _rollback_with_error(self, db, exc):
        """Roll the session back and convert ``exc`` into an error result."""
        db.rollback()
        if isinstance(exc, SQLAlchemyError):
            return handle_exception(exc)
        return handle_error(exc, "Operation failed")

    def _fetch(self, db, query, not_found_message, multiple: bool = False):
        """Execute ``query`` and return its rows or its single scalar.

        Args:
            db: Session used to execute the query.
            query: Selectable statement to run.
            not_found_message: Accepted for backward compatibility; it is
                currently not used (a miss is reported as ``[]``).
            multiple: When true, return ``.all()`` (a list of rows); otherwise
                return ``.scalar_one_or_none()``, i.e. the first column of the
                single matching row.

        Returns:
            The rows / scalar, ``[]`` when nothing matched, or the result of
            ``handle_error`` if execution raised (this includes more than one
            row matching in single mode).
        """
        try:
            if multiple:
                return db.execute(query).all() or []
            result = db.execute(query).scalar_one_or_none()
        except Exception as e:
            return handle_error(
                e,
                "Failed to fetch entries" if multiple else "Failed to fetch entry",
            )
        # Only ``None`` means "no row": a falsy scalar such as 0 or "" is a
        # real value and must be returned as-is.
        return [] if result is None else result

    def _handle_commit(self, db):
        """Commit the session; on failure roll back and return an error result."""
        try:
            db.commit()
        except Exception as e:
            return self._rollback_with_error(db, e)

    def _execute_and_commit(self, db, statement):
        """Execute a DML ``statement`` and commit, rolling back on any failure.

        Executing inside the same guard as the commit ensures a failing
        statement does not leave the session in an un-rolled-back state.
        """
        try:
            db.execute(statement)
            db.commit()
        except Exception as e:
            return self._rollback_with_error(db, e)

    # ------------------------------------------------------------------
    # Write operations
    # ------------------------------------------------------------------
    def add(self, db, instance):
        """Add a new entry and commit."""
        db.add(instance)
        return self._handle_commit(db)

    def insert_entries(self, db, entries):
        """Add multiple entries and commit."""
        db.add_all(entries)
        return self._handle_commit(db)

    def delete(self, db, model, id=None, filter_conditions=None):
        """Delete the entry selected by ``id`` and/or ``filter_conditions``.

        The same criteria are used for the preliminary lookup and for the
        DELETE itself. A lookup that finds nothing is not an error: the
        DELETE then simply matches no rows. If the lookup fails (for example
        because several rows match), its error result is returned and nothing
        is deleted.

        Returns:
            ``None`` on success, otherwise an error result.
        """
        conditions = self._build_conditions(model, id, filter_conditions)
        if not conditions:
            # Refuse an unfiltered DELETE; use delete_all() for that.
            return handle_error(
                ValueError("delete() requires an id or filter_conditions"),
                "Operation failed",
            )

        entry = self._fetch(
            db,
            select(model).where(*conditions),
            f"Entry with ID {id} not found.",
            multiple=False,
        )
        if isinstance(entry, JSONResponse):
            return entry

        return self._execute_and_commit(db, delete(model).where(*conditions))

    def delete_all(self, db, model, filter_conditions=None):
        """Delete every row of ``model``, or only those matching the filters."""
        statement = self._where(delete(model), list(filter_conditions or ()))
        return self._execute_and_commit(db, statement)

    def update(self, db, model, id, update_data, filter_conditions=None):
        """Update the entry with the given ``id``.

        ``filter_conditions`` (if any) are combined with the id match and are
        applied to the UPDATE itself, so a row that has the right id but does
        not satisfy the filters is left untouched.

        Returns:
            ``None`` on success, otherwise an error result.
        """
        # The id criterion is always applied here (id is a required argument).
        conditions = [model.id == id, *(filter_conditions or ())]

        entry = self._fetch(
            db,
            select(model).where(*conditions),
            f"Entry with ID {id} not found.",
            multiple=False,
        )
        if isinstance(entry, JSONResponse):
            return entry

        statement = update(model).where(*conditions).values(update_data)
        return self._execute_and_commit(db, statement)

    def update_entries(self, db, model, update_data, filter_conditions=None):
        """Update all rows of ``model``, or only those matching the filters.

        Returns:
            ``None`` on success, otherwise an error result.
        """
        conditions = list(filter_conditions or ())

        results = self._fetch(
            db,
            self._where(select(model), conditions),
            "No entries found matching the filter conditions.",
            multiple=True,
        )
        if isinstance(results, JSONResponse):
            return results

        statement = self._where(update(model), conditions).values(update_data)
        return self._execute_and_commit(db, statement)

    # ------------------------------------------------------------------
    # Read operations
    # ------------------------------------------------------------------
    def get(
        self,
        db,
        model: Type = None,
        id: Optional[int] = None,
        filter_conditions=None,
        columns: Optional[List[str]] = None,
        multiple: bool = False,
    ):
        """Get one or multiple entries, filtered by id and/or conditions.

        ``columns`` (when given) are passed straight to ``select()`` instead
        of ``model``. ``model`` is still required whenever ``id`` is used,
        because the id criterion is ``model.id == id``.
        """
        query = select(*columns) if columns else select(model)
        query = self._where(
            query, self._build_conditions(model, id, filter_conditions)
        )
        return self._fetch(
            db,
            query,
            "Entry not found." if not multiple else "No entries found.",
            multiple=multiple,
        )

    def get_with_joins(
        self,
        db,
        join_models: List[Type],
        join_conditions: List = None,
        model: Type = None,
        filter_conditions=None,
        columns: Optional[List[str]] = None,
        multiple: bool = False,
    ):
        """Get one or multiple entries with joins and optional filters.

        ``join_models`` are joined in order. When ``join_conditions`` is
        given, it is paired positionally with ``join_models`` to provide each
        ON clause; otherwise each join relies on SQLAlchemy inferring the ON
        clause.
        """
        if columns:
            query = select(*columns)
        else:
            query = select(model, *join_models).select_from(model)

        if join_conditions:
            for join_model, join_condition in zip(join_models, join_conditions):
                query = query.join(join_model, join_condition)
        else:
            # join() takes one target per call (its second positional argument
            # is the ON clause), so the models must be chained one by one.
            for join_model in join_models:
                query = query.join(join_model)

        # Apply the optional filter conditions.
        query = self._where(query, list(filter_conditions or ()))

        return self._fetch(
            db,
            query,
            "Entry not found." if not multiple else "No entries found.",
            multiple=multiple,
        )

    def get_columns(
        self,
        db,
        columns: List[str],
        model=None,
        filter_conditions=None,
        id: Optional[int] = None,
        multiple: bool = False,
    ):
        """Get specific columns, filtered by id and/or conditions.

        NOTE: with ``multiple=False`` the result comes from
        ``scalar_one_or_none()``, so only the first selected column of the
        matching row is returned.
        """
        query = select(*columns)
        # select_from() is only meaningful (and only valid) with a real model.
        if model is not None:
            query = query.select_from(model)
        query = self._where(
            query, self._build_conditions(model, id, filter_conditions)
        )
        return self._fetch(
            db,
            query,
            "Entry not found." if not multiple else "No entries found.",
            multiple=multiple,
        )