# Scraped page header (Hugging Face Spaces status: "Running") -- not part of the program.
import os | |
from PIL import Image | |
import sys | |
from omegaconf import OmegaConf | |
import requests | |
from typing import Tuple | |
from bs4 import BeautifulSoup | |
import streamlit as st | |
from streamlit_pills import pills | |
from dotenv import load_dotenv | |
load_dotenv(override=True) | |
from pydantic import Field, BaseModel | |
from vectara_agent.agent import Agent, AgentStatusType | |
from vectara_agent.tools import ToolsFactory | |
from vectara_agent.tools_catalog import summarize_text | |
# Greeting shown as the assistant's first message in a fresh chat.
initial_prompt = "How can I help you today?"

# Browser-like headers sent with requests for Hacker News HTML pages.
get_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate",
    "Connection": "keep-alive",
}
def create_tools(cfg):
    """
    Build the list of tools handed to the Hacker News agent.

    Args:
        cfg: configuration object exposing ``api_key``, ``customer_id`` and
            ``corpus_id`` attributes, used to construct the ToolsFactory.

    Returns:
        The concatenation of: the Hacker News helper functions wrapped by
        ``tools_factory.get_tools``, the factory's standard tools, its
        guardrail tools, and the ``ask_hackernews`` RAG tool.
    """

    class QueryHackerNews(BaseModel):
        query: str = Field(..., description="The user query.")

    tools_factory = ToolsFactory(
        vectara_api_key=cfg.api_key,
        vectara_customer_id=cfg.customer_id,
        vectara_corpus_id=cfg.corpus_id,
    )

    ask_hackernews = tools_factory.create_rag_tool(
        tool_name="ask_hackernews",
        tool_description="""
        Responds to query based on information and stories in hacker news from the last 6-9 months.
        """,
        tool_args_schema=QueryHackerNews,
        reranker="multilingual_reranker_v1", rerank_k=100,
        n_sentences_before=2, n_sentences_after=2, lambda_val=0.005,
        summary_num_results=10,
        vectara_summarizer='vectara-summary-ext-24-05-med-omni',
        include_citations=True,
    )

    db_url = 'https://hacker-news.firebaseio.com/v0/'
    # Without a timeout a stalled connection would block the agent forever.
    request_timeout = 30  # seconds

    def _hn_api_get(path):
        # Fetch one JSON document from the Hacker News Firebase API.
        response = requests.get(f"{db_url}{path}", timeout=request_timeout)
        response.raise_for_status()
        return response.json()

    def _first_story_ids(listing, n_stories):
        # Return the first n_stories IDs of a listing endpoint
        # ('topstories', 'showstories' or 'askstories').
        return (_hn_api_get(f"{listing}.json") or [])[:n_stories]

    # NOTE: the docstrings of the functions below are kept close to the
    # original wording on purpose -- presumably the tools factory exposes them
    # to the agent as tool descriptions (TODO confirm).

    def get_top_stories(
        n_stories: int = Field(default=10, description="The number of top stories to return.")
    ) -> list[str]:
        """
        Get the top stories from hacker news.
        Returns a list of story IDS for the top stories right now. These are the top stories on hacker news.
        """
        return _first_story_ids("topstories", n_stories)

    def get_show_stories(
        n_stories: int = Field(default=10, description="The number of top SHOW HN stories to return.")
    ) -> list[str]:
        """
        Get the top SHOW HN stories from hacker news.
        Returns a list of story IDS for the top SHOW HN stories right now. These are stories where users show their projects.
        """
        return _first_story_ids("showstories", n_stories)

    def get_ask_stories(
        n_stories: int = Field(default=10, description="The number of top ASK HN stories to return.")
    ) -> list[str]:
        """
        Get the top ASK HN stories from hacker news.
        Returns a list of story IDS for the top ASK HN stories right now. These are stories where users ask questions to the community.
        """
        return _first_story_ids("askstories", n_stories)

    def get_story_details(
        story_id: str = Field(..., description="The story ID.")
    ) -> Tuple[str, str, str]:
        """
        Get the title of a story from hacker news.
        Returns:
        - The title of the story (str)
        - The main URL of the story (str)
        - The external link pointed to in the story (str); for text-only posts this is the main URL
        """
        story = _hn_api_get(f"item/{story_id}.json")
        if not story:
            # The API answers with JSON null for IDs that do not exist.
            raise ValueError(f"No Hacker News item found for story ID {story_id}.")
        story_url = f'https://news.ycombinator.com/item?id={story_id}'
        # Text-only posts (e.g. Ask HN) carry no 'url' field, so fall back to
        # the discussion page instead of raising KeyError.
        return story.get('title', ''), story_url, story.get('url', story_url)

    def get_story_text(
        story_id: str = Field(..., description="The story ID.")
    ) -> str:
        """
        Get the text of the story from hacker news (original text + all comments)
        Returns the extracted text of the story as a string.
        """
        url = f'https://news.ycombinator.com/item?id={story_id}'
        html = requests.get(url, headers=get_headers, timeout=request_timeout).text
        soup = BeautifulSoup(html, 'html5lib')
        # Drop non-content markup before extracting the visible text.
        for element in soup.find_all(['script', 'style']):
            element.decompose()
        text = soup.get_text(" ", strip=True).replace('\n', ' ')
        return text

    def whats_new(
        n_stories: int = Field(default=10, description="The number of new stories to return.")
    ) -> str:
        """
        Provides a succint summary of what is new in the hackernews community
        by summarizing the content and comments of top stories.
        Returns a string with the summary.
        """
        # get_top_stories already limits the result to n_stories entries.
        stories = get_top_stories(n_stories)
        texts = [get_story_text(story_id) for story_id in stories]
        all_stories = '---------\n\n'.join(texts)
        return summarize_text(all_stories)

    return (
        tools_factory.get_tools(
            [
                get_top_stories,
                get_show_stories,
                get_ask_stories,
                get_story_details,
                get_story_text,
                whats_new,
            ]
        ) +
        tools_factory.standard_tools() +
        tools_factory.guardrail_tools() +
        [ask_hackernews]
    )
