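# NOTE: the next six lines look like file-viewer page residue captured with this file,
# not program code; they are kept here as comments only.
# shivanis14's picture
# Update app.py
# 83cf2b8 verified
# raw
# history blame
# 22.2 kB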
import streamlit as st
from openai import OpenAI
import json, os
import requests
@st.cache_resource
def get_openai_client():
    """Build the OpenAI client and report the debug flag.

    The function is wrapped in ``st.cache_resource`` so the client is built
    once and the cached result is handed back on later calls instead of
    being recreated.

    Returns:
        tuple: ``(debug_mode, client)``. ``debug_mode`` is always ``True``
        here (debug output is meant for testing only); ``client`` is an
        ``OpenAI`` instance configured from the ``OPENAI_API_KEY``
        environment variable.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    openai_client = OpenAI(api_key=api_key)
    # Debug mode is switched on unconditionally; intended for testing only.
    debug_enabled = True
    return debug_enabled, openai_client
@st.cache_resource
def get_backend_urls():
    """Return the base URL of the data-extractor backend.

    The URL ends with ``/`` so callers can append an endpoint name directly.
    """
    return "https://data-extractor-67qj89pa0-sonikas-projects-9936eaad.vercel.app/"
# Module-level handles used by the functions below: the debug flag and the OpenAI
# client come from get_openai_client(); the data-extractor base URL comes from
# get_backend_urls().
debug_mode, client = get_openai_client()
data_extractor_url = get_backend_urls()
def extract_data_from_product_image(image_links, data_extractor_url):
    """Ask the data extractor to read product data from label images.

    Sends a POST to ``<data_extractor_url>extract`` with the JSON body
    ``{"image_links": image_links}``.

    Args:
        image_links: Image URLs to send; passed through unchanged.
        data_extractor_url: Base URL of the data extractor. The endpoint
            name is appended directly, so it must already end with ``/``.

    Returns:
        The decoded JSON body when the status code is 200 or 201; ``{}``
        when the request fails, the status code is anything else, or the
        body is not valid JSON.
    """
    url = data_extractor_url + "extract"
    payload = {"image_links": image_links}

    try:
        # NOTE(review): no timeout is passed, so this call waits for as long
        # as the server keeps the connection open.
        response = requests.post(url, json=payload)
    except requests.exceptions.RequestException as e:
        print(f"Error occurred: {e}")
        return {}

    if response.status_code not in (200, 201):
        print(f"POST Request failed with status code: {response.status_code}")
        return {}

    try:
        # Decode once and reuse the result for both the log line and the return.
        result = response.json()
    except ValueError as e:
        # A body that is not valid JSON surfaces as a ValueError subclass.
        print(f"Error occurred: {e}")
        return {}

    print("POST Response:", result)
    return result
def get_product_data_from_db(product_name, data_extractor_url):
    """Fetch the stored data for one product from the data extractor.

    Sends a GET to ``<data_extractor_url>product`` with the query parameter
    ``name=<product_name>``.

    Args:
        product_name: Product name used as the ``name`` query parameter.
        data_extractor_url: Base URL of the data extractor. The endpoint
            name is appended directly, so it must already end with ``/``.

    Returns:
        The decoded JSON body when the status code is 200; ``{}`` when the
        request fails, the status code is anything else, or the body is not
        valid JSON.
    """
    url = data_extractor_url + "product"
    params = {"name": product_name}

    try:
        # NOTE(review): no timeout is passed, so this call waits for as long
        # as the server keeps the connection open.
        response = requests.get(url, params=params)
    except requests.exceptions.RequestException as e:
        print(f"Error occurred: {e}")
        return {}

    if response.status_code != 200:
        print(f"GET Request failed with status code: {response.status_code}")
        return {}

    try:
        # Decode once and reuse the result for both the log line and the return.
        result = response.json()
    except ValueError as e:
        # A body that is not valid JSON surfaces as a ValueError subclass.
        print(f"Error occurred: {e}")
        return {}

    print("GET Response:", result)
    return result
def get_product_list(product_name_by_user, data_extractor_url):
    """Look up products matching a user-typed name.

    Sends a GET to ``<data_extractor_url>find_product`` with the query
    parameter ``name=<product_name_by_user>``.

    Args:
        product_name_by_user: Name typed by the user, used as the ``name``
            query parameter.
        data_extractor_url: Base URL of the data extractor. The endpoint
            name is appended directly, so it must already end with ``/``.

    Returns:
        The decoded JSON body when the status code is 200; ``{}`` when the
        request fails, the status code is anything else, or the body is not
        valid JSON.
    """
    url = data_extractor_url + "find_product"
    params = {"name": product_name_by_user}

    try:
        # NOTE(review): no timeout is passed, so this call waits for as long
        # as the server keeps the connection open.
        response = requests.get(url, params=params)
    except requests.exceptions.RequestException as e:
        print(f"Error occurred: {e}")
        return {}

    if response.status_code != 200:
        print(f"GET Request failed with status code: {response.status_code}")
        return {}

    try:
        # Decode once and reuse the result for both the log line and the return.
        result = response.json()
    except ValueError as e:
        # A body that is not valid JSON surfaces as a ValueError subclass.
        print(f"Error occurred: {e}")
        return {}

    print("GET Response:", result)
    return result
def _create_vector_store_with_files(store_name, file_paths):
    """Create a vector store named *store_name* and upload *file_paths* into it."""
    from contextlib import ExitStack

    vector_store = client.beta.vector_stores.create(name=store_name)
    # Open the files inside an ExitStack so every handle is closed once the
    # upload helper returns, including when the upload raises.
    with ExitStack() as stack:
        file_streams = [stack.enter_context(open(path, "rb")) for path in file_paths]
        # The SDK helper uploads the files, adds them to the vector store and
        # polls the file batch until it completes.
        file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
            vector_store_id=vector_store.id, files=file_streams
        )
    # Report the batch status and file counts so the result is visible in the logs.
    print(file_batch.status)
    print(file_batch.file_counts)
    return vector_store


# Initialize assistants and vector stores
@st.cache_resource
def initialize_assistants_and_vector_stores():
    """Create the three file-search assistants and attach their vector stores.

    Uses the module-level ``client``. The work happens in three phases, in
    the same order as before: create the three assistants, create one vector
    store per assistant and upload its document, then update each assistant
    to point at its vector store. The function is wrapped in
    ``st.cache_resource``.

    Returns:
        tuple: ``(assistant1, assistant2, assistant3)`` for processing level,
        harmful ingredients and misleading claims, each as returned by the
        final ``assistants.update`` call.
    """
    # (assistant name, instructions, vector store name, files to upload)
    specs = (
        (
            "Processing Level",
            "You are an expert dietician. Use you knowledge base to answer questions about the processing level of food product.",
            "Processing Level Vec",
            ["Processing_Level.docx"],
        ),
        (
            "Harmful Ingredients",
            "You are an expert dietician. Use you knowledge base to answer questions about the ingredients in food product.",
            "Harmful Ingredients Vec",
            ["Ingredients.docx"],
        ),
        (
            "Misleading Claims",
            "You are an expert dietician. Use you knowledge base to answer questions about the misleading claims about food product.",
            "Misleading Claims Vec",
            ["MisLeading_Claims.docx"],
        ),
    )

    # Phase 1: create the assistants.
    created = [
        client.beta.assistants.create(
            name=assistant_name,
            instructions=instructions,
            model="gpt-4o",
            tools=[{"type": "file_search"}],
            temperature=0,
            top_p=0.85,
        )
        for assistant_name, instructions, _store_name, _paths in specs
    ]

    # Phase 2: create the vector stores and upload their documents.
    vector_stores = [
        _create_vector_store_with_files(store_name, paths)
        for _assistant_name, _instructions, store_name, paths in specs
    ]

    # Phase 3: point each assistant's file_search tool at its own vector store.
    linked = [
        client.beta.assistants.update(
            assistant_id=assistant.id,
            tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
        )
        for assistant, vector_store in zip(created, vector_stores)
    ]

    assistant1, assistant2, assistant3 = linked
    return assistant1, assistant2, assistant3
# Module-level assistants used by analyze_product: 1 = processing level,
# 2 = harmful ingredients, 3 = misleading claims.
assistant1, assistant2, assistant3 = initialize_assistants_and_vector_stores()
def analyze_processing_level(ingredients, brand_name, product_name, assistant_id):
    """Ask the processing-level assistant to categorise a product.

    Creates a thread with a single user message listing the ingredients,
    runs the given assistant on it and returns the reply text with all
    annotation markers removed. Reads the module-level ``client`` and
    ``debug_mode``.

    Args:
        ingredients: Ingredient names (strings); joined with ``", "`` into
            the prompt.
        brand_name: Not used by this function.
        product_name: Not used by this function.
        assistant_id: ID of the assistant to run.

    Returns:
        str: The assistant's reply, or ``"NOT FOUND"`` (the value the prompt
        itself asks for when no category can be determined) if the run
        produced no message.
    """
    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": "Categorize food product that has following ingredients: " + ', '.join(ingredients) + " into Group A, Group B, or Group C based on the document. The output must only be the group category name (Group A, Group B, or Group C) alongwith the reason behind assigning that respective category to the product. If the group category cannot be determined, output 'NOT FOUND'.",
            }
        ]
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=assistant_id,
        include=["step_details.tool_calls[*].file_search.results[*].content"]
    )

    messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id))
    # A run that did not finish normally yields no message for this run;
    # indexing messages[0] would then raise IndexError.
    if not messages or not messages[0].content:
        if debug_mode:
            print(f"Processing level run returned no message (run status: {run.status})")
        return "NOT FOUND"

    message_content = messages[0].content[0].text

    # Strip every annotation marker from the reply text.
    processing_level_str = message_content.value
    for annotation in message_content.annotations:
        processing_level_str = processing_level_str.replace(annotation.text, "")

    if debug_mode:
        print(processing_level_str)

    return processing_level_str
def analyze_harmful_ingredients(ingredients, brand_name, product_name, assistant_id):
    """Ask the ingredients assistant for information on each ingredient.

    Creates a thread with a single user message listing the ingredients,
    runs the given assistant, removes file-citation markers from the reply,
    parses the JSON object in it and formats it as ``"<name>: <info>"``
    lines. Reads the module-level ``client`` and ``debug_mode``.

    Args:
        ingredients: Ingredient names (strings); joined with ``", "`` into
            the prompt.
        brand_name: Not used by this function.
        product_name: Not used by this function.
        assistant_id: ID of the assistant to run.

    Returns:
        str: One ``"<ingredient>: <information>\\n"`` line per key of the
        reply, with the ``"(NOT FOUND IN DOCUMENT) "`` marker removed from
        string values. Returns ``""`` if the run produced no message, and
        the citation-stripped reply text unchanged if no JSON object can be
        parsed from it.
    """
    not_found_prefix = "(NOT FOUND IN DOCUMENT)"

    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": "Provide detailed information about food product that has following ingredients: " + ', '.join(ingredients) + ". The output must be in JSON format: {<ingredient_name>: <information from the document>}. If information about an ingredient is not found in the documents, the value for that ingredient must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge.",
            }
        ]
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=assistant_id,
        include=["step_details.tool_calls[*].file_search.results[*].content"]
    )

    messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id))
    # A run that did not finish normally yields no message for this run;
    # indexing messages[0] would then raise IndexError.
    if not messages or not messages[0].content:
        if debug_mode:
            print(f"Harmful ingredients run returned no message (run status: {run.status})")
        return ""

    message_content = messages[0].content[0].text

    # Remove only the markers of annotations that carry a file citation.
    reply_text = message_content.value
    for annotation in message_content.annotations:
        if getattr(annotation, "file_citation", None):
            reply_text = reply_text.replace(annotation.text, "")

    if debug_mode:
        print(reply_text)

    # Parse the outermost {...} span once. Slicing skips Markdown code fences
    # and a "json" language tag without deleting "json" from the content.
    start = reply_text.find("{")
    end = reply_text.rfind("}")
    analysis = None
    if start != -1 and end > start:
        try:
            analysis = json.loads(reply_text[start:end + 1])
        except ValueError:
            analysis = None
    if analysis is None:
        # Best effort: the caller only embeds this text in a prompt.
        return reply_text

    ingredients_not_found_in_doc = []
    lines = []
    for key, value in analysis.items():
        # Values are expected to be strings, but the model may return other
        # JSON types; those are formatted as-is.
        if isinstance(value, str):
            if value.startswith(not_found_prefix):
                ingredients_not_found_in_doc.append(key)
            value = value.replace(not_found_prefix + " ", "")
        lines.append(f"{key}: {value}\n")

    if debug_mode:
        print(f"Ingredients not found in the harmful ingredients doc are {','.join(ingredients_not_found_in_doc)}")

    return "".join(lines)
def analyze_claims(claims, assistant_id):
    """Ask the claims assistant for information on each product claim.

    Creates a thread with a single user message listing the claims, runs
    the given assistant, removes file-citation markers from the reply,
    parses the JSON object in it and formats it as ``"<claim>: <info>"``
    lines. Reads the module-level ``client`` and ``debug_mode``.

    Args:
        claims: Claim texts (strings); joined with ``", "`` into the prompt.
        assistant_id: ID of the assistant to run.

    Returns:
        str: One ``"<claim>: <information>\\n"`` line per key of the reply,
        with the ``"(NOT FOUND IN DOCUMENT) "`` marker removed from string
        values. Returns ``""`` if the run produced no message, and the
        citation-stripped reply text unchanged if no JSON object can be
        parsed from it.
    """
    not_found_prefix = "(NOT FOUND IN DOCUMENT)"

    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": "Provide detailed information about the food product with following claims: " + ', '.join(claims) + ". The output must be in JSON format: {<claim_name>: <information from the document>}. If information about a claim is not found in the documents, the value for that claim must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge.",
            }
        ]
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=assistant_id,
        include=["step_details.tool_calls[*].file_search.results[*].content"]
    )

    messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id))
    # A run that did not finish normally yields no message for this run;
    # indexing messages[0] would then raise IndexError.
    if not messages or not messages[0].content:
        if debug_mode:
            print(f"Claims run returned no message (run status: {run.status})")
        return ""

    message_content = messages[0].content[0].text

    # Remove only the markers of annotations that carry a file citation.
    reply_text = message_content.value
    for annotation in message_content.annotations:
        if getattr(annotation, "file_citation", None):
            reply_text = reply_text.replace(annotation.text, "")

    if debug_mode:
        print(reply_text)

    # Parse the outermost {...} span once. Slicing skips Markdown code fences
    # and a "json" language tag without deleting "json" from the content.
    start = reply_text.find("{")
    end = reply_text.rfind("}")
    analysis = None
    if start != -1 and end > start:
        try:
            analysis = json.loads(reply_text[start:end + 1])
        except ValueError:
            analysis = None
    if analysis is None:
        # Best effort: the caller only embeds this text in a prompt.
        return reply_text

    claims_not_found_in_doc = []
    lines = []
    for key, value in analysis.items():
        # Values are expected to be strings, but the model may return other
        # JSON types; those are formatted as-is.
        if isinstance(value, str):
            if value.startswith(not_found_prefix):
                claims_not_found_in_doc.append(key)
            value = value.replace(not_found_prefix + " ", "")
        lines.append(f"{key}: {value}\n")

    if debug_mode:
        print(f"Claims not found in the doc are {','.join(claims_not_found_in_doc)}")

    return "".join(lines)
def generate_final_analysis(brand_name, product_name, processing_level, harmful_ingredient_analysis, claims_analysis):
    """Turn the three partial analyses into a final recommendation.

    Sends a fixed system prompt plus a user message that lists the product
    name, processing level, ingredient analysis and claims analysis to the
    chat completions API (model ``gpt-4o``), using the module-level
    ``client``. When the module-level ``debug_mode`` is truthy the user
    message is printed first.

    Returns:
        The content of the first choice's message.
    """
    instructions = """You are provided with a detailed analysis of a food product. Your task is to generate actionable insights to help the user decide whether to consume the product, at what frequency, and identify any potential harms or benefits. Consider the context of consumption to ensure the advice is personalized and practical.
Use the following criteria to generate your response:
1. **Nutrition Analysis:**
- How processed is the product?
2. **Harmful Ingredients:**
- Identify any harmful or questionable ingredients.
3. **Misleading Claims:**
- Are there any misleading claims made by the brand?
Additionally, consider the following while generating insights:
1. **Consumption Context:**
- Is the product being consumed for health reasons or as a treat?
- Could the consumer be overlooking hidden harms?
- If the product is something they could consume daily, should they?
- If they are consuming it daily, what potential harm are they not noticing?
- If the product is intended for health purposes, are there concerns the user might miss?
**Output:**
- Recommend whether the product should be consumed or avoided.
- If recommended, specify the appropriate frequency and intended functionality (e.g., treat vs. health).
- Highlight any risks or benefits at that level of consumption."""

    product_summary = f"""
Product Name: {brand_name} {product_name}
Processing Level:
{processing_level}
Ingredient Analysis:
{harmful_ingredient_analysis}
Claims Analysis:
{claims_analysis}
"""

    if debug_mode:
        print(f"\nuser_prompt : \n {product_summary}")

    chat_messages = [
        {"role": "system", "content": instructions},
        {"role": "user", "content": product_summary},
    ]
    response = client.chat.completions.create(model="gpt-4o", messages=chat_messages)
    first_choice = response.choices[0]
    return first_choice.message.content
def analyze_product(product_info_raw):
    """Run the full analysis pipeline for one product.

    Args:
        product_info_raw: Product information either as an already-decoded
            dict (what the HTTP helpers in this file return) or as a JSON
            string. The keys read are ``brandName``, ``productName``,
            ``ingredients`` (a list of dicts with a ``name`` key) and
            ``claims``.

    Returns:
        str: The final analysis text, or a fixed apology message when no
        product information is available (``None``, an empty dict, ``"{}"``,
        a string that is not valid JSON, or JSON that is not an object).
    """
    not_extracted_msg = "I'm sorry, product information could not be extracted from the url."

    # The HTTP helpers return a decoded dict ({} on failure), so accept that
    # form as well as a raw JSON string instead of calling json.loads on a dict.
    if isinstance(product_info_raw, (str, bytes, bytearray)):
        try:
            product_info_from_db = json.loads(product_info_raw)
        except ValueError:
            return not_extracted_msg
    else:
        product_info_from_db = product_info_raw

    if not isinstance(product_info_from_db, dict) or not product_info_from_db:
        return not_extracted_msg

    brand_name = product_info_from_db.get("brandName", "")
    product_name = product_info_from_db.get("productName", "")
    ingredients_list = [ingredient["name"] for ingredient in product_info_from_db.get("ingredients") or []]
    claims_list = product_info_from_db.get("claims") or []

    # Default every section to "" so the final prompt can still be built when
    # the product has no ingredients and/or no claims.
    processing_level = ""
    harmful_ingredient_analysis = ""
    claims_analysis = ""

    if ingredients_list:
        processing_level = analyze_processing_level(ingredients_list, brand_name, product_name, assistant1.id)
        harmful_ingredient_analysis = analyze_harmful_ingredients(ingredients_list, brand_name, product_name, assistant2.id)

    if claims_list:
        claims_analysis = analyze_claims(claims_list, assistant3.id)

    return generate_final_analysis(brand_name, product_name, processing_level, harmful_ingredient_analysis, claims_analysis)
# Streamlit app
# Start each session with an empty chat history.
if "messages" not in st.session_state:
    st.session_state["messages"] = []
def chatbot_response(image_urls_str, product_name_by_user, data_extractor_url, extract_info = True):
    """Produce the assistant's answer for a product name or image URLs.

    A non-empty ``product_name_by_user`` takes precedence over
    ``image_urls_str``.

    Args:
        image_urls_str: One image URL, or several separated by commas. Only
            consulted when ``product_name_by_user`` is ``""``.
        product_name_by_user: Product name typed or chosen by the user.
        data_extractor_url: Base URL of the data extractor backend.
        extract_info: With a product name, a truthy value analyses that
            product; a falsy value only looks up similarly named products.

    Returns:
        tuple: ``(similar_products, message)``. ``similar_products`` is the
        backend's ``product_list`` for a successful lookup and ``[]``
        otherwise; ``message`` is a status text, the final analysis, or a
        usage hint when neither a name nor a URL was supplied.
    """
    if product_name_by_user != "":
        if extract_info:
            # The similar-product lookup is not needed here; go straight to
            # the stored product data.
            with st.spinner("Analyzing the product... This may take a moment."):
                product_info_raw = get_product_data_from_db(product_name_by_user, data_extractor_url)
                print(f"DEBUG product_info_raw : {product_info_raw}")
                final_analysis = analyze_product(product_info_raw)
            return [], final_analysis

        # The spinner now covers the lookup request as well.
        with st.spinner("Fetching product information from our database... This may take a moment."):
            similar_product_list_json = get_product_list(product_name_by_user, data_extractor_url)
            print(f"similar_product_list_json : {similar_product_list_json}")

        # A response without a usable "product_list" is treated as "not found".
        similar_product_list = None
        if isinstance(similar_product_list_json, dict):
            similar_product_list = similar_product_list_json.get("product_list")
        if similar_product_list:
            return similar_product_list, "Product list found from our database"
        return [], "Product not found in our database."

    lowered_input = image_urls_str.lower()
    if "http:/" in lowered_input or "https:/" in lowered_input:
        # Keep the comma-separated parts that look like URLs and trim the
        # whitespace users leave around commas.
        image_urls = [
            part.strip()
            for part in image_urls_str.split(",")
            if "http:/" in part.lower() or "https:/" in part.lower()
        ]
        with st.spinner("Analyzing the product... This may take a moment."):
            product_info_raw = extract_data_from_product_image(image_urls, data_extractor_url)
            print(f"DEBUG product_info_raw : {product_info_raw}")
            final_analysis = analyze_product(product_info_raw)
        return [], final_analysis

    return [], "I'm here to analyze food products. Please provide an image URL (Example : http://example.com/image.jpg) or product name (Example : Harvest Gold Bread)"
# Seed the per-session defaults: the chat history and the "product chosen" flag.
if "messages" not in st.session_state:
    st.session_state["messages"] = []
if "product_selected" not in st.session_state:
    st.session_state["product_selected"] = False

st.title("ConsumeWise")
#st.write("Hello! I'm your food product analysis assistant.")

# Replay the stored conversation, one chat bubble per message.
for message in st.session_state["messages"]:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
# User input
# Everything below runs only on a script run in which the chat box returned a
# new product name. NOTE(review): the nesting shown here was reconstructed from
# statement order — confirm against the running app.
if product_name_by_user := st.chat_input("Enter name of the product you like to analyze? (Example : Marie Gold Biscuit)"):
    # Record and echo the user's message.
    st.session_state.messages.append({"role": "user", "content": product_name_by_user})
    with st.chat_message("user"):
        st.markdown(product_name_by_user)
    # Generate bot response
    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""
        # Look up similarly named products only (extract_info=False), then
        # build the reply one product at a time, showing a cursor glyph.
        similar_products, msg = chatbot_response("", product_name_by_user, data_extractor_url, extract_info=False)
        for product in similar_products:
            full_response += product + " "
            message_placeholder.markdown(full_response + "▌")
        # With no matches, show the status message from chatbot_response instead.
        if len(similar_products) == 0:
            full_response = msg
        message_placeholder.markdown(full_response)
    # Add assistant response to chat history
    st.session_state.messages.append({"role": "assistant", "content": full_response})
    # Product selection (if needed)
    if similar_products and not st.session_state.product_selected:
        # NOTE(review): the radio is created inside the chat-input branch;
        # presumably a later rerun triggered by the radio does not re-enter
        # this branch — confirm the selection flow works.
        choice = st.radio("Select a product:", similar_products + ["None of the above"])
        if choice != "None of the above":
            st.session_state.product_selected = True
            with st.chat_message("user"):
                st.markdown(f"I choose: {choice}")
            st.session_state.messages.append({"role": "user", "content": f"I choose: {choice}"})
            # Generate detailed response for selected product
            with st.chat_message("assistant"):
                _, detailed_response = chatbot_response("", choice, data_extractor_url, extract_info=True)
                st.markdown(detailed_response)
            st.session_state.messages.append({"role": "assistant", "content": detailed_response})
        elif choice == "None of the above":
            # The value returned by this text input is discarded.
            st.text_input("Please provide image URLs separated by commas:")
    elif len(similar_products) == 0:
        # No product matched the name: ask for label image URLs instead.
        # NOTE(review): the value is used in the same run that first renders
        # the input, so it is presumably "" at that point — confirm.
        image_urls_str = st.text_input("Please provide image URLs separated by commas:")
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": image_urls_str})
        with st.chat_message("user"):
            st.markdown(image_urls_str)
        # Generate detailed response for the image URLs
        with st.chat_message("assistant"):
            _, detailed_response = chatbot_response(image_urls_str, "", data_extractor_url, extract_info=True)
            st.markdown(detailed_response)
        st.session_state.messages.append({"role": "assistant", "content": detailed_response})
# Reset the conversation and the selection flag, then restart the script run.
clear_requested = st.button("Clear Chat History")
if clear_requested:
    st.session_state["messages"] = []
    st.session_state["product_selected"] = False
    st.rerun()