from queue import SimpleQueue

from langchain.callbacks.base import BaseCallbackHandler

# Unique sentinel placed on the queue to signal that processing is done.
# Consumers should compare with ``is`` (identity), since a streamed token
# can never be this object.
job_done = object()


# https://gist.github.com/mortymike/70711b028311681e5f3c6511031d5d43
class StreamingGradioCallbackHandler(BaseCallbackHandler):
    """Callback handler for streaming. Only works with LLMs that support streaming.

    Each token reported by the LLM is put on the queue supplied at
    construction time; when the LLM run ends (or fails), the module-level
    ``job_done`` sentinel is put on the same queue so that whoever reads the
    queue knows no more tokens will follow.
    """

    def __init__(self, q: SimpleQueue) -> None:
        """Store the queue that tokens and the end-of-run sentinel go to.

        Args:
            q: Queue that receives every streamed token followed by
                ``job_done``.
        """
        self.q = q

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Put a newly generated token on the queue.

        Args:
            token: The token text reported by the LLM.
            **kwargs: Extra callback arguments; ignored.
        """
        self.q.put(token)

    def on_llm_end(self, *args, **kwargs) -> None:
        """Put the ``job_done`` sentinel on the queue when the LLM finishes.

        All callback arguments are ignored.
        """
        self.q.put(job_done)

    def on_llm_error(self, *args, **kwargs) -> None:
        """Put the ``job_done`` sentinel on the queue when the LLM fails.

        Without this, a failed run would never enqueue the sentinel and a
        reader blocking on the queue would wait indefinitely. The error
        itself is not swallowed here; this hook only unblocks the reader.
        All callback arguments are ignored.
        """
        self.q.put(job_done)