def initialize_agent(_cfg):
    """
    Return the agent for the current Streamlit session.

    If the session state already holds an ``agent`` it is returned as is;
    otherwise a new Agent is built from the tools created for ``_cfg``,
    its ``report()`` method is called, and the new agent is returned.
    """
    if 'agent' in st.session_state:
        return st.session_state.agent

    def update_func(status_type: AgentStatusType, msg: str):
        # Record every status message except AGENT_UPDATE in the session log.
        if status_type == AgentStatusType.AGENT_UPDATE:
            return
        st.session_state.log_messages.append(f"{status_type.value} - {msg}")

    bot_instructions = """
    - You are a helpful assistant, with expertise in answering user questions about Hacker News stories and comments.
    - Never discuss politics, and always respond politely.
    - This is important: when you include links to Hacker News stories, use the actual title of the story as the link's displayed text.
      Don't use text like "Source" which doesn't tell the user what the link is about.
    - Don't include external links in your responses unless the user asks for them.
    - Give slight preference to newer stories when answering questions.
    """

    agent = Agent(
        tools=create_tools(_cfg),
        topic="hacker news",
        custom_instructions=bot_instructions,
        update_func=update_func,
    )
    agent.report()
    return agent
def toggle_logs():
    """Flip the ``show_logs`` flag stored in the Streamlit session state."""
    state = st.session_state
    state.show_logs = not state.show_logs
def show_example_questions():
    """
    Offer the example queries as clickable pills on the first turn.

    Returns True when the user picked an example (the pick is stored in
    ``ex_prompt`` and ``first_turn`` is cleared); returns False otherwise,
    including when there are no examples or it is no longer the first turn.
    """
    # Nothing to offer: no examples configured, or the conversation has started.
    if not st.session_state.example_messages or not st.session_state.first_turn:
        return False

    choice = pills("Queries to Try:", st.session_state.example_messages, index=None)
    if not choice:
        return False

    st.session_state.ex_prompt = choice
    st.session_state.first_turn = False
    return True
