from omegaconf import OmegaConf import streamlit as st import os from PIL import Image import sys import datetime import requests from dotenv import load_dotenv from typing import Tuple from bs4 import BeautifulSoup from pydantic import Field, BaseModel from vectara_agent.agent import Agent, AgentStatusType from vectara_agent.tools import ToolsFactory from vectara_agent.tools_catalog import summarize_text initial_prompt = "How can I help you today?" load_dotenv(override=True) get_headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive", } def create_tools(cfg): class QueryHackerNews(BaseModel): query: str = Field(..., description="The user query.") tools_factory = ToolsFactory(vectara_api_key=cfg.api_key, vectara_customer_id=cfg.customer_id, vectara_corpus_id=cfg.corpus_id) ask_hackernews_semantic = tools_factory.create_rag_tool( tool_name = "ask_hackernews_semantic", tool_description = """ Responds to query based on information in hacker news from the last 6 months. Performs a semantic search to find relevant information. Use this tool to perform pure semantic search. """, tool_args_schema = QueryHackerNews, reranker = "multilingual_reranker_v1", rerank_k = 100, n_sentences_before = 2, n_sentences_after = 2, lambda_val = 0.0, summary_num_results = 10, vectara_summarizer = 'vectara-summary-ext-24-05-med-omni', include_citations = True, ) ask_hackernews_hybrid = tools_factory.create_rag_tool( tool_name = "ask_hackernews_keyword", tool_description = """ Responds to query based on information in hacker news from the last 6 months performs a hybrid search (both semantic and keyword) to find relevant information. Use this tool when some amount of keyword search is expected to work better than semantic search, For example, when you are looking for specific keywords or use rare words in the query. """, tool_args_schema = QueryHackerNews, reranker = "multilingual_reranker_v1", rerank_k = 100, n_sentences_before = 2, n_sentences_after = 2, lambda_val = 0.1, summary_num_results = 10, vectara_summarizer = 'vectara-summary-ext-24-05-med-omni', include_citations = True, ) def get_top_stories( n_stories: int = Field(default=10, description="The number of top stories to return.") ) -> list[str]: """ Get the top stories from hacker news. Returns a list of story IDS for the top stories right now """ db_url = 'https://hacker-news.firebaseio.com/v0/' top_stories = requests.get(f"{db_url}topstories.json").json() return top_stories[:n_stories] def get_show_stories( n_stories: int = Field(default=10, description="The number of top SHOW HN stories to return.") ) -> list[str]: """ Get the top SHOW HN stories from hacker news. Returns a list of story IDS for the top SHOW HN stories right now """ db_url = 'https://hacker-news.firebaseio.com/v0/' top_stories = requests.get(f"{db_url}showstories.json").json() return top_stories[:n_stories] def get_ask_stories( n_stories: int = Field(default=10, description="The number of top ASK HN stories to return.") ) -> list[str]: """ Get the top ASK HN stories from hacker news. Returns a list of story IDS for the top ASK HN stories right now """ db_url = 'https://hacker-news.firebaseio.com/v0/' top_stories = requests.get(f"{db_url}askstories.json").json() return top_stories[:n_stories] def get_story_details( story_id: str = Field(..., description="The story ID.") ) -> Tuple[str, str]: """ Get the title of a story from hacker news. Returns: - The title of the story (str) - The main URL of the story (str) - The external link pointed to in the story (str) """ db_url = 'https://hacker-news.firebaseio.com/v0/' story = requests.get(f"{db_url}item/{story_id}.json").json() story_url = f'https://news.ycombinator.com/item?id={story_id}' return story['title'], story_url, story['url'], def get_story_text( story_id: str = Field(..., description="The story ID.") ) -> str: """ Get the text of the story from hacker news (original text + all comments) Returns the extracted text of the story as a string. """ url = f'https://news.ycombinator.com/item?id={story_id}' html = requests.get(url, headers=get_headers).text soup = BeautifulSoup(html, 'html5lib') for element in soup.find_all(['script', 'style']): element.decompose() text = soup.get_text(" ", strip=True).replace('\n', ' ') return text def whats_new( n_stories: int = Field(default=10, description="The number of new stories to return.") ) -> list[str]: """ Provides a succint summary of what is new in the hackernews community by summarizing the content and comments of top stories. Returns a string with the summary. """ stories = get_top_stories(n_stories) texts = [get_story_text(story_id) for story_id in stories[:n_stories]] all_stories = '---------\n\n'.join(texts) return summarize_text(all_stories) return ( tools_factory.get_tools( [ get_top_stories, get_show_stories, get_ask_stories, get_story_details, get_story_text, whats_new ] ) + tools_factory.standard_tools() + tools_factory.guardrail_tools() + [ask_hackernews_semantic, ask_hackernews_hybrid] ) def initialize_agent(_cfg): bot_instructions = """ - You are a helpful assistant, with expertise in answering user questions about Hacker News stories and comments. - Never discuss politics, and always respond politely. """ def update_func(status_type: AgentStatusType, msg: str): if status_type != AgentStatusType.AGENT_UPDATE: output = f"{status_type.value} - {msg}" st.session_state.log_messages.append(output) agent = Agent( tools=create_tools(_cfg), topic="hacker news", custom_instructions=bot_instructions, update_func=update_func ) return agent def toggle_logs(): st.session_state.show_logs = not st.session_state.show_logs def launch_bot(): def reset(): st.session_state.messages = [{"role": "assistant", "content": initial_prompt, "avatar": "🦖"}] st.session_state.thinking_message = "Agent at work..." st.session_state.log_messages = [] st.session_state.prompt = None st.session_state.show_logs = False st.set_page_config(page_title="Hacker News Bot", layout="wide") if 'cfg' not in st.session_state: cfg = OmegaConf.create({ 'customer_id': str(os.environ['VECTARA_CUSTOMER_ID']), 'corpus_id': str(os.environ['VECTARA_CORPUS_ID']), 'api_key': str(os.environ['VECTARA_API_KEY']), }) st.session_state.cfg = cfg reset() cfg = st.session_state.cfg if 'agent' not in st.session_state: st.session_state.agent = initialize_agent(cfg) # left side content with st.sidebar: image = Image.open('Vectara-logo.png') st.image(image, width=250) st.markdown("## Welcome to the hacker news assistant demo.\n\n\n") st.markdown("\n\n") bc1, _ = st.columns([1, 1]) with bc1: if st.button('Start Over'): reset() st.markdown("---") st.markdown( "## How this works?\n" "This app was built with [Vectara](https://vectara.com).\n\n" "It demonstrates the use of Agentic RAG functionality with Vectara" ) st.markdown("---") if "messages" not in st.session_state.keys(): reset() # Display chat messages for message in st.session_state.messages: with st.chat_message(message["role"], avatar=message["avatar"]): st.write(message["content"]) # User-provided prompt if prompt := st.chat_input(): st.session_state.messages.append({"role": "user", "content": prompt, "avatar": '🧑‍💻'}) st.session_state.prompt = prompt # Save the prompt in session state st.session_state.log_messages = [] st.session_state.show_logs = False with st.chat_message("user", avatar='🧑‍💻'): print(f"Starting new question: {prompt}\n") st.write(prompt) # Generate a new response if last message is not from assistant if st.session_state.prompt: with st.chat_message("assistant", avatar='🤖'): with st.spinner(st.session_state.thinking_message): res = st.session_state.agent.chat(st.session_state.prompt) res = res.replace('$', '\\$') # escape dollar sign for markdown message = {"role": "assistant", "content": res, "avatar": '🤖'} st.session_state.messages.append(message) st.markdown(res) st.session_state.prompt = None log_placeholder = st.empty() with log_placeholder.container(): if st.session_state.show_logs: st.button("Hide Logs", on_click=toggle_logs) for msg in st.session_state.log_messages: st.text(msg) else: if len(st.session_state.log_messages) > 0: st.button("Show Logs", on_click=toggle_logs) sys.stdout.flush() if __name__ == "__main__": launch_bot()