|
import streamlit as st |
|
from openai import OpenAI |
|
import json, os |
|
import requests |
|
|
|
|
|
|
|
|
|
|
|
@st.cache_resource |
|
def get_openai_client(): |
|
|
|
return True, OpenAI(api_key=os.getenv("OPENAI_API_KEY")) |
|
|
|
|
|
@st.cache_resource |
|
def get_backend_urls(): |
|
data_extractor_url = "https://data-extractor-67qj89pa0-sonikas-projects-9936eaad.vercel.app/" |
|
return data_extractor_url |
|
|
|
debug_mode, client = get_openai_client() |
|
data_extractor_url = get_backend_urls() |
|
|
|
def extract_data_from_product_image(image_links, data_extractor_url): |
|
|
|
url = data_extractor_url + "extract" |
|
data = { |
|
"image_links": image_links |
|
} |
|
try: |
|
response = requests.post(url, json=data) |
|
if response.status_code == 200 or response.status_code == 201: |
|
print("POST Response:", response.json()) |
|
return response.json() |
|
else: |
|
print(f"POST Request failed with status code: {response.status_code}") |
|
return {} |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error occurred: {e}") |
|
return {} |
|
|
|
def get_product_data_from_db(product_name, data_extractor_url): |
|
|
|
url = data_extractor_url + "product" |
|
params = {"name": product_name} |
|
|
|
try: |
|
response = requests.get(url, params = params) |
|
|
|
if response.status_code == 200: |
|
print("GET Response:", response.json()) |
|
return response.json() |
|
else: |
|
print(f"GET Request failed with status code: {response.status_code}") |
|
return {} |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error occurred: {e}") |
|
return {} |
|
|
|
def get_product_list(product_name_by_user, data_extractor_url): |
|
url = data_extractor_url + "find_product" |
|
params = {"name": product_name_by_user} |
|
|
|
try: |
|
response = requests.get(url, params = params) |
|
|
|
if response.status_code == 200: |
|
print("GET Response:", response.json()) |
|
return response.json() |
|
else: |
|
print(f"GET Request failed with status code: {response.status_code}") |
|
return {} |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error occurred: {e}") |
|
return {} |
|
|
|
|
|
|
|
@st.cache_resource |
|
def initialize_assistants_and_vector_stores(): |
|
|
|
global client |
|
assistant1 = client.beta.assistants.create( |
|
name="Processing Level", |
|
instructions="You are an expert dietician. Use you knowledge base to answer questions about the processing level of food product.", |
|
model="gpt-4o", |
|
tools=[{"type": "file_search"}], |
|
temperature=0, |
|
top_p = 0.85 |
|
) |
|
|
|
|
|
assistant2 = client.beta.assistants.create( |
|
name="Harmful Ingredients", |
|
instructions="You are an expert dietician. Use you knowledge base to answer questions about the ingredients in food product.", |
|
model="gpt-4o", |
|
tools=[{"type": "file_search"}], |
|
temperature=0, |
|
top_p = 0.85 |
|
) |
|
|
|
|
|
assistant3 = client.beta.assistants.create( |
|
name="Misleading Claims", |
|
instructions="You are an expert dietician. Use you knowledge base to answer questions about the misleading claims about food product.", |
|
model="gpt-4o", |
|
tools=[{"type": "file_search"}], |
|
temperature=0, |
|
top_p = 0.85 |
|
) |
|
|
|
|
|
vector_store1 = client.beta.vector_stores.create(name="Processing Level Vec") |
|
|
|
|
|
file_paths = ["Processing_Level.docx"] |
|
file_streams = [open(path, "rb") for path in file_paths] |
|
|
|
|
|
|
|
file_batch1 = client.beta.vector_stores.file_batches.upload_and_poll( |
|
vector_store_id=vector_store1.id, files=file_streams |
|
) |
|
|
|
|
|
print(file_batch1.status) |
|
print(file_batch1.file_counts) |
|
|
|
|
|
vector_store2 = client.beta.vector_stores.create(name="Harmful Ingredients Vec") |
|
|
|
|
|
file_paths = ["Ingredients.docx"] |
|
file_streams = [open(path, "rb") for path in file_paths] |
|
|
|
|
|
|
|
file_batch2 = client.beta.vector_stores.file_batches.upload_and_poll( |
|
vector_store_id=vector_store2.id, files=file_streams |
|
) |
|
|
|
|
|
print(file_batch2.status) |
|
print(file_batch2.file_counts) |
|
|
|
|
|
vector_store3 = client.beta.vector_stores.create(name="Misleading Claims Vec") |
|
|
|
|
|
file_paths = ["MisLeading_Claims.docx"] |
|
file_streams = [open(path, "rb") for path in file_paths] |
|
|
|
|
|
|
|
file_batch3 = client.beta.vector_stores.file_batches.upload_and_poll( |
|
vector_store_id=vector_store3.id, files=file_streams |
|
) |
|
|
|
|
|
print(file_batch3.status) |
|
print(file_batch3.file_counts) |
|
|
|
|
|
assistant1 = client.beta.assistants.update( |
|
assistant_id=assistant1.id, |
|
tool_resources={"file_search": {"vector_store_ids": [vector_store1.id]}}, |
|
) |
|
|
|
|
|
assistant2 = client.beta.assistants.update( |
|
assistant_id=assistant2.id, |
|
tool_resources={"file_search": {"vector_store_ids": [vector_store2.id]}}, |
|
) |
|
|
|
|
|
assistant3 = client.beta.assistants.update( |
|
assistant_id=assistant3.id, |
|
tool_resources={"file_search": {"vector_store_ids": [vector_store3.id]}}, |
|
) |
|
return assistant1, assistant2, assistant3 |
|
|
|
assistant1, assistant2, assistant3 = initialize_assistants_and_vector_stores() |
|
|
|
def analyze_processing_level(ingredients, brand_name, product_name, assistant_id): |
|
global debug_mode, client |
|
thread = client.beta.threads.create( |
|
messages=[ |
|
{ |
|
"role": "user", |
|
"content": "Categorize food product that has following ingredients: " + ', '.join(ingredients) + " into Group A, Group B, or Group C based on the document. The output must only be the group category name (Group A, Group B, or Group C) alongwith the reason behind assigning that respective category to the product. If the group category cannot be determined, output 'NOT FOUND'.", |
|
} |
|
] |
|
) |
|
|
|
run = client.beta.threads.runs.create_and_poll( |
|
thread_id=thread.id, |
|
assistant_id=assistant_id, |
|
include=["step_details.tool_calls[*].file_search.results[*].content"] |
|
) |
|
|
|
messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id)) |
|
|
|
message_content = messages[0].content[0].text |
|
annotations = message_content.annotations |
|
|
|
for index, annotation in enumerate(annotations): |
|
message_content.value = message_content.value.replace(annotation.text, "") |
|
|
|
|
|
|
|
|
|
if debug_mode: |
|
print(message_content.value) |
|
processing_level_str = message_content.value |
|
return processing_level_str |
|
|
|
def analyze_harmful_ingredients(ingredients, brand_name, product_name, assistant_id): |
|
global debug_mode, client |
|
thread = client.beta.threads.create( |
|
messages=[ |
|
{ |
|
"role": "user", |
|
"content": "Provide detailed information about food product that has following ingredients: " + ', '.join(ingredients) + ". The output must be in JSON format: {<ingredient_name>: <information from the document>}. If information about an ingredient is not found in the documents, the value for that ingredient must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge.", |
|
} |
|
] |
|
) |
|
|
|
run = client.beta.threads.runs.create_and_poll( |
|
thread_id=thread.id, |
|
assistant_id=assistant_id, |
|
include=["step_details.tool_calls[*].file_search.results[*].content"] |
|
) |
|
|
|
messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id)) |
|
message_content = messages[0].content[0].text |
|
annotations = message_content.annotations |
|
|
|
|
|
|
|
|
|
|
|
for index, annotation in enumerate(annotations): |
|
if file_citation := getattr(annotation, "file_citation", None): |
|
|
|
|
|
message_content.value = message_content.value.replace(annotation.text, "") |
|
|
|
if debug_mode: |
|
ingredients_not_found_in_doc = [] |
|
print(message_content.value) |
|
for key, value in json.loads(message_content.value.replace("```", "").replace("json", "")).items(): |
|
if value.startswith("(NOT FOUND IN DOCUMENT)"): |
|
ingredients_not_found_in_doc.append(key) |
|
print(f"Ingredients not found in the harmful ingredients doc are {','.join(ingredients_not_found_in_doc)}") |
|
harmful_ingredient_analysis = json.loads(message_content.value.replace("```", "").replace("json", "").replace("(NOT FOUND IN DOCUMENT) ", "")) |
|
|
|
harmful_ingredient_analysis_str = "" |
|
for key, value in harmful_ingredient_analysis.items(): |
|
harmful_ingredient_analysis_str += f"{key}: {value}\n" |
|
return harmful_ingredient_analysis_str |
|
|
|
def analyze_claims(claims, assistant_id): |
|
global debug_mode, client |
|
thread = client.beta.threads.create( |
|
messages=[ |
|
{ |
|
"role": "user", |
|
"content": "Provide detailed information about the food product with following claims: " + ', '.join(claims) + ". The output must be in JSON format: {<claim_name>: <information from the document>}. If information about a claim is not found in the documents, the value for that claim must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge.", |
|
} |
|
] |
|
) |
|
|
|
run = client.beta.threads.runs.create_and_poll( |
|
thread_id=thread.id, |
|
assistant_id=assistant_id, |
|
include=["step_details.tool_calls[*].file_search.results[*].content"] |
|
) |
|
|
|
messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id)) |
|
|
|
message_content = messages[0].content[0].text |
|
|
|
|
|
annotations = message_content.annotations |
|
|
|
|
|
|
|
|
|
|
|
for index, annotation in enumerate(annotations): |
|
if file_citation := getattr(annotation, "file_citation", None): |
|
|
|
|
|
message_content.value = message_content.value.replace(annotation.text, "") |
|
|
|
if debug_mode: |
|
claims_not_found_in_doc = [] |
|
print(message_content.value) |
|
for key, value in json.loads(message_content.value.replace("```", "").replace("json", "")).items(): |
|
if value.startswith("(NOT FOUND IN DOCUMENT)"): |
|
claims_not_found_in_doc.append(key) |
|
print(f"Claims not found in the doc are {','.join(claims_not_found_in_doc)}") |
|
claims_analysis = json.loads(message_content.value.replace("```", "").replace("json", "").replace("(NOT FOUND IN DOCUMENT) ", "")) |
|
|
|
claims_analysis_str = "" |
|
for key, value in claims_analysis.items(): |
|
claims_analysis_str += f"{key}: {value}\n" |
|
|
|
return claims_analysis_str |
|
|
|
def generate_final_analysis(brand_name, product_name, processing_level, harmful_ingredient_analysis, claims_analysis): |
|
global debug_mode, client |
|
system_prompt = """You are provided with a detailed analysis of a food product. Your task is to generate actionable insights to help the user decide whether to consume the product, at what frequency, and identify any potential harms or benefits. Consider the context of consumption to ensure the advice is personalized and practical. |
|
|
|
Use the following criteria to generate your response: |
|
|
|
1. **Nutrition Analysis:** |
|
- How processed is the product? |
|
|
|
2. **Harmful Ingredients:** |
|
- Identify any harmful or questionable ingredients. |
|
|
|
3. **Misleading Claims:** |
|
- Are there any misleading claims made by the brand? |
|
|
|
Additionally, consider the following while generating insights: |
|
|
|
1. **Consumption Context:** |
|
- Is the product being consumed for health reasons or as a treat? |
|
- Could the consumer be overlooking hidden harms? |
|
- If the product is something they could consume daily, should they? |
|
- If they are consuming it daily, what potential harm are they not noticing? |
|
- If the product is intended for health purposes, are there concerns the user might miss? |
|
|
|
**Output:** |
|
- Recommend whether the product should be consumed or avoided. |
|
- If recommended, specify the appropriate frequency and intended functionality (e.g., treat vs. health). |
|
- Highlight any risks or benefits at that level of consumption.""" |
|
|
|
user_prompt = f""" |
|
Product Name: {brand_name} {product_name} |
|
|
|
Processing Level: |
|
{processing_level} |
|
|
|
Ingredient Analysis: |
|
{harmful_ingredient_analysis} |
|
|
|
Claims Analysis: |
|
{claims_analysis} |
|
""" |
|
if debug_mode: |
|
print(f"\nuser_prompt : \n {user_prompt}") |
|
|
|
completion = client.chat.completions.create( |
|
model="gpt-4o", |
|
messages=[ |
|
{"role": "system", "content": system_prompt}, |
|
{"role": "user", "content": user_prompt} |
|
] |
|
) |
|
|
|
return completion.choices[0].message.content |
|
|
|
|
|
def analyze_product(product_info_raw): |
|
global assistant1, assistant2, assistant3 |
|
|
|
if product_info_raw != "{}": |
|
product_info_from_db = json.loads(product_info_raw) |
|
brand_name = product_info_from_db.get("brandName", "") |
|
product_name = product_info_from_db.get("productName", "") |
|
ingredients_list = [ingredient["name"] for ingredient in product_info_from_db.get("ingredients", [])] |
|
claims_list = product_info_from_db.get("claims", []) |
|
|
|
if len(ingredients_list) > 0: |
|
processing_level = analyze_processing_level(ingredients_list, brand_name, product_name, assistant1.id) if ingredients_list else "" |
|
harmful_ingredient_analysis = analyze_harmful_ingredients(ingredients_list, brand_name, product_name, assistant2.id) if ingredients_list else "" |
|
if len(claims_list) > 0: |
|
claims_analysis = analyze_claims(claims_list, assistant3.id) if claims_list else "" |
|
|
|
final_analysis = generate_final_analysis(brand_name,product_name,processing_level,harmful_ingredient_analysis,claims_analysis) |
|
return final_analysis |
|
else: |
|
return "I'm sorry, product information could not be extracted from the url." |
|
|
|
|
|
|
|
if 'messages' not in st.session_state: |
|
st.session_state.messages = [] |
|
|
|
def chatbot_response(image_urls_str, product_name_by_user, data_extractor_url, extract_info = True): |
|
|
|
processing_level = "" |
|
harmful_ingredient_analysis = "" |
|
claims_analysis = "" |
|
image_urls = [] |
|
if product_name_by_user != "": |
|
similar_product_list_json = get_product_list(product_name_by_user, data_extractor_url) |
|
if similar_product_list_json != {} and extract_info == False: |
|
with st.spinner("Fetching product information from our database... This may take a moment."): |
|
print(f"similar_product_list_json : {similar_product_list_json}") |
|
similar_product_list = similar_product_list_json['product_list'] |
|
return similar_product_list, "Product list found from our database" |
|
|
|
elif extract_info == True: |
|
with st.spinner("Analyzing the product... This may take a moment."): |
|
product_info_raw = get_product_data_from_db(product_name_by_user, data_extractor_url) |
|
print(f"DEBUG product_info_raw : {product_info_raw}") |
|
final_analysis = analyze_product(product_info_raw) |
|
return [], final_analysis |
|
|
|
else: |
|
return [], "Product not found in our database." |
|
|
|
elif "http:/" in image_urls_str.lower() or "https:/" in image_urls_str.lower(): |
|
|
|
if "," not in image_urls_str: |
|
image_urls.append(image_urls_str) |
|
else: |
|
for url in image_urls_str.split(","): |
|
if "http:/" in url.lower() or "https:/" in url.lower(): |
|
image_urls.append(url) |
|
|
|
with st.spinner("Analyzing the product... This may take a moment."): |
|
product_info_raw = extract_data_from_product_image(image_urls, data_extractor_url) |
|
print(f"DEBUG product_info_raw : {product_info_raw}") |
|
final_analysis = analyze_product(product_info_raw) |
|
return [], final_analysis |
|
|
|
else: |
|
return [], "I'm here to analyze food products. Please provide an image URL (Example : http://example.com/image.jpg) or product name (Example : Harvest Gold Bread)" |
|
|
|
|
|
if "messages" not in st.session_state: |
|
st.session_state.messages = [] |
|
if "product_selected" not in st.session_state: |
|
st.session_state.product_selected = False |
|
|
|
st.title("ConsumeWise") |
|
|
|
|
|
|
|
for message in st.session_state.messages: |
|
with st.chat_message(message["role"]): |
|
st.markdown(message["content"]) |
|
|
|
|
|
if product_name_by_user := st.chat_input("Enter name of the product you like to analyze? (Example : Marie Gold Biscuit)"): |
|
st.session_state.messages.append({"role": "user", "content": product_name_by_user}) |
|
with st.chat_message("user"): |
|
st.markdown(product_name_by_user) |
|
|
|
|
|
with st.chat_message("assistant"): |
|
message_placeholder = st.empty() |
|
full_response = "" |
|
|
|
|
|
similar_products, msg = chatbot_response("", product_name_by_user, data_extractor_url, extract_info=False) |
|
for product in similar_products: |
|
full_response += product + " " |
|
message_placeholder.markdown(full_response + "▌") |
|
if len(similar_products) == 0: |
|
full_response = msg |
|
message_placeholder.markdown(full_response) |
|
|
|
|
|
st.session_state.messages.append({"role": "assistant", "content": full_response}) |
|
|
|
|
|
if similar_products and not st.session_state.product_selected: |
|
choice = st.radio("Select a product:", similar_products + ["None of the above"]) |
|
if choice != "None of the above": |
|
st.session_state.product_selected = True |
|
with st.chat_message("user"): |
|
st.markdown(f"I choose: {choice}") |
|
st.session_state.messages.append({"role": "user", "content": f"I choose: {choice}"}) |
|
|
|
|
|
with st.chat_message("assistant"): |
|
_, detailed_response = chatbot_response("", choice, data_extractor_url, extract_info=True) |
|
st.markdown(detailed_response) |
|
st.session_state.messages.append({"role": "assistant", "content": detailed_response}) |
|
elif choice == "None of the above": |
|
st.text_input("Please provide image URLs separated by commas:") |
|
|
|
elif len(similar_products) == 0: |
|
|
|
|
|
image_urls_str = st.text_input("Please provide image URLs separated by commas:") |
|
|
|
|
|
st.session_state.messages.append({"role": "user", "content": image_urls_str}) |
|
|
|
with st.chat_message("user"): |
|
st.markdown(image_urls_str) |
|
|
|
|
|
with st.chat_message("assistant"): |
|
_, detailed_response = chatbot_response(image_urls_str, "", data_extractor_url, extract_info=True) |
|
st.markdown(detailed_response) |
|
st.session_state.messages.append({"role": "assistant", "content": detailed_response}) |
|
|
|
|
|
if st.button("Clear Chat History"): |
|
st.session_state.messages = [] |
|
st.session_state.product_selected = False |
|
st.rerun() |