def launch_bot():
    """
    Render the Streamlit chat page and run one pass of the chat loop.

    On each script run this: initialises configuration (from the
    VECTARA_CUSTOMER_ID, VECTARA_CORPUS_ID, VECTARA_API_KEY and optional
    QUERY_EXAMPLES environment variables) and the agent if they are not yet in
    the session state, draws the sidebar and the chat history, takes a prompt
    from either an example pill or the chat input, asks the agent for a
    response when a prompt is pending, and finally shows the log toggle.

    Raises:
        KeyError: if one of the required VECTARA_* environment variables is
            missing on first initialisation.
    """
    # Avatars are written as Unicode escapes so they survive re-encoding of
    # this file; the mis-decoded byte sequences previously here were not valid
    # emoji.
    # NOTE(review): the greeting avatar is reconstructed as T-Rex (U+1F996);
    # its last byte was lost in the garbled literal -- confirm.
    greeting_avatar = "\U0001F996"
    user_avatar = "\U0001F9D1\u200D\U0001F4BB"   # technologist
    assistant_avatar = "\U0001F916"              # robot face

    def reset():
        # Restore the per-conversation state to a fresh chat.
        st.session_state.messages = [{"role": "assistant", "content": initial_prompt, "avatar": greeting_avatar}]
        st.session_state.thinking_message = "Agent at work..."
        st.session_state.log_messages = []
        st.session_state.prompt = None
        st.session_state.first_turn = True
        st.session_state.show_logs = False

    st.set_page_config(page_title="Hacker News Bot", layout="wide")
    if 'cfg' not in st.session_state:
        cfg = OmegaConf.create({
            'customer_id': str(os.environ['VECTARA_CUSTOMER_ID']),
            'corpus_id': str(os.environ['VECTARA_CORPUS_ID']),
            'api_key': str(os.environ['VECTARA_API_KEY']),
            'examples': os.environ.get('QUERY_EXAMPLES', None)
        })
        st.session_state.cfg = cfg
        st.session_state.ex_prompt = None
        # QUERY_EXAMPLES is a comma-separated list; blank entries are dropped.
        example_messages = [example.strip() for example in cfg.examples.split(",")] if cfg.examples else []
        st.session_state.example_messages = [em for em in example_messages if len(em) > 0]
        reset()

    cfg = st.session_state.cfg
    if 'agent' not in st.session_state:
        st.session_state.agent = initialize_agent(cfg)

    # left side content
    with st.sidebar:
        image = Image.open('Vectara-logo.png')
        st.image(image, width=175)
        st.markdown("## Welcome to the hacker news assistant demo.\n\n\n")

        st.markdown("\n\n")
        bc1, _ = st.columns([1, 1])
        with bc1:
            if st.button('Start Over'):
                reset()

        st.markdown("---")
        st.markdown(
            "## How this works?\n"
            "This app was built with [Vectara](https://vectara.com).\n\n"
            "It demonstrates the use of Agentic RAG functionality with Vectara"
        )
        st.markdown("---")

    if "messages" not in st.session_state.keys():
        reset()

    # Display chat messages
    for message in st.session_state.messages:
        with st.chat_message(message["role"], avatar=message["avatar"]):
            st.write(message["content"])

    example_container = st.empty()
    with example_container:
        if show_example_questions():
            # An example was picked: clear the pills and rerun so the pick is
            # handled as the prompt on the next pass.
            example_container.empty()
            st.rerun()

    # User-provided prompt
    if st.session_state.ex_prompt:
        prompt = st.session_state.ex_prompt
    else:
        prompt = st.chat_input()
    if prompt:
        st.session_state.messages.append({"role": "user", "content": prompt, "avatar": user_avatar})
        st.session_state.prompt = prompt  # Save the prompt in session state
        st.session_state.log_messages = []
        st.session_state.show_logs = False
        with st.chat_message("user", avatar=user_avatar):
            print(f"Starting new question: {prompt}\n")
            st.write(prompt)
        st.session_state.ex_prompt = None

    # Generate a new response when a prompt is pending in the session state
    if st.session_state.prompt:
        with st.chat_message("assistant", avatar=assistant_avatar):
            with st.spinner(st.session_state.thinking_message):
                res = st.session_state.agent.chat(st.session_state.prompt)
                res = res.replace('$', '\\$')  # escape dollar sign for markdown
            message = {"role": "assistant", "content": res, "avatar": assistant_avatar}
            st.session_state.messages.append(message)
            st.markdown(res)
        st.session_state.ex_prompt = None
        st.session_state.prompt = None
        st.rerun()

    log_placeholder = st.empty()
    with log_placeholder.container():
        if st.session_state.show_logs:
            st.button("Hide Logs", on_click=toggle_logs)
            for msg in st.session_state.log_messages:
                st.text(msg)
        else:
            # Only offer the toggle when there is something to show.
            if len(st.session_state.log_messages) > 0:
                st.button("Show Logs", on_click=toggle_logs)

    sys.stdout.flush()
# Script entry point: build and render the chat page.
if __name__ == "__main__":
    launch_bot()