import streamlit as st from openai import OpenAI import json, os import requests #Used the @st.cache_resource decorator on this function. #This Streamlit decorator ensures that the function is only executed once and its result (the OpenAI client) is cached. #Subsequent calls to this function will return the cached client, avoiding unnecessary recreation. @st.cache_resource def get_openai_client(): #Enable debug mode for testing only return True, OpenAI(api_key=os.getenv("OPENAI_API_KEY")) @st.cache_resource def get_backend_urls(): data_extractor_url = "https://data-extractor-67qj89pa0-sonikas-projects-9936eaad.vercel.app/" return data_extractor_url debug_mode, client = get_openai_client() data_extractor_url = get_backend_urls() def extract_data_from_product_image(image_links, data_extractor_url): #Send product label image url to data extractor url = data_extractor_url + "extract" data = { "image_links": image_links } try: response = requests.post(url, json=data) if response.status_code == 200 or response.status_code == 201: print("POST Response:", response.json()) # Assuming JSON response return response.json() else: print(f"POST Request failed with status code: {response.status_code}") return {} except requests.exceptions.RequestException as e: print(f"Error occurred: {e}") return {} def get_product_data_from_db(product_name, data_extractor_url): #Extract data for a product by calling data extractor's API : https://data-extractor-3cn8or2tc-sonikas-projects-9936eaad.vercel.app/ url = data_extractor_url + "product" params = {"name": product_name} try: response = requests.get(url, params = params) # Check if the request was successful if response.status_code == 200: print("GET Response:", response.json()) # Assuming the response is JSON return response.json() else: print(f"GET Request failed with status code: {response.status_code}") return {} except requests.exceptions.RequestException as e: print(f"Error occurred: {e}") return {} def get_product_list(product_name_by_user, data_extractor_url): url = data_extractor_url + "find_product" params = {"name": product_name_by_user} try: response = requests.get(url, params = params) # Check if the request was successful if response.status_code == 200: print("GET Response:", response.json()) # Assuming the response is JSON return response.json() else: print(f"GET Request failed with status code: {response.status_code}") return {} except requests.exceptions.RequestException as e: print(f"Error occurred: {e}") return {} # Initialize assistants and vector stores # Function to initialize vector stores and assistants @st.cache_resource def initialize_assistants_and_vector_stores(): #Processing Level global client assistant1 = client.beta.assistants.create( name="Processing Level", instructions="You are an expert dietician. Use you knowledge base to answer questions about the processing level of food product.", model="gpt-4o", tools=[{"type": "file_search"}], temperature=0, top_p = 0.85 ) #Harmful Ingredients assistant2 = client.beta.assistants.create( name="Harmful Ingredients", instructions="You are an expert dietician. Use you knowledge base to answer questions about the ingredients in food product.", model="gpt-4o", tools=[{"type": "file_search"}], temperature=0, top_p = 0.85 ) #Harmful Ingredients assistant3 = client.beta.assistants.create( name="Misleading Claims", instructions="You are an expert dietician. Use you knowledge base to answer questions about the misleading claims about food product.", model="gpt-4o", tools=[{"type": "file_search"}], temperature=0, top_p = 0.85 ) # Create a vector store vector_store1 = client.beta.vector_stores.create(name="Processing Level Vec") # Ready the files for upload to OpenAI file_paths = ["Processing_Level.docx"] file_streams = [open(path, "rb") for path in file_paths] # Use the upload and poll SDK helper to upload the files, add them to the vector store, # and poll the status of the file batch for completion. file_batch1 = client.beta.vector_stores.file_batches.upload_and_poll( vector_store_id=vector_store1.id, files=file_streams ) # You can print the status and the file counts of the batch to see the result of this operation. print(file_batch1.status) print(file_batch1.file_counts) # Create a vector store vector_store2 = client.beta.vector_stores.create(name="Harmful Ingredients Vec") # Ready the files for upload to OpenAI file_paths = ["Ingredients.docx"] file_streams = [open(path, "rb") for path in file_paths] # Use the upload and poll SDK helper to upload the files, add them to the vector store, # and poll the status of the file batch for completion. file_batch2 = client.beta.vector_stores.file_batches.upload_and_poll( vector_store_id=vector_store2.id, files=file_streams ) # You can print the status and the file counts of the batch to see the result of this operation. print(file_batch2.status) print(file_batch2.file_counts) # Create a vector store vector_store3 = client.beta.vector_stores.create(name="Misleading Claims Vec") # Ready the files for upload to OpenAI file_paths = ["MisLeading_Claims.docx"] file_streams = [open(path, "rb") for path in file_paths] # Use the upload and poll SDK helper to upload the files, add them to the vector store, # and poll the status of the file batch for completion. file_batch3 = client.beta.vector_stores.file_batches.upload_and_poll( vector_store_id=vector_store3.id, files=file_streams ) # You can print the status and the file counts of the batch to see the result of this operation. print(file_batch3.status) print(file_batch3.file_counts) #Processing Level assistant1 = client.beta.assistants.update( assistant_id=assistant1.id, tool_resources={"file_search": {"vector_store_ids": [vector_store1.id]}}, ) #harmful Ingredients assistant2 = client.beta.assistants.update( assistant_id=assistant2.id, tool_resources={"file_search": {"vector_store_ids": [vector_store2.id]}}, ) #Misleading Claims assistant3 = client.beta.assistants.update( assistant_id=assistant3.id, tool_resources={"file_search": {"vector_store_ids": [vector_store3.id]}}, ) return assistant1, assistant2, assistant3 assistant1, assistant2, assistant3 = initialize_assistants_and_vector_stores() def analyze_processing_level(ingredients, brand_name, product_name, assistant_id): global debug_mode, client thread = client.beta.threads.create( messages=[ { "role": "user", "content": "Categorize food product that has following ingredients: " + ', '.join(ingredients) + " into Group A, Group B, or Group C based on the document. The output must only be the group category name (Group A, Group B, or Group C) alongwith the reason behind assigning that respective category to the product. If the group category cannot be determined, output 'NOT FOUND'.", } ] ) run = client.beta.threads.runs.create_and_poll( thread_id=thread.id, assistant_id=assistant_id, include=["step_details.tool_calls[*].file_search.results[*].content"] ) messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id)) message_content = messages[0].content[0].text annotations = message_content.annotations #citations = [] for index, annotation in enumerate(annotations): message_content.value = message_content.value.replace(annotation.text, "") #if file_citation := getattr(annotation, "file_citation", None): # cited_file = client.files.retrieve(file_citation.file_id) # citations.append(f"[{index}] {cited_file.filename}") if debug_mode: print(message_content.value) processing_level_str = message_content.value return processing_level_str def analyze_harmful_ingredients(ingredients, brand_name, product_name, assistant_id): global debug_mode, client thread = client.beta.threads.create( messages=[ { "role": "user", "content": "Provide detailed information about food product that has following ingredients: " + ', '.join(ingredients) + ". The output must be in JSON format: {: }. If information about an ingredient is not found in the documents, the value for that ingredient must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge.", } ] ) run = client.beta.threads.runs.create_and_poll( thread_id=thread.id, assistant_id=assistant_id, include=["step_details.tool_calls[*].file_search.results[*].content"] ) messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id)) message_content = messages[0].content[0].text annotations = message_content.annotations #citations = [] #print(f"Length of annotations is {len(annotations)}") for index, annotation in enumerate(annotations): if file_citation := getattr(annotation, "file_citation", None): #cited_file = client.files.retrieve(file_citation.file_id) #citations.append(f"[{index}] {cited_file.filename}") message_content.value = message_content.value.replace(annotation.text, "") if debug_mode: ingredients_not_found_in_doc = [] print(message_content.value) for key, value in json.loads(message_content.value.replace("```", "").replace("json", "")).items(): if value.startswith("(NOT FOUND IN DOCUMENT)"): ingredients_not_found_in_doc.append(key) print(f"Ingredients not found in the harmful ingredients doc are {','.join(ingredients_not_found_in_doc)}") harmful_ingredient_analysis = json.loads(message_content.value.replace("```", "").replace("json", "").replace("(NOT FOUND IN DOCUMENT) ", "")) harmful_ingredient_analysis_str = "" for key, value in harmful_ingredient_analysis.items(): harmful_ingredient_analysis_str += f"{key}: {value}\n" return harmful_ingredient_analysis_str def analyze_claims(claims, assistant_id): global debug_mode, client thread = client.beta.threads.create( messages=[ { "role": "user", "content": "Provide detailed information about the food product with following claims: " + ', '.join(claims) + ". The output must be in JSON format: {: }. If information about a claim is not found in the documents, the value for that claim must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge.", } ] ) run = client.beta.threads.runs.create_and_poll( thread_id=thread.id, assistant_id=assistant_id, include=["step_details.tool_calls[*].file_search.results[*].content"] ) messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id)) message_content = messages[0].content[0].text annotations = message_content.annotations #citations = [] #print(f"Length of annotations is {len(annotations)}") for index, annotation in enumerate(annotations): if file_citation := getattr(annotation, "file_citation", None): #cited_file = client.files.retrieve(file_citation.file_id) #citations.append(f"[{index}] {cited_file.filename}") message_content.value = message_content.value.replace(annotation.text, "") if debug_mode: claims_not_found_in_doc = [] print(message_content.value) for key, value in json.loads(message_content.value.replace("```", "").replace("json", "")).items(): if value.startswith("(NOT FOUND IN DOCUMENT)"): claims_not_found_in_doc.append(key) print(f"Claims not found in the doc are {','.join(claims_not_found_in_doc)}") claims_analysis = json.loads(message_content.value.replace("```", "").replace("json", "").replace("(NOT FOUND IN DOCUMENT) ", "")) claims_analysis_str = "" for key, value in claims_analysis.items(): claims_analysis_str += f"{key}: {value}\n" return claims_analysis_str def generate_final_analysis(brand_name, product_name, processing_level, harmful_ingredient_analysis, claims_analysis): global debug_mode, client system_prompt = """You are provided with a detailed analysis of a food product. Your task is to generate actionable insights to help the user decide whether to consume the product, at what frequency, and identify any potential harms or benefits. Consider the context of consumption to ensure the advice is personalized and practical. Use the following criteria to generate your response: 1. **Nutrition Analysis:** - How processed is the product? 2. **Harmful Ingredients:** - Identify any harmful or questionable ingredients. 3. **Misleading Claims:** - Are there any misleading claims made by the brand? Additionally, consider the following while generating insights: 1. **Consumption Context:** - Is the product being consumed for health reasons or as a treat? - Could the consumer be overlooking hidden harms? - If the product is something they could consume daily, should they? - If they are consuming it daily, what potential harm are they not noticing? - If the product is intended for health purposes, are there concerns the user might miss? **Output:** - Recommend whether the product should be consumed or avoided. - If recommended, specify the appropriate frequency and intended functionality (e.g., treat vs. health). - Highlight any risks or benefits at that level of consumption.""" user_prompt = f""" Product Name: {brand_name} {product_name} Processing Level: {processing_level} Ingredient Analysis: {harmful_ingredient_analysis} Claims Analysis: {claims_analysis} """ if debug_mode: print(f"\nuser_prompt : \n {user_prompt}") completion = client.chat.completions.create( model="gpt-4o", # Make sure to use an appropriate model messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] ) return completion.choices[0].message.content def analyze_product(product_info_raw): global assistant1, assistant2, assistant3 if product_info_raw != "{}": product_info_from_db = json.loads(product_info_raw) brand_name = product_info_from_db.get("brandName", "") product_name = product_info_from_db.get("productName", "") ingredients_list = [ingredient["name"] for ingredient in product_info_from_db.get("ingredients", [])] claims_list = product_info_from_db.get("claims", []) if len(ingredients_list) > 0: processing_level = analyze_processing_level(ingredients_list, brand_name, product_name, assistant1.id) if ingredients_list else "" harmful_ingredient_analysis = analyze_harmful_ingredients(ingredients_list, brand_name, product_name, assistant2.id) if ingredients_list else "" if len(claims_list) > 0: claims_analysis = analyze_claims(claims_list, assistant3.id) if claims_list else "" final_analysis = generate_final_analysis(brand_name,product_name,processing_level,harmful_ingredient_analysis,claims_analysis) return final_analysis else: return "I'm sorry, product information could not be extracted from the url." # Streamlit app # Initialize session state if 'messages' not in st.session_state: st.session_state.messages = [] def chatbot_response(image_urls_str, product_name_by_user, data_extractor_url, extract_info = True): # Process the user input and generate a response processing_level = "" harmful_ingredient_analysis = "" claims_analysis = "" image_urls = [] if product_name_by_user != "": similar_product_list_json = get_product_list(product_name_by_user, data_extractor_url) if similar_product_list_json != {} and extract_info == False: print(f"similar_product_list_json : {similar_product_list_json}") similar_product_list = similar_product_list_json['product_list'] return similar_product_list, "Product list found" elif extract_info == True: with st.spinner("Analyzing the product... This may take a moment."): product_info_raw = get_product_data_from_db(product_name_by_user, data_extractor_url) print(f"DEBUG product_info_raw : {product_info_raw}") final_analysis = analyze_product(product_info_raw) return [], final_analysis else: return [], "Product not found" elif "http:/" in image_urls_str.lower() or "https:/" in image_urls_str.lower(): # Extract image URL from user input if "," not in image_urls_str: image_urls.append(image_urls_str) else: for url in image_urls_str.split(","): if "http:/" in url.lower() or "https:/" in url.lower(): image_urls.append(url) with st.spinner("Analyzing the product... This may take a moment."): product_info_raw = extract_data_from_product_image(image_urls, data_extractor_url) print(f"DEBUG product_info_raw : {product_info_raw}") final_analysis = analyze_product(product_info_raw) return [], final_analysis else: return [], "I'm here to analyze food products. Please provide an image URL (Example : http://example.com/image.jpg) or product name (Example : Harvest Gold Bread)" # Initialize session state if "messages" not in st.session_state: st.session_state.messages = [] if "product_selected" not in st.session_state: st.session_state.product_selected = False st.title("ConsumeWise") st.write("Hello! I'm your food product analysis assistant.") # Chat interface for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) # User input if prompt := st.chat_input("What product would you like to analyze?"): st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) # Generate bot response with st.chat_message("assistant"): message_placeholder = st.empty() full_response = "" # Simulate stream of response with milliseconds delay similar_products, _ = chatbot_response(prompt, "", data_extractor_url, extract_info=False): for product in similar_products: full_response += product + " " message_placeholder.markdown(full_response + "▌") message_placeholder.markdown(full_response) # Add assistant response to chat history st.session_state.messages.append({"role": "assistant", "content": full_response}) # Product selection (if needed) if similar_products and not st.session_state.product_selected: choice = st.radio("Select a product:", similar_products + ["None of the above"]) if choice != "None of the above": st.session_state.product_selected = True with st.chat_message("user"): st.markdown(f"I choose: {choice}") st.session_state.messages.append({"role": "user", "content": f"I choose: {choice}"}) # Generate detailed response for selected product with st.chat_message("assistant"): detailed_response = chatbot_response("", choice, data_extractor_url, extract_info=True) st.markdown(detailed_response) st.session_state.messages.append({"role": "assistant", "content": detailed_response}) elif choice == "None of the above": st.text_input("Please provide image URLs separated by commas:") # Option to clear chat history if st.button("Clear Chat History"): st.session_state.messages = [] st.session_state.product_selected = False st.rerun()