"""Retrieval-augmented chatbot about API access under the Digital Services Act.

The script does three things at import time / start-up:

1. Indexes ``knowledge-plain.txt`` into an in-memory document store.
2. Builds an answering pipeline: an LLM relevance check gates the request,
   then relevant requests are embedded, matched against the store and
   answered by a second LLM call.
3. When run as a script, serves the ``chat`` function through a Gradio
   ``ChatInterface``.
"""

from haystack.components.generators import OpenAIGenerator
from haystack.utils import Secret
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.routers import ConditionalRouter
from haystack import Pipeline
from haystack.components.writers import DocumentWriter
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.converters.txt import TextFileToDocument
from haystack.components.preprocessors import DocumentCleaner
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers import InMemoryEmbeddingRetriever

import gradio as gr

# The same model must embed both the documents and the queries, otherwise the
# cosine similarities computed by the retriever are meaningless.
embedding_model = "dunzhang/stella_en_400M_v5"

########################
####### Indexing #######
########################

# In memory version for now
document_store = InMemoryDocumentStore(embedding_similarity_function="cosine")

converter = TextFileToDocument()
cleaner = DocumentCleaner()
# 200-word chunks with a 100-word overlap between consecutive chunks.
splitter = DocumentSplitter(split_by="word", split_length=200, split_overlap=100)
embedder = SentenceTransformersDocumentEmbedder(model=embedding_model, trust_remote_code=True)
writer = DocumentWriter(document_store=document_store)

indexing = Pipeline()

indexing.add_component("converter", converter)
indexing.add_component("cleaner", cleaner)
indexing.add_component("splitter", splitter)
indexing.add_component("embedder", embedder)
indexing.add_component("writer", writer)

indexing.connect("converter", "cleaner")
indexing.connect("cleaner", "splitter")
indexing.connect("splitter", "embedder")
indexing.connect("embedder", "writer")

indexing.run({"sources": ["knowledge-plain.txt"]})

##################################
####### Answering pipeline #######
##################################

no_answer_message = (
    "I'm not allowed to answer this question. Please ask something related to "
    "APIs access in accordance DSA’s transparency and data-sharing provisions. "
    "Is there anything else I can do for you? "
)

relevance_prompt_template = """
Classify whether this user is asking for something related to social media APIs,
the Digital Services Act (DSA), or any topic related to online platforms’ compliance
with legal and data-sharing frameworks.

Relevant topics include social media API access, data transparency, compliance with DSA
provisions, and online platform regulations.

Here is their message:

{{query}}

Here are the two previous messages. ONLY refer to these if the above message refers previous ones.

{% for message in user_history[-2:] %}
    * {{message["content"]}}
{% endfor %}

If the request is related to these topics, respond “YES”. If it is off-topic (e.g., unrelated to APIs, the DSA, or legal frameworks), respond “NO”."""

# The two conditions are complementary on purpose: the router raises when no
# route matches, so anything that is not an explicit (case-insensitive) "YES"
# from the classifier is treated as off-topic instead of crashing the chat.
routes = [
    {
        "condition": "{{'YES' in (replies[0] | upper)}}",
        "output": "{{query}}",
        "output_name": "query",
        "output_type": str,
    },
    {
        "condition": "{{'YES' not in (replies[0] | upper)}}",
        "output": no_answer_message,
        "output_name": "no_answer",
        "output_type": str,
    },
]

query_prompt_template = """Conversation history:
{{conv_history}}

Here is what the user has requested:

{{query}}

Reply to the question with a short paragraph according to the following documents:

{% for document in documents %}
    * {{document.content}}
{% endfor %}

Do not mention the documents in your answer, present it as your own knowledge.
"""

# First LLM call: relevance classification of the incoming message.
prompt_builder = PromptBuilder(template=relevance_prompt_template)
llm = OpenAIGenerator(
    api_key=Secret.from_env_var("OPENAI_API_KEY"),
    model="gpt-4o-mini",
    generation_kwargs={"max_tokens": 8192},
)
router = ConditionalRouter(routes=routes)

# NOTE: ``embedder`` is rebound here; from this point on it is the query-side
# text embedder. ``trust_remote_code=True`` mirrors the document embedder
# above, which loads the very same model.
embedder = SentenceTransformersTextEmbedder(model=embedding_model, trust_remote_code=True)

# Again: in memory for now
retriever = InMemoryEmbeddingRetriever(document_store)

# Second LLM call: answer generation grounded in the retrieved documents.
prompt_builder2 = PromptBuilder(template=query_prompt_template)
llm2 = OpenAIGenerator(
    api_key=Secret.from_env_var("OPENAI_API_KEY"),
    model="gpt-4o-mini",
    generation_kwargs={"max_tokens": 8192},
)

answer_query = Pipeline()

answer_query.add_component("prompt_builder", prompt_builder)
answer_query.add_component("llm", llm)
answer_query.add_component("router", router)
answer_query.add_component("embedder", embedder)
answer_query.add_component("retriever", retriever)
answer_query.add_component("prompt_builder2", prompt_builder2)
answer_query.add_component("llm2", llm2)

answer_query.connect("prompt_builder", "llm")
answer_query.connect("llm", "router")
# Only the "query" route continues down the pipeline; the "no_answer" output
# is left unconnected so it is returned directly in the pipeline results.
answer_query.connect("router.query", "embedder")
answer_query.connect("embedder", "retriever")
answer_query.connect("retriever", "prompt_builder2")
answer_query.connect("prompt_builder2", "llm2")

answer_query.warm_up()

##########################
####### Gradio app #######
##########################


def chat(message: str, history: list[dict]) -> str:
    """Produce the next assistant reply for the Gradio chat interface.

    Args:
        message: The text the user has just sent.
        history: Previous turns as dictionaries with ``"role"`` and
            ``"content"`` keys (the interface is launched with
            ``type="messages"``).

    Returns:
        The generated answer when the request passed the relevance check,
        the refusal message when it was routed to ``no_answer``, or a generic
        error message if the pipeline returned neither.
    """
    # Only the last two turns (any role) are passed along as conversational
    # context for the answering prompt.
    conv_history = "\n\n".join(
        f"{turn['role']}: {turn['content']}" for turn in history[-2:]
    )
    # The relevance prompt looks at previous *user* messages only.
    user_history = [turn for turn in history if turn["role"] == "user"]

    results = answer_query.run(
        {"user_history": user_history, "query": message, "conv_history": conv_history}
    )

    if "llm2" in results:
        return results["llm2"]["replies"][0]

    router_output = results.get("router", {})
    if "no_answer" in router_output:
        return router_output["no_answer"]

    return "Sorry, a mistake occured"


if __name__ == "__main__":
    gr.ChatInterface(chat, type="messages").launch()