Spaces:
Runtime error
Runtime error
import os | |
os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
from PIL import ImageDraw | |
import streamlit as st | |
from st_clickable_images import clickable_images | |
st.set_page_config(layout="wide") | |
import torch | |
from docquery.pipeline import get_pipeline | |
from docquery.document import load_bytes, load_document | |
def ensure_list(x):
    """Return *x* unchanged when it is a list, otherwise wrap it in a one-item list."""
    return x if isinstance(x, list) else [x]
# Display name shown in the model picker -> checkpoint id passed to get_pipeline.
CHECKPOINTS = {
    "LayoutLMv1 🦉": "impira/layoutlm-document-qa",
    "Donut 🍩": "naver-clova-ix/donut-base-finetuned-docvqa",
}

# Streamlit's process-wide resource cache is `cache_resource` on newer releases
# and `experimental_singleton` on older ones; use whichever this install has.
_cache_pipeline = getattr(st, "cache_resource", None) or st.experimental_singleton


@_cache_pipeline(show_spinner=False)
def construct_pipeline(model):
    """Return the question-answering pipeline for *model*, building it only once.

    Without caching the checkpoint was loaded again on every call, and the
    script calls this twice per question (once to warm up, once inside
    ``run_pipeline``). The cache keeps one pipeline per model name across
    script reruns.

    Args:
        model: A key of ``CHECKPOINTS`` (the label shown in the model picker).

    Returns:
        Whatever ``get_pipeline`` returns for the selected checkpoint.

    Raises:
        KeyError: If *model* is not a key of ``CHECKPOINTS``.
    """
    # Prefer the GPU when torch can see one; otherwise fall back to the CPU.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    return get_pipeline(checkpoint=CHECKPOINTS[model], device=device)
def run_pipeline(model, question, document, top_k):
    """Answer *question* about *document* with the pipeline selected by *model*.

    The document's ``context`` mapping is expanded into keyword arguments of
    the pipeline call, next to ``question`` and ``top_k``; the pipeline's
    result is returned as-is.
    """
    qa = construct_pipeline(model)
    return qa(question=question, **document.context, top_k=top_k)
# TODO: Move into docquery
# TODO: Support words past the first page (or window?)
def lift_word_boxes(document):
    """Return the second item of the first entry in ``document.context["image"]``.

    Presumably these are the word boxes of the first page (see the TODO
    above) -- confirm against docquery's context layout.
    """
    first_entry = document.context["image"][0]
    return first_entry[1]
def expand_bbox(word_boxes):
    """Return the smallest box enclosing every box in *word_boxes*.

    Each entry carries its four-number box at index 1. The result is
    ``[min x, min y, max x, max y]``, or ``None`` when *word_boxes* is empty.
    """
    if len(word_boxes) == 0:
        return None
    boxes = [entry[1] for entry in word_boxes]
    # Transpose the boxes into one sequence per coordinate.
    lefts, tops, rights, bottoms = zip(*boxes)
    return [min(lefts), min(tops), max(rights), max(bottoms)]
# LayoutLM boxes are normalized to 0, 1000
def normalize_bbox(box, width, height):
    """Scale a 0-1000 box to the given image size.

    Every coordinate of *box* is divided by 1000; the first four are then
    multiplied by *width* (x values) or *height* (y values) and returned as
    ``[x1, y1, x2, y2]``.
    """
    fractions = [coord / 1000 for coord in box]
    extents = (width, height, width, height)
    return [fractions[i] * extent for i, extent in enumerate(extents)]
st.markdown("# DocQuery: Query Documents w/ NLP")

# Seed the per-session slots on the first run so later reads find the keys.
for _slot in ("document", "last_clicked"):
    if _slot not in st.session_state:
        st.session_state[_slot] = None

# Two side-by-side pickers: where the document comes from, and which model
# answers the question.
input_col, model_col = st.columns(2)
input_type = input_col.radio(
    "Pick an input type", ["Upload", "URL", "Examples"], horizontal=True
)
model_type = model_col.radio("Pick a model", list(CHECKPOINTS), horizontal=True)
def load_file_cb():
    """Uploader ``on_change`` callback: load the uploaded file into the session.

    Does nothing when the uploader is empty. Otherwise the file is loaded
    with ``load_bytes``, its ``context`` is read once while the spinner is
    visible, and the document is stored in ``st.session_state.document``.
    """
    uploaded = st.session_state.file_input
    if uploaded is not None:
        with loading_placeholder, st.spinner("Processing..."):
            loaded = load_bytes(uploaded, uploaded.name)
            # Read the context now; presumably this triggers the expensive
            # processing so it happens under the spinner -- TODO confirm.
            _ = loaded.context
            st.session_state.document = loaded
