import gradio as gr import pixeltable as pxt from pixeltable.iterators import DocumentSplitter, FrameIterator, StringSplitter from pixeltable.functions.huggingface import sentence_transformer, clip_image, clip_text from pixeltable.functions.video import extract_audio from pixeltable.functions.audio import get_metadata from pixeltable.functions import openai import numpy as np import PIL.Image import os import getpass import requests import tempfile from datetime import datetime # Configuration PIXELTABLE_MEDIA_DIR = os.path.expanduser("~/.pixeltable/media") MAX_TOKENS_DEFAULT = 300 TEMPERATURE_DEFAULT = 0.7 CHUNK_SIZE_DEFAULT = 300 # Initialize API keys def init_api_keys(): if 'OPENAI_API_KEY' not in os.environ: os.environ['OPENAI_API_KEY'] = getpass.getpass('OpenAI API key:') # Embedding Functions @pxt.expr_udf def e5_embed(text: str) -> np.ndarray: return sentence_transformer(text, model_id='intfloat/e5-large-v2') @pxt.expr_udf def embed_image(img: PIL.Image.Image): return clip_image(img, model_id='openai/clip-vit-base-patch32') @pxt.expr_udf def str_embed(s: str): return clip_text(s, model_id='openai/clip-vit-base-patch32') # Common Utilities def initialize_pixeltable(dir_name='unified_app'): """Initialize Pixeltable directory""" pxt.drop_dir(dir_name, force=True) pxt.create_dir(dir_name) @pxt.udf def create_prompt(top_k_list: list[dict], question: str) -> str: """Create a standardized prompt format""" concat_top_k = '\n\n'.join(elt['text'] for elt in reversed(top_k_list)) return f''' PASSAGES: {concat_top_k} QUESTION: {question}''' @pxt.udf(return_type=pxt.AudioType()) def generate_audio(script: str, voice: str, api_key: str): """Generate audio from text using OpenAI's API""" if not script or not voice: return None try: response = requests.post( "https://api.openai.com/v1/audio/speech", headers={"Authorization": f"Bearer {api_key}"}, json={"model": "tts-1", "input": script, "voice": voice} ) if response.status_code == 200: temp_dir = os.path.join(os.getcwd(), "temp") os.makedirs(temp_dir, exist_ok=True) temp_file = os.path.join(temp_dir, f"audio_{os.urandom(8).hex()}.mp3") with open(temp_file, 'wb') as f: f.write(response.content) return temp_file except Exception as e: print(f"Error in audio synthesis: {e}") return None # Document Processing class DocumentProcessor: @staticmethod def process_documents(pdf_files, chunk_limit, chunk_separator): """Process uploaded documents for chatbot functionality""" initialize_pixeltable() docs = pxt.create_table( 'unified_app.documents', {'document': pxt.DocumentType(nullable=True)} ) docs.insert({'document': file.name} for file in pdf_files if file.name.endswith('.pdf')) chunks = pxt.create_view( 'unified_app.chunks', docs, iterator=DocumentSplitter.create( document=docs.document, separators=chunk_separator, limit=chunk_limit if chunk_separator in ["token_limit", "char_limit"] else None ) ) chunks.add_embedding_index('text', string_embed=e5_embed) return "Documents processed successfully. You can start asking questions." @staticmethod def get_document_answer(question): """Get answer from processed documents""" try: chunks = pxt.get_table('unified_app.chunks') sim = chunks.text.similarity(question) relevant_chunks = chunks.order_by(sim, asc=False).limit(5).select(chunks.text).collect() context = "\n\n".join(chunk['text'] for chunk in relevant_chunks) temp_table = pxt.create_table( 'unified_app.temp_response', { 'question': pxt.StringType(), 'context': pxt.StringType() } ) temp_table.insert([{'question': question, 'context': context}]) temp_table['response'] = openai.chat_completions( messages=[ { 'role': 'system', 'content': 'Answer the question based only on the provided context. If the context doesn\'t contain enough information, say so.' }, { 'role': 'user', 'content': f"Context:\n{context}\n\nQuestion: {question}" } ], model='gpt-4o-mini-2024-07-18' ) answer = temp_table.select( answer=temp_table.response.choices[0].message.content ).tail(1)['answer'][0] pxt.drop_table('unified_app.temp_response', force=True) return answer except Exception as e: return f"Error: {str(e)}" # Call Analysis class CallAnalyzer: @staticmethod def process_call(video_file): """Process and analyze call recordings""" try: calls = pxt.create_table( 'unified_app.calls', {"video": pxt.VideoType(nullable=True)} ) calls['audio'] = extract_audio(calls.video, format='mp3') calls['transcription'] = openai.transcriptions(audio=calls.audio, model='whisper-1') calls['text'] = calls.transcription.text sentences = pxt.create_view( 'unified_app.sentences', calls, iterator=StringSplitter.create(text=calls.text, separators='sentence') ) sentences.add_embedding_index('text', string_embed=e5_embed) @pxt.udf def generate_insights(text: str) -> list[dict]: return [ {'role': 'system', 'content': 'Analyze this call transcript and provide key insights:'}, {'role': 'user', 'content': text} ] calls['insights_prompt'] = generate_insights(calls.text) calls['insights'] = openai.chat_completions( messages=calls.insights_prompt, model='gpt-4o-mini-2024-07-18' ).choices[0].message.content calls.insert([{"video": video_file}]) result = calls.select(calls.text, calls.audio, calls.insights).tail(1) return result['text'][0], result['audio'][0], result['insights'][0] except Exception as e: return f"Error processing call: {str(e)}", None, None # Video Search class VideoSearcher: @staticmethod def process_video(video_file): """Process video for searching""" try: initialize_pixeltable() videos = pxt.create_table('unified_app.videos', {'video': pxt.VideoType()}) frames = pxt.create_view( 'unified_app.frames', videos, iterator=FrameIterator.create(video=videos.video, fps=1) ) frames.add_embedding_index('frame', string_embed=str_embed, image_embed=embed_image) videos.insert([{'video': video_file.name}]) return "Video processed and indexed for search." except Exception as e: return f"Error processing video: {str(e)}" @staticmethod def search_video(search_type, text_query=None, image_query=None): """Search processed video frames""" try: frames = pxt.get_table('unified_app.frames') if search_type == "Text" and text_query: sim = frames.frame.similarity(text_query) elif search_type == "Image" and image_query is not None: sim = frames.frame.similarity(image_query) else: return [] results = frames.order_by(sim, asc=False).limit(5).select(frames.frame).collect() return [row['frame'] for row in results] except Exception as e: print(f"Search error: {str(e)}") return [] # Gradio Interface def create_interface(): with gr.Blocks(theme=gr.themes.Base()) as demo: # Header gr.HTML( """
Pixeltable is a declarative interface for working with text, images, embeddings, and video, enabling you to store, transform, index, and iterate on data.
Open Source AI Data infrastructure.
© 2024 Pixeltable | Apache License 2.0