import gradio as gr
import pixeltable as pxt
from pixeltable.iterators import DocumentSplitter, FrameIterator, StringSplitter
from pixeltable.functions.huggingface import sentence_transformer, clip_image, clip_text
from pixeltable.functions.video import extract_audio
from pixeltable.functions import openai
import numpy as np
import PIL.Image
import os
import getpass
import requests

# Configuration
PIXELTABLE_MEDIA_DIR = os.path.expanduser("~/.pixeltable/media")
MAX_TOKENS_DEFAULT = 300
TEMPERATURE_DEFAULT = 0.7
CHUNK_SIZE_DEFAULT = 300

# Initialize API keys
def init_api_keys():
    if 'OPENAI_API_KEY' not in os.environ:
        os.environ['OPENAI_API_KEY'] = getpass.getpass('OpenAI API key: ')

# Embedding Functions
@pxt.expr_udf
def e5_embed(text: str) -> np.ndarray:
    """Embed text with the E5 sentence-transformer model."""
    return sentence_transformer(text, model_id='intfloat/e5-large-v2')

@pxt.expr_udf
def embed_image(img: PIL.Image.Image):
    """Embed an image with CLIP (shares an embedding space with str_embed)."""
    return clip_image(img, model_id='openai/clip-vit-base-patch32')

@pxt.expr_udf
def str_embed(s: str):
    """Embed a text query with CLIP, enabling text-to-image search."""
    return clip_text(s, model_id='openai/clip-vit-base-patch32')

# Common Utilities
def initialize_pixeltable(dir_name='unified_app'):
    """Drop and recreate the Pixeltable directory (resets all app tables)."""
    pxt.drop_dir(dir_name, force=True)
    pxt.create_dir(dir_name)

@pxt.udf
def create_prompt(top_k_list: list[dict], question: str) -> str:
    """Create a standardized prompt from retrieved passages and a question."""
    concat_top_k = '\n\n'.join(elt['text'] for elt in reversed(top_k_list))
    return f'''
    PASSAGES:

    {concat_top_k}

    QUESTION:

    {question}'''

@pxt.udf(return_type=pxt.AudioType())
def generate_audio(script: str, voice: str, api_key: str):
    """Generate speech audio from text using OpenAI's TTS API."""
    if not script or not voice:
        return None
    try:
        response = requests.post(
            "https://api.openai.com/v1/audio/speech",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": "tts-1", "input": script, "voice": voice}
        )
        if response.status_code == 200:
            temp_dir = os.path.join(os.getcwd(), "temp")
            os.makedirs(temp_dir, exist_ok=True)
            temp_file = os.path.join(temp_dir, f"audio_{os.urandom(8).hex()}.mp3")
            with open(temp_file, 'wb') as f:
                f.write(response.content)
            return temp_file
        return None
    except Exception as e:
        print(f"Error in audio synthesis: {e}")
        return None

# Document Processing
class DocumentProcessor:
    @staticmethod
    def process_documents(pdf_files, chunk_limit, chunk_separator):
        """Chunk and index uploaded documents for RAG-style Q&A."""
        initialize_pixeltable()
        docs = pxt.create_table(
            'unified_app.documents',
            {'document': pxt.DocumentType(nullable=True)}
        )
        docs.insert(
            {'document': file.name}
            for file in pdf_files
            if file.name.endswith('.pdf')
        )
        # Split documents into chunks; the limit only applies to token/char chunking.
        chunks = pxt.create_view(
            'unified_app.chunks',
            docs,
            iterator=DocumentSplitter.create(
                document=docs.document,
                separators=chunk_separator,
                limit=chunk_limit if chunk_separator in ["token_limit", "char_limit"] else None
            )
        )
        chunks.add_embedding_index('text', string_embed=e5_embed)
        return "Documents processed successfully. You can start asking questions."
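
    # Retrieval note: `get_document_answer` below queries the embedding index
    # created in `process_documents`, ranking chunks by similarity to the
    # question and passing the top matches to the LLM as context.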
    @staticmethod
    def get_document_answer(question):
        """Answer a question using the indexed document chunks."""
        try:
            chunks = pxt.get_table('unified_app.chunks')
            # Rank chunks by similarity to the question and keep the top 5.
            sim = chunks.text.similarity(question)
            relevant_chunks = chunks.order_by(sim, asc=False).limit(5).select(chunks.text).collect()
            context = "\n\n".join(chunk['text'] for chunk in relevant_chunks)

            # Stage the question/context in a throwaway table so the LLM call
            # runs as a computed column, then read the answer back.
            temp_table = pxt.create_table(
                'unified_app.temp_response',
                {
                    'question': pxt.StringType(),
                    'context': pxt.StringType()
                }
            )
            temp_table.insert([{'question': question, 'context': context}])

            temp_table['response'] = openai.chat_completions(
                messages=[
                    {
                        'role': 'system',
                        'content': 'Answer the question based only on the provided context. '
                                   'If the context doesn\'t contain enough information, say so.'
                    },
                    {
                        'role': 'user',
                        'content': f"Context:\n{context}\n\nQuestion: {question}"
                    }
                ],
                model='gpt-4o-mini-2024-07-18'
            )

            answer = temp_table.select(
                answer=temp_table.response.choices[0].message.content
            ).tail(1)['answer'][0]

            pxt.drop_table('unified_app.temp_response', force=True)
            return answer
        except Exception as e:
            return f"Error: {str(e)}"

# Call Analysis
class CallAnalyzer:
    @staticmethod
    def process_call(video_file):
        """Transcribe a call recording and extract key insights."""
        try:
            initialize_pixeltable()
            calls = pxt.create_table(
                'unified_app.calls',
                {"video": pxt.VideoType(nullable=True)}
            )

            # Computed columns: audio track -> Whisper transcription -> plain text.
            calls['audio'] = extract_audio(calls.video, format='mp3')
            calls['transcription'] = openai.transcriptions(audio=calls.audio, model='whisper-1')
            calls['text'] = calls.transcription.text

            sentences = pxt.create_view(
                'unified_app.sentences',
                calls,
                iterator=StringSplitter.create(text=calls.text, separators='sentence')
            )
            sentences.add_embedding_index('text', string_embed=e5_embed)

            @pxt.udf
            def generate_insights(text: str) -> list[dict]:
                return [
                    {'role': 'system', 'content': 'Analyze this call transcript and provide key insights:'},
                    {'role': 'user', 'content': text}
                ]

            calls['insights_prompt'] = generate_insights(calls.text)
            calls['insights'] = openai.chat_completions(
                messages=calls.insights_prompt,
                model='gpt-4o-mini-2024-07-18'
            ).choices[0].message.content

            # Inserting the video triggers all computed columns defined above.
            calls.insert([{"video": video_file}])

            result = calls.select(calls.text, calls.audio, calls.insights).tail(1)
            return result['text'][0], result['audio'][0], result['insights'][0]
        except Exception as e:
            return f"Error processing call: {str(e)}", None, None

# Video Search
class VideoSearcher:
    @staticmethod
    def process_video(video_file):
        """Extract frames from a video and index them for semantic search."""
        try:
            initialize_pixeltable()
            videos = pxt.create_table('unified_app.videos', {'video': pxt.VideoType()})
            frames = pxt.create_view(
                'unified_app.frames',
                videos,
                iterator=FrameIterator.create(video=videos.video, fps=1)
            )
            frames.add_embedding_index('frame', string_embed=str_embed, image_embed=embed_image)
            videos.insert([{'video': video_file.name}])
            return "Video processed and indexed for search."
        except Exception as e:
            return f"Error processing video: {str(e)}"
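
    # Search note: the frame index above registers both an image embedding and a
    # string embedding (CLIP's shared space), so `search_video` below can rank
    # frames against either a text prompt or a reference image.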
    @staticmethod
    def search_video(search_type, text_query=None, image_query=None):
        """Search processed video frames by text or by example image."""
        try:
            frames = pxt.get_table('unified_app.frames')
            if search_type == "Text" and text_query:
                sim = frames.frame.similarity(text_query)
            elif search_type == "Image" and image_query is not None:
                sim = frames.frame.similarity(image_query)
            else:
                return []
            results = frames.order_by(sim, asc=False).limit(5).select(frames.frame).collect()
            return [row['frame'] for row in results]
        except Exception as e:
            print(f"Search error: {str(e)}")
            return []

# Gradio Interface
def create_interface():
    with gr.Blocks(theme=gr.themes.Base()) as demo:
        # Header
        gr.HTML(
            """
            <div style="text-align: center;">
                <h1>Pixeltable</h1>
            </div>
            """
        )
""" ) gr.Markdown( """ # Multimodal Powerhouse """ ) gr.HTML( """

Pixeltable is a declarative interface for working with text, images, embeddings, and video, enabling you to store, transform, index, and iterate on data.

⚠️ Note: This app runs best with GPU. For optimal performance, consider duplicating this space to run locally or with better computing resources.
""" ) # Documentation Sections with gr.Row(): with gr.Column(): with gr.Accordion("🎯 What This App Does", open=False): gr.Markdown(""" 1. 📚 **Document Processing** * Chat with your documents using RAG * Process multiple document formats * Extract key insights 2. 🎥 **Video Analysis** * Text and image-based video search * Frame extraction and indexing * Visual content discovery 3. 🎙️ **Call Analysis** * Automatic transcription * Key insight extraction * Audio processing """) with gr.Column(): with gr.Accordion("⚙️ How It Works", open=False): gr.Markdown(""" 1. 🔄 **Data Processing** * Chunking and indexing documents * Embedding generation for search * Multi-modal data handling 2. 🤖 **AI Integration** * LLM-powered analysis * Speech-to-text conversion * Semantic search capabilities 3. 📊 **Storage & Retrieval** * Efficient data organization * Quick content retrieval * Structured data management """) with gr.Tabs(): # Document Chat Tab with gr.TabItem("📚 Document Chat"): with gr.Row(): with gr.Column(): doc_files = gr.File(label="Upload Documents", file_count="multiple") chunk_size = gr.Slider( minimum=100, maximum=500, value=CHUNK_SIZE_DEFAULT, label="Chunk Size" ) chunk_type = gr.Dropdown( choices=["token_limit", "char_limit", "sentence", "paragraph"], value="token_limit", label="Chunking Method" ) process_docs_btn = gr.Button("Process Documents") process_status = gr.Textbox(label="Status") with gr.Column(): chatbot = gr.Chatbot(label="Document Chat") msg = gr.Textbox(label="Ask a question") send_btn = gr.Button("Send") # Call Analysis Tab with gr.TabItem("🎙️ Call Analysis"): with gr.Row(): with gr.Column(): call_upload = gr.Video(label="Upload Call Recording") analyze_btn = gr.Button("Analyze Call") with gr.Column(): with gr.Tabs(): with gr.TabItem("📝 Transcript"): transcript = gr.Textbox(label="Transcript", lines=10) with gr.TabItem("💡 Insights"): insights = gr.Textbox(label="Key Insights", lines=10) with gr.TabItem("🔊 Audio"): audio_output = gr.Audio(label="Extracted Audio") # Video Search Tab with gr.TabItem("🎥 Video Search"): with gr.Row(): with gr.Column(): video_upload = gr.File(label="Upload Video") process_video_btn = gr.Button("Process Video") video_status = gr.Textbox(label="Processing Status") search_type = gr.Radio( choices=["Text", "Image"], label="Search Type", value="Text" ) text_input = gr.Textbox(label="Text Query") image_input = gr.Image(label="Image Query", type="pil", visible=False) search_btn = gr.Button("Search") with gr.Column(): results_gallery = gr.Gallery(label="Search Results") # Event Handlers def document_chat(message, chat_history): bot_message = DocumentProcessor.get_document_answer(message) chat_history.append((message, bot_message)) return "", chat_history def update_search_type(choice): return { text_input: gr.update(visible=choice=="Text"), image_input: gr.update(visible=choice=="Image") } # Connect Events process_docs_btn.click( DocumentProcessor.process_documents, inputs=[doc_files, chunk_size, chunk_type], outputs=[process_status] ) send_btn.click( document_chat, inputs=[msg, chatbot], outputs=[msg, chatbot] ) analyze_btn.click( CallAnalyzer.process_call, inputs=[call_upload], outputs=[transcript, audio_output, insights] ) process_video_btn.click( VideoSearcher.process_video, inputs=[video_upload], outputs=[video_status] ) search_type.change( update_search_type, search_type, [text_input, image_input] ) search_btn.click( VideoSearcher.search_video, inputs=[search_type, text_input, image_input], outputs=[results_gallery] ) # Related Pixeltable Spaces 
gr.Markdown("## 🌟 Explore More Pixeltable Apps") with gr.Row(): with gr.Column(): gr.HTML( """

📚 Document & Text Processing

""" ) with gr.Column(): gr.HTML( """

🎥 Video & Audio Processing

""" ) with gr.Column(): gr.HTML( """

🎮 Interactive Applications

""" ) gr.HTML( """

🚀 Built with Pixeltable

Open Source AI Data infrastructure.

🔗 Resources

💻 GitHub 📚 Documentation 🤗 Hugging Face

© 2024 Pixeltable | Apache License 2.0

""" ) return demo if __name__ == "__main__": init_api_keys() demo = create_interface() demo.launch( allowed_paths=[PIXELTABLE_MEDIA_DIR], show_api=False )