def load_url_cb():
    """URL box ``on_change`` callback: download and load the document.

    An empty text input yields ``""`` rather than ``None``, so the guard
    checks for an empty or whitespace-only value; otherwise clearing the box
    would try to load an empty URL. The loaded document is stored in
    ``st.session_state.document``.
    """
    url = (st.session_state.url_input or "").strip()
    if not url:
        return

    with loading_placeholder:
        with st.spinner("Downloading..."):
            document = load_document(url)
        with st.spinner("Processing..."):
            # Read the context now; presumably this triggers the expensive
            # processing so it happens under the spinner -- TODO confirm.
            _ = document.context
    st.session_state.document = document
# (image URL, sample question) pairs offered under the "Examples" input type.
examples = [
    (
        "https://templates.invoicehome.com/invoice-template-us-neat-750px.png",
        "What is the invoice number?",
    ),
    (
        "https://miro.medium.com/max/787/1*iECQRIiOGTmEFLdWkVIH2g.jpeg",
        "What is the purchase amount?",
    ),
    (
        # NOTE(review): the filename was mangled into an "[email protected]"
        # placeholder by e-mail obfuscation; reconstructed here as the
        # "@2x" asset name -- confirm the URL resolves.
        "https://www.accountingcoach.com/wp-content/uploads/2013/10/income-statement-example@2x.png",
        "What are net sales for 2020?",
    ),
]
# Render the widget(s) for the chosen input type. The upload and URL widgets
# load the document through their on_change callbacks.
imgs_clicked = []
if input_type == "Upload":
    file = st.file_uploader(
        "Upload a PDF or Image document", key="file_input", on_change=load_file_cb
    )
elif input_type == "URL":
    url = st.text_input("URL", "", key="url_input", on_change=load_url_cb)
elif input_type == "Examples":
    example_cols = st.columns(len(examples))
    # One column per example: a clickable thumbnail with its sample question
    # centred underneath. The loop uses its own names so it does not clobber
    # the global `question`.
    for example_col, (path, example_question) in zip(example_cols, examples):
        with example_col:
            imgs_clicked.append(
                clickable_images(
                    [path],
                    div_style={
                        "display": "flex",
                        "justify-content": "center",
                        "flex-wrap": "wrap",
                        "cursor": "pointer",
                    },
                    img_style={"margin": "5px", "height": "200px"},
                )
            )
            st.markdown(
                f"<p style='text-align: center'>{example_question}</p>",
                unsafe_allow_html=True,
            )
    # NOTE(review): the click results are reset here and nothing in the
    # visible code reads them, so clicking an example does not load it yet.
    imgs_clicked = [-1] * len(imgs_clicked)
# clicked = clickable_images( | |
# [x[0] for x in examples], | |
# titles=[x[1] for x in examples], | |
# div_style={"display": "flex", "justify-content": "center", "flex-wrap": "wrap"}, | |
# img_style={"margin": "5px", "height": "200px"}, | |
# ) | |
# | |
# st.markdown(f"Image #{clicked} clicked" if clicked > -1 else "No image clicked") | |
question = st.text_input("QUESTION", "", key="question")
document = st.session_state.document
# The load callbacks render their spinners into this placeholder.
loading_placeholder = st.empty()

if document is not None:
    # Left column: document preview. Right column: answers.
    col1, col2 = st.columns(2)
    image = document.preview

question = st.session_state.question
colors = ["blue", "red", "green"]

if document is not None and question:
    col2.header(f"Answers ({model_type})")
    with col2:
        answers_placeholder = st.container()
        answers_loading_placeholder = st.container()

    with answers_loading_placeholder:
        # Run this (one-time) expensive operation outside of the processing
        # question placeholder
        with st.spinner("Constructing pipeline..."):
            construct_pipeline(model_type)

        with st.spinner("Processing question..."):
            predictions = run_pipeline(
                model=model_type, question=question, document=document, top_k=1
            )

    with answers_placeholder:
        # Draw on a copy so the document's own preview image is not modified.
        image = image.copy()
        draw = ImageDraw.Draw(image)
        for i, p in enumerate(ensure_list(predictions)):
            col2.markdown(f"#### { p['answer'] }: ({round(p['score'] * 100, 1)}%)")
            # Only predictions that carry a word span can be highlighted.
            if "start" in p and "end" in p:
                bbox = expand_bbox(
                    lift_word_boxes(document)[p["start"] : p["end"] + 1]
                )
                # expand_bbox returns None for an empty span; skip the
                # highlight then instead of crashing in normalize_bbox.
                if bbox is not None:
                    x1, y1, x2, y2 = normalize_bbox(bbox, image.width, image.height)
                    # Cycle the palette so more predictions than colors
                    # cannot raise an IndexError.
                    draw.rectangle(
                        ((x1, y1), (x2, y2)),
                        outline=colors[i % len(colors)],
                        width=3,
                    )

if document is not None:
    col1.image(image, use_column_width="auto")

# Bare string expressions: Streamlit's "magic" renders them on the page.
"DocQuery uses LayoutLMv1 fine-tuned on DocVQA, a document visual question answering dataset, as well as SQuAD, which boosts its English-language comprehension. To use it, simply upload an image or PDF, type a question, and click 'submit', or click one of the examples to load them."

"[Github Repo](https://github.com/impira/docquery)"