import spaces
import streamlit as st
from PIL import Image
import torch
from transformers import (
DonutProcessor,
VisionEncoderDecoderModel,
LayoutLMv3Processor,
LayoutLMv3ForSequenceClassification,
AutoProcessor,
AutoModelForCausalLM,
AutoModelForVisualQuestionAnswering
)
from ultralytics import YOLO
import io
import base64
import json
from datetime import datetime
import os
import logging
# Add this near the top of the file, after imports
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@st.cache_resource
def load_model(model_name):
"""Load the selected model and processor"""
try:
if model_name == "OmniParser":
try:
# Load model directly using official implementation
processor = AutoProcessor.from_pretrained(
"microsoft/OmniParser",
trust_remote_code=True
)
model = AutoModelForVisualQuestionAnswering.from_pretrained(
"microsoft/OmniParser",
trust_remote_code=True,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
if torch.cuda.is_available():
model = model.to("cuda")
st.success("Successfully loaded OmniParser model")
return {
'processor': processor,
'model': model
}
except Exception as e:
st.error(f"Failed to load OmniParser from HuggingFace Hub: {str(e)}")
logger.error(f"OmniParser loading error: {str(e)}", exc_info=True)
return None
elif model_name == "Donut":
processor = DonutProcessor.from_pretrained("naver-clova-ix/donut-base")
model = VisionEncoderDecoderModel.from_pretrained("naver-clova-ix/donut-base")
# Configure Donut specific parameters
model.config.decoder_start_token_id = processor.tokenizer.bos_token_id
model.config.pad_token_id = processor.tokenizer.pad_token_id
model.config.vocab_size = len(processor.tokenizer)
return {'model': model, 'processor': processor}
elif model_name == "LayoutLMv3":
processor = LayoutLMv3Processor.from_pretrained("microsoft/layoutlmv3-base")
model = LayoutLMv3ForSequenceClassification.from_pretrained("microsoft/layoutlmv3-base")
return {'model': model, 'processor': processor}
else:
raise ValueError(f"Unknown model name: {model_name}")
except Exception as e:
st.error(f"Error loading model {model_name}: {str(e)}")
logger.error(f"Error details: {str(e)}", exc_info=True)
return None
@spaces.GPU
@torch.inference_mode()
def analyze_document(image, model_name, models_dict):
"""Analyze document using selected model"""
try:
if models_dict is None:
return {"error": "Model failed to load", "type": "model_error"}
if model_name == "OmniParser":
# Process image with OmniParser
inputs = models_dict['processor'](
images=image,
return_tensors="pt",
)
if torch.cuda.is_available():
inputs = {k: v.to("cuda") if hasattr(v, "to") else v
for k, v in inputs.items()}
# Generate outputs
outputs = models_dict['model'](**inputs)
# Process results
# The exact processing will depend on the model's output format
results = {
"predictions": outputs.logits.softmax(-1).tolist(),
"detected_elements": len(outputs.logits[0]),
"model_output": {
k: v.tolist() if hasattr(v, "tolist") else str(v)
for k, v in outputs.items()
if k != "last_hidden_state" # Skip large tensors
}
}
return results
elif model_name == "Donut":
model = models_dict['model']
processor = models_dict['processor']
# Process image with Donut
pixel_values = processor(image, return_tensors="pt").pixel_values
task_prompt = "analyze the document and extract information"
decoder_input_ids = processor.tokenizer(
task_prompt,
add_special_tokens=False,
return_tensors="pt"
).input_ids
outputs = model.generate(
pixel_values,
decoder_input_ids=decoder_input_ids,
max_length=512,
early_stopping=True,
pad_token_id=processor.tokenizer.pad_token_id,
eos_token_id=processor.tokenizer.eos_token_id,
use_cache=True,
num_beams=4,
bad_words_ids=[[processor.tokenizer.unk_token_id]],
return_dict_in_generate=True
)
sequence = processor.batch_decode(outputs.sequences)[0]
sequence = sequence.replace(task_prompt, "").replace("", "").strip()
try:
result = json.loads(sequence)
except json.JSONDecodeError:
result = {"raw_text": sequence}
return result
elif model_name == "LayoutLMv3":
model = models_dict['model']
processor = models_dict['processor']
# Process image with LayoutLMv3
encoded_inputs = processor(
image,
return_tensors="pt",
add_special_tokens=True,
return_offsets_mapping=True
)
outputs = model(**encoded_inputs)
predictions = outputs.logits.argmax(-1).squeeze().tolist()
# Convert predictions to labels
words = processor.tokenizer.convert_ids_to_tokens(
encoded_inputs.input_ids.squeeze().tolist()
)
result = {
"predictions": [
{
"text": word,
"label": pred
}
for word, pred in zip(words, predictions)
if word not in ["", "", ""]
],
"confidence_scores": outputs.logits.softmax(-1).max(-1).values.squeeze().tolist()
}
return result
else:
return {"error": f"Unknown model: {model_name}", "type": "model_error"}
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Analysis error: {str(e)}\n{error_details}")
return {
"error": str(e),
"type": "processing_error",
"details": error_details
}
# Set page config with improved layout
st.set_page_config(
page_title="Document Analysis Comparison",
layout="wide",
initial_sidebar_state="expanded"
)
# Add custom CSS for better styling
st.markdown("""
""", unsafe_allow_html=True)
# Title and description
st.title("Document Understanding Model Comparison")
st.markdown("""
Compare different models for document analysis and understanding.
Upload an image and select a model to analyze it.
""")
# Create two columns for layout
col1, col2 = st.columns([1, 1])
with col1:
# File uploader with improved error handling
uploaded_file = st.file_uploader(
"Choose a document image",
type=['png', 'jpg', 'jpeg', 'pdf'],
help="Supported formats: PNG, JPEG, PDF"
)
if uploaded_file is not None:
try:
# Display uploaded image
image = Image.open(uploaded_file)
st.image(image, caption='Uploaded Document', use_column_width=True)
except Exception as e:
st.error(f"Error loading image: {str(e)}")
with col2:
# Model selection with detailed information
model_info = {
"Donut": {
"description": "Best for structured OCR and document format understanding",
"memory": "6-8GB",
"strengths": ["Structured OCR", "Memory efficient", "Good with fixed formats"],
"best_for": ["Invoices", "Forms", "Structured documents", "Tables"]
},
"LayoutLMv3": {
"description": "Strong layout understanding with reasoning capabilities",
"memory": "12-15GB",
"strengths": ["Layout understanding", "Reasoning", "Pre-trained knowledge"],
"best_for": ["Complex documents", "Mixed layouts", "Documents with tables", "Multi-column text"]
},
"OmniParser": {
"description": "General screen parsing tool for UI understanding",
"memory": "8-10GB",
"strengths": ["UI element detection", "Interactive element recognition", "Function description"],
"best_for": ["Screenshots", "UI analysis", "Interactive elements", "Web interfaces"]
}
}
selected_model = st.selectbox(
"Select Model",
list(model_info.keys())
)
# Display enhanced model information
st.markdown("### Model Details")
with st.expander("Model Information", expanded=True):
st.markdown(f"**Description:** {model_info[selected_model]['description']}")
st.markdown(f"**Memory Required:** {model_info[selected_model]['memory']}")
st.markdown("**Strengths:**")
for strength in model_info[selected_model]['strengths']:
st.markdown(f"- {strength}")
st.markdown("**Best For:**")
for use_case in model_info[selected_model]['best_for']:
st.markdown(f"- {use_case}")
# Inside the analysis section, replace the existing if-block with:
if uploaded_file is not None and selected_model:
if st.button("Analyze Document", help="Click to start document analysis"):
# Create two columns for results and debug info
result_col, debug_col = st.columns([1, 1])
with st.spinner('Processing...'):
try:
# Create a progress bar in results column
with result_col:
st.markdown("### Analysis Progress")
progress_bar = st.progress(0)
# Initialize debug column
with debug_col:
st.markdown("### Debug Information")
debug_container = st.empty()
def update_debug(message, level="info"):
"""Update debug information with timestamp"""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
color = {
"info": "blue",
"warning": "orange",
"error": "red",
"success": "green"
}.get(level, "black")
return f"[{timestamp}] {message}
"
debug_messages = []
def add_debug(message, level="info"):
debug_messages.append(update_debug(message, level))
debug_container.markdown(
"\n".join(debug_messages),
unsafe_allow_html=True
)
# Load model with progress update
with result_col:
progress_bar.progress(25)
st.info("Loading model...")
add_debug(f"Loading {selected_model} model and processor...")
models_dict = load_model(selected_model)
if models_dict is None:
with result_col:
st.error("Failed to load model. Please try again.")
add_debug("Model loading failed!", "error")
else:
add_debug("Model loaded successfully", "success")
# For device info, we need to check which model we're using
if selected_model == "OmniParser":
model_device = next(models_dict['model'].parameters()).device
else:
model_device = next(models_dict['model'].parameters()).device
add_debug(f"Model device: {model_device}")
# Update progress
with result_col:
progress_bar.progress(50)
st.info("Analyzing document...")
# Log image details
add_debug(f"Image size: {image.size}")
add_debug(f"Image mode: {image.mode}")
# Analyze document
add_debug("Starting document analysis...")
results = analyze_document(image, selected_model, models_dict)
add_debug("Analysis completed", "success")
# Update progress
with result_col:
progress_bar.progress(75)
st.markdown("### Analysis Results")
if isinstance(results, dict) and "error" in results:
st.error(f"Analysis Error: {results['error']}")
add_debug(f"Analysis error: {results['error']}", "error")
else:
# Pretty print the results in results column
st.json(results)
# Show detailed results breakdown in debug column
add_debug("Results breakdown:", "info")
if isinstance(results, dict):
for key, value in results.items():
add_debug(f"- {key}: {type(value)}")
else:
add_debug(f"Result type: {type(results)}")
# Complete progress
progress_bar.progress(100)
st.success("Analysis completed!")
# Final debug info
add_debug("Process completed successfully", "success")
with debug_col:
if torch.cuda.is_available():
st.markdown("### Resource Usage")
st.markdown(f"""
- GPU Memory: {torch.cuda.max_memory_allocated()/1024**2:.2f}MB
- GPU Utilization: {torch.cuda.utilization()}%
""")
except Exception as e:
with result_col:
st.error(f"Error during analysis: {str(e)}")
add_debug(f"Error: {str(e)}", "error")
add_debug(f"Error type: {type(e)}", "error")
if hasattr(e, '__traceback__'):
add_debug("Traceback available in logs", "warning")
# Add improved information about usage and limitations
def verify_weights_directory():
"""Verify the weights directory structure and files"""
weights_path = "weights"
required_files = {
os.path.join(weights_path, "icon_detect", "model.safetensors"): "YOLO model weights",
os.path.join(weights_path, "icon_detect", "model.yaml"): "YOLO model config",
os.path.join(weights_path, "icon_caption_florence", "model.safetensors"): "Florence model weights",
os.path.join(weights_path, "icon_caption_florence", "config.json"): "Florence model config",
os.path.join(weights_path, "icon_caption_florence", "generation_config.json"): "Florence generation config"
}
missing_files = []
for file_path, description in required_files.items():
if not os.path.exists(file_path):
missing_files.append(f"{description} at {file_path}")
if missing_files:
st.warning("Missing required model files:")
for missing in missing_files:
st.write(f"- {missing}")
return False
return True
# Add this in your app's initialization
if st.checkbox("Check Model Files"):
if verify_weights_directory():
st.success("All required model files are present")
else:
st.error("Some model files are missing. Please ensure all required files are in the weights directory")
st.markdown("""
---
### Usage Notes:
- Different models excel at different types of documents
- Processing time and memory requirements vary by model
- Image quality significantly affects results
- Some models may require specific document formats
""")
# Add performance metrics section
if st.checkbox("Show Performance Metrics"):
st.markdown("""
### Model Performance Metrics
| Model | Avg. Processing Time | Memory Usage | Accuracy* |
|-------|---------------------|--------------|-----------|
| Donut | 2-3 seconds | 6-8GB | 85-90% |
| LayoutLMv3 | 3-4 seconds | 12-15GB | 88-93% |
| OmniParser | 2-3 seconds | 8-10GB | 85-90% |
*Accuracy varies based on document type and quality
""")
# Add a footer with version and contact information
st.markdown("---")
st.markdown("""
v1.1 - Created with Streamlit
\nPowered by Hugging Face Spaces 🤗
""")
# Add model selection guidance
if st.checkbox("Show Model Selection Guide"):
st.markdown("""
### How to Choose the Right Model
1. **Donut**: Choose for structured documents with clear layouts
2. **LayoutLMv3**: Best for documents with complex layouts and relationships
3. **OmniParser**: Best for UI elements and screen parsing
""")