import os

import boto3
import gradio as gr
import openai
import pinecone
from dotenv import load_dotenv
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.docstore.document import Document

# Load environment variables
load_dotenv()

# Access secrets from environment variables
openai.api_key = os.getenv("OPENAI_API_KEY")
pinecone_api_key = os.getenv("PINECONE_API_KEY")
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")

bucket_name = 'amtrak-superliner-ai-poc'
txt_file_name = 'combined_extracted_text.txt'
index_name = "amtrak-acela-ai-demo"

# Initialize Pinecone using the new class-based client
pc = pinecone.Pinecone(api_key=pinecone_api_key)

# Initialize AWS S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    region_name='us-east-1'
)

# Initialize Pinecone index (create it if it doesn't already exist)
def initialize_pinecone_index(index_name, embedding_dim):
    available_indexes = pc.list_indexes().names()
    if index_name not in available_indexes:
        pc.create_index(
            name=index_name,
            dimension=embedding_dim,
            metric="cosine",
            spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1")
        )
    return pc.Index(index_name)

embedding_dim = 768
index = initialize_pinecone_index(index_name, embedding_dim)

# Initialize the HuggingFace embedding model (768-dimensional vectors,
# matching the index dimension above)
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/msmarco-distilbert-base-v4")

# Download the combined document text from S3 and load it into memory
def download_text_from_s3(s3_client, bucket_name, file_name):
    local_txt_path = os.path.join(os.getcwd(), file_name)
    s3_client.download_file(bucket_name, file_name, local_txt_path)
    with open(local_txt_path, 'r', encoding='utf-8') as f:
        return f.read()

doc_text = download_text_from_s3(s3_client, bucket_name, txt_file_name)

# Split the document into overlapping chunks and embed each chunk
def process_text_into_embeddings(doc_text):
    text_splitter = CharacterTextSplitter(separator='\n', chunk_size=3000, chunk_overlap=500)
    docs = text_splitter.split_documents([Document(page_content=doc_text)])
    doc_embeddings = embedding_model.embed_documents([doc.page_content for doc in docs])
    return docs, doc_embeddings

# Check whether embeddings already exist in the Pinecone index
def check_embeddings_in_pinecone(index):
    try:
        stats = index.describe_index_stats()
        return stats['total_vector_count'] > 0
    except Exception as e:
        print(f"Error checking Pinecone index: {e}")
        return False

# Only process and upsert embeddings if the index is still empty
if not check_embeddings_in_pinecone(index):
    split_docs, doc_embeddings = process_text_into_embeddings(doc_text)
    for i, doc in enumerate(split_docs):
        metadata = {'content': doc.page_content}
        index.upsert(vectors=[(str(i), doc_embeddings[i], metadata)])
else:
    print("Embeddings already exist in Pinecone. Skipping embedding process.")
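
# A minimal batched-upsert sketch (an assumption, not something the flow above
# requires): Pinecone's upsert accepts a list of vectors per call, so for
# larger corpora the per-vector loop above can be replaced with chunked calls
# to cut network round trips. `batch_size` is a hypothetical choice.
def upsert_in_batches(index, docs, embeddings, batch_size=100):
    vectors = [
        (str(i), embeddings[i], {'content': doc.page_content})
        for i, doc in enumerate(docs)
    ]
    for start in range(0, len(vectors), batch_size):
        index.upsert(vectors=vectors[start:start + batch_size])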

# Query Pinecone and OpenAI to generate a response
def get_model_response(human_input, chat_history=None):
    try:
        # Embed the query using the embedding model
        query_embedding = embedding_model.embed_query(human_input)

        # Query the Pinecone index for the most relevant chunks
        search_results = index.query(vector=query_embedding, top_k=3, include_metadata=True)

        # Prepare content and image data
        context_list = []
        images = []

        # Extract the content from Pinecone's search results
        for ind, result in enumerate(search_results['matches']):
            document_content = result.get('metadata', {}).get('content', 'No content found')
            image_url = result.get('metadata', {}).get('image_path', None)
            figure_desc = result.get('metadata', {}).get('figure_description', '')
            context_list.append(f"Document {ind + 1}: {document_content}")
            if image_url and figure_desc:
                # Only keep images that exist and have a description
                images.append((figure_desc, image_url))

        # Combine context from the search results
        context_string = '\n\n'.join(context_list)

        # Build the messages list for OpenAI
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Here is some context:\n{context_string}\n\nUser's question: {human_input}"}
        ]

        # Send the conversation to OpenAI's chat completion API,
        # using GPT-3.5 instead of GPT-4
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=500,
            temperature=0.5
        )

        # Get the model's response text
        output_text = response['choices'][0]['message']['content'].strip()

        # Return both the output and any images found
        return output_text, images
    except Exception as e:
        return f"Error invoking model: {str(e)}", []

# Wrapper for Gradio: format the model's text and any figure references
# for display in the chat interface
def get_model_response_with_history(human_input, chat_history=None):
    if chat_history is None:
        chat_history = []
    # get_model_response returns (text, images); unpacking the second value
    # as chat_history was a bug in the earlier version
    output_text, images = get_model_response(human_input, chat_history)

    # Append figure descriptions and paths to the text output, since
    # gr.ChatInterface expects a single string response
    if images:
        figure_lines = [f"{desc}: {url}" for desc, url in images]
        output_text += "\n\nRelated figures:\n" + "\n".join(figure_lines)

    return output_text

# Set up the Gradio interface without share=True, to avoid launch errors for now
gr_interface = gr.ChatInterface(
    fn=get_model_response_with_history,
    title="Maintenance Assistant",
    description="Ask questions related to the RMM documents."
)

# Launch the Gradio interface
gr_interface.launch()
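
# A quick smoke test (hypothetical query, run in place of the UI): calling the
# retrieval function directly verifies the Pinecone and OpenAI wiring without
# starting Gradio, e.g.:
#
#     answer, images = get_model_response("How do I inspect the brake system?")
#     print(answer)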