# Spaces: Running
# (Hugging Face Spaces page header captured together with the source; it is
# not part of the program.)
import spaces | |
import streamlit as st | |
from PIL import Image | |
import torch | |
from transformers import ( | |
DonutProcessor, | |
VisionEncoderDecoderModel, | |
LayoutLMv3Processor, | |
LayoutLMv3ForSequenceClassification, | |
AutoProcessor, | |
AutoModelForCausalLM, | |
AutoModelForVisualQuestionAnswering | |
) | |
from ultralytics import YOLO | |
import io | |
import base64 | |
import json | |
from datetime import datetime | |
import os | |
import logging | |
# Configure logging once, right after the imports, so that every record
# carries a timestamp and its severity level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by the functions below.
logger = logging.getLogger(__name__)
def load_model(model_name):
    """Load the model and processor that belong to ``model_name``.

    Recognised names are "OmniParser", "Donut" and "LayoutLMv3". On success
    a dict with the keys 'model' and 'processor' is returned. Every failure,
    including an unknown name, is reported through ``st.error`` and the
    module logger, and ``None`` is returned instead of raising.
    """
    try:
        if model_name == "OmniParser":
            # OmniParser has its own handler so the user gets a Hub-specific
            # message rather than the generic one at the bottom.
            # NOTE(review): confirm that this checkpoint can really be loaded
            # through the Auto* classes used here.
            try:
                hub_id = "microsoft/OmniParser"
                omni_processor = AutoProcessor.from_pretrained(
                    hub_id,
                    trust_remote_code=True
                )
                use_cuda = torch.cuda.is_available()
                # Half precision on GPU, full precision on CPU.
                weights_dtype = torch.float16 if use_cuda else torch.float32
                omni_model = AutoModelForVisualQuestionAnswering.from_pretrained(
                    hub_id,
                    trust_remote_code=True,
                    torch_dtype=weights_dtype
                )
                if use_cuda:
                    omni_model = omni_model.to("cuda")
                st.success("Successfully loaded OmniParser model")
                return {'processor': omni_processor, 'model': omni_model}
            except Exception as omni_error:
                st.error(f"Failed to load OmniParser from HuggingFace Hub: {omni_error!s}")
                logger.error(f"OmniParser loading error: {omni_error!s}", exc_info=True)
                return None

        if model_name == "Donut":
            donut_checkpoint = "naver-clova-ix/donut-base"
            donut_processor = DonutProcessor.from_pretrained(donut_checkpoint)
            donut_model = VisionEncoderDecoderModel.from_pretrained(donut_checkpoint)
            # Keep the model config in sync with the processor's tokenizer.
            donut_tokenizer = donut_processor.tokenizer
            donut_model.config.decoder_start_token_id = donut_tokenizer.bos_token_id
            donut_model.config.pad_token_id = donut_tokenizer.pad_token_id
            donut_model.config.vocab_size = len(donut_tokenizer)
            return {'model': donut_model, 'processor': donut_processor}

        if model_name == "LayoutLMv3":
            layout_checkpoint = "microsoft/layoutlmv3-base"
            layout_processor = LayoutLMv3Processor.from_pretrained(layout_checkpoint)
            layout_model = LayoutLMv3ForSequenceClassification.from_pretrained(layout_checkpoint)
            return {'model': layout_model, 'processor': layout_processor}

        # Nothing matched: raise so the handler below reports it like any
        # other loading failure.
        raise ValueError(f"Unknown model name: {model_name}")
    except Exception as load_error:
        st.error(f"Error loading model {model_name}: {load_error!s}")
        logger.error(f"Error details: {load_error!s}", exc_info=True)
        return None
def _analyze_omniparser(image, models_dict):
    """Run the OmniParser model on ``image`` and summarise its raw outputs."""
    inputs = models_dict['processor'](
        images=image,
        return_tensors="pt",
    )
    if torch.cuda.is_available():
        # Move tensors to the GPU; non-tensor entries are passed through.
        inputs = {
            key: value.to("cuda") if hasattr(value, "to") else value
            for key, value in inputs.items()
        }
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        outputs = models_dict['model'](**inputs)
    logits = outputs.logits
    return {
        "predictions": logits.softmax(-1).tolist(),
        "detected_elements": len(logits[0]),
        "model_output": {
            key: value.tolist() if hasattr(value, "tolist") else str(value)
            for key, value in outputs.items()
            if key != "last_hidden_state"  # Skip large tensors
        },
    }


def _analyze_donut(image, models_dict):
    """Generate text for ``image`` with Donut and parse it as JSON if possible."""
    model = models_dict['model']
    processor = models_dict['processor']
    pixel_values = processor(image, return_tensors="pt").pixel_values
    # NOTE(review): confirm that this prompt matches the task tokens the
    # loaded checkpoint was trained with.
    task_prompt = "<s_cord>analyze the document and extract information</s_cord>"
    decoder_input_ids = processor.tokenizer(
        task_prompt,
        add_special_tokens=False,
        return_tensors="pt"
    ).input_ids
    outputs = model.generate(
        pixel_values,
        decoder_input_ids=decoder_input_ids,
        max_length=512,
        early_stopping=True,
        pad_token_id=processor.tokenizer.pad_token_id,
        eos_token_id=processor.tokenizer.eos_token_id,
        use_cache=True,
        num_beams=4,
        bad_words_ids=[[processor.tokenizer.unk_token_id]],
        return_dict_in_generate=True
    )
    sequence = processor.batch_decode(outputs.sequences)[0]
    sequence = sequence.replace(task_prompt, "").replace("</s_cord>", "").strip()
    try:
        return json.loads(sequence)
    except json.JSONDecodeError:
        # The generated text is not JSON; hand it back verbatim.
        return {"raw_text": sequence}


def _analyze_layoutlmv3(image, models_dict):
    """Classify ``image`` with LayoutLMv3 and report labels with confidences.

    Handles both logits layouts: 3-D logits are treated as one prediction per
    token, anything else as a single prediction for the whole document.
    """
    model = models_dict['model']
    processor = models_dict['processor']
    # Offsets are never used below, so they are not requested: an extra
    # "offset_mapping" entry would be forwarded to the model by the ** call.
    encoded_inputs = processor(
        image,
        return_tensors="pt",
        add_special_tokens=True
    )
    with torch.no_grad():
        outputs = model(**encoded_inputs)
    logits = outputs.logits
    probabilities = logits.softmax(-1)
    tokens = processor.tokenizer.convert_ids_to_tokens(
        encoded_inputs["input_ids"][0].tolist()
    )
    special_tokens = ("<s>", "</s>", "<pad>")

    if logits.dim() == 3:
        # Token-level head: pair every non-special token with its label.
        token_labels = logits.argmax(-1)[0].tolist()
        return {
            "predictions": [
                {"text": token, "label": label}
                for token, label in zip(tokens, token_labels)
                if token not in special_tokens
            ],
            "confidence_scores": probabilities.max(-1).values[0].tolist(),
        }

    # Sequence-level head: a single label describes the whole document.
    document_text = " ".join(
        token for token in tokens if token not in special_tokens
    )
    return {
        "predictions": [
            {"text": document_text, "label": int(logits.argmax(-1)[0])}
        ],
        "confidence_scores": [float(probabilities.max(-1).values[0])],
    }


def analyze_document(image, model_name, models_dict):
    """Analyze ``image`` with the model called ``model_name``.

    Args:
        image: The document image. Objects exposing a ``mode`` other than
            "RGB" are converted with ``convert("RGB")`` before inference.
        model_name: "OmniParser", "Donut" or "LayoutLMv3".
        models_dict: Dict with the keys 'model' and 'processor', or ``None``
            when loading failed.

    Returns:
        The model-specific result, or a dict with the keys "error" and "type"
        (plus "details" for processing errors). Exceptions are never
        propagated; they are logged and returned as an error dict.
    """
    try:
        if models_dict is None:
            return {"error": "Model failed to load", "type": "model_error"}

        if model_name == "OmniParser":
            analyzer = _analyze_omniparser
        elif model_name == "Donut":
            analyzer = _analyze_donut
        elif model_name == "LayoutLMv3":
            analyzer = _analyze_layoutlmv3
        else:
            return {"error": f"Unknown model: {model_name}", "type": "model_error"}

        # Uploads may be RGBA, palette or grayscale images.
        if getattr(image, "mode", "RGB") != "RGB":
            image = image.convert("RGB")
        return analyzer(image, models_dict)
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Analysis error: {str(e)}\n{error_details}")
        return {
            "error": str(e),
            "type": "processing_error",
            "details": error_details
        }
# Page-level settings: tab title, wide layout and an expanded sidebar.
st.set_page_config(
    initial_sidebar_state="expanded",
    layout="wide",
    page_title="Document Analysis Comparison",
)

# Extra CSS for alerts, the upload hint and the model-information box.
custom_css = """
<style>
.stAlert {
margin-top: 1rem;
}
.upload-text {
font-size: 1.2rem;
margin-bottom: 1rem;
}
.model-info {
padding: 1rem;
border-radius: 0.5rem;
background-color: #f8f9fa;
}
</style>
"""
st.markdown(custom_css, unsafe_allow_html=True)

# Page heading followed by a short description of the workflow.
st.title("Document Understanding Model Comparison")
intro_text = """
Compare different models for document analysis and understanding.
Upload an image and select a model to analyze it.
"""
st.markdown(intro_text)
# Two equally wide columns: the upload on the left, model selection on the right.
col1, col2 = st.columns([1, 1])
with col1:
    # Only formats that PIL can open are offered; PDF was removed because
    # Image.open() cannot read PDF files.
    uploaded_file = st.file_uploader(
        "Choose a document image",
        type=['png', 'jpg', 'jpeg'],
        help="Supported formats: PNG, JPEG"
    )
    # Always bind `image` so later code never meets an undefined name.
    image = None
    if uploaded_file is not None:
        try:
            # Decode and preview the uploaded document.
            image = Image.open(uploaded_file)
            st.image(image, caption='Uploaded Document', use_column_width=True)
        except Exception as e:
            st.error(f"Error loading image: {str(e)}")
            # Treat an unreadable upload as "nothing uploaded" so the
            # analysis step is not offered for it.
            image = None
            uploaded_file = None
with col2:
    # Catalogue of the selectable models; the key order is the order in which
    # they appear in the select box.
    model_info = {
        "Donut": dict(
            description="Best for structured OCR and document format understanding",
            memory="6-8GB",
            strengths=["Structured OCR", "Memory efficient", "Good with fixed formats"],
            best_for=["Invoices", "Forms", "Structured documents", "Tables"],
        ),
        "LayoutLMv3": dict(
            description="Strong layout understanding with reasoning capabilities",
            memory="12-15GB",
            strengths=["Layout understanding", "Reasoning", "Pre-trained knowledge"],
            best_for=["Complex documents", "Mixed layouts", "Documents with tables", "Multi-column text"],
        ),
        "OmniParser": dict(
            description="General screen parsing tool for UI understanding",
            memory="8-10GB",
            strengths=["UI element detection", "Interactive element recognition", "Function description"],
            best_for=["Screenshots", "UI analysis", "Interactive elements", "Web interfaces"],
        ),
    }
    selected_model = st.selectbox("Select Model", list(model_info))

    # Show the catalogue entry of the currently selected model.
    st.markdown("### Model Details")
    with st.expander("Model Information", expanded=True):
        details = model_info[selected_model]
        st.markdown(f"**Description:** {details['description']}")
        st.markdown(f"**Memory Required:** {details['memory']}")
        # Both bullet lists are rendered the same way: heading, then one
        # markdown bullet per entry.
        for heading, field in (("**Strengths:**", "strengths"), ("**Best For:**", "best_for")):
            st.markdown(heading)
            for entry in details[field]:
                st.markdown(f"- {entry}")
# Run the selected model on the uploaded document and show progress, results
# and debug output side by side.
if uploaded_file is not None and selected_model:
    if st.button("Analyze Document", help="Click to start document analysis"):
        # Left column: progress and results. Right column: debug log.
        result_col, debug_col = st.columns([1, 1])
        with result_col:
            st.markdown("### Analysis Progress")
            progress_bar = st.progress(0)
        with debug_col:
            st.markdown("### Debug Information")
            debug_container = st.empty()

        # The placeholder and both helpers exist before the try block so the
        # except handler below can always use them.
        debug_messages = []

        def update_debug(message, level="info"):
            """Format one debug message as a colored, timestamped HTML line."""
            import html

            timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
            color = {
                "info": "blue",
                "warning": "orange",
                "error": "red",
                "success": "green"
            }.get(level, "black")
            # Escape the text: it is rendered as raw HTML, and values such as
            # "<class 'dict'>" would otherwise be parsed as tags and vanish.
            safe_message = html.escape(str(message))
            return f"<div style='color: {color};'>[{timestamp}] {safe_message}</div>"

        def add_debug(message, level="info"):
            """Append a message to the debug log and re-render the log."""
            debug_messages.append(update_debug(message, level))
            debug_container.markdown(
                "\n".join(debug_messages),
                unsafe_allow_html=True
            )

        with st.spinner('Processing...'):
            try:
                # Load model with progress update
                with result_col:
                    progress_bar.progress(25)
                    st.info("Loading model...")
                add_debug(f"Loading {selected_model} model and processor...")
                models_dict = load_model(selected_model)

                if models_dict is None:
                    with result_col:
                        st.error("Failed to load model. Please try again.")
                    add_debug("Model loading failed!", "error")
                else:
                    add_debug("Model loaded successfully", "success")
                    # Every entry returned by load_model stores its model
                    # under 'model', so one lookup covers all model types.
                    model_device = next(models_dict['model'].parameters()).device
                    add_debug(f"Model device: {model_device}")

                    with result_col:
                        progress_bar.progress(50)
                        st.info("Analyzing document...")
                    # Log image details
                    add_debug(f"Image size: {image.size}")
                    add_debug(f"Image mode: {image.mode}")

                    add_debug("Starting document analysis...")
                    results = analyze_document(image, selected_model, models_dict)
                    # analyze_document reports failures as a dict that has an
                    # "error" key instead of raising.
                    analysis_failed = isinstance(results, dict) and "error" in results

                    with result_col:
                        progress_bar.progress(75)
                        st.markdown("### Analysis Results")
                        if analysis_failed:
                            st.error(f"Analysis Error: {results['error']}")
                        else:
                            st.json(results)
                            progress_bar.progress(100)
                            st.success("Analysis completed!")

                    if analysis_failed:
                        add_debug(f"Analysis error: {results['error']}", "error")
                    else:
                        # Success is only reported when there is no error.
                        add_debug("Analysis completed", "success")
                        add_debug("Results breakdown:", "info")
                        if isinstance(results, dict):
                            for key, value in results.items():
                                add_debug(f"- {key}: {type(value)}")
                        else:
                            add_debug(f"Result type: {type(results)}")
                        add_debug("Process completed successfully", "success")

                    if torch.cuda.is_available():
                        peak_memory_mb = torch.cuda.max_memory_allocated() / 1024**2
                        try:
                            gpu_utilization = f"{torch.cuda.utilization()}%"
                        except (ImportError, RuntimeError):
                            # The utilization query can fail without the
                            # NVML bindings; that must not fail the run.
                            gpu_utilization = "unavailable"
                        with debug_col:
                            st.markdown("### Resource Usage")
                            st.markdown(
                                f"- GPU Memory: {peak_memory_mb:.2f}MB\n"
                                f"- GPU Utilization: {gpu_utilization}"
                            )
            except Exception as e:
                # Log the traceback so the hint shown to the user is true.
                logger.exception("Error during analysis")
                with result_col:
                    st.error(f"Error during analysis: {str(e)}")
                add_debug(f"Error: {str(e)}", "error")
                add_debug(f"Error type: {type(e)}", "error")
                add_debug("Traceback available in logs", "warning")
# Check that the locally stored model weight files are present.
def verify_weights_directory(weights_path="weights"):
    """Verify that the weights directory contains every required file.

    Args:
        weights_path: Directory that holds the "icon_detect" and
            "icon_caption_florence" sub-directories. Defaults to "weights",
            resolved against the current working directory.

    Returns:
        True when every required file exists as a regular file. Otherwise
        the missing files are listed in the Streamlit UI and False is
        returned.
    """
    required_files = {
        os.path.join(weights_path, "icon_detect", "model.safetensors"): "YOLO model weights",
        os.path.join(weights_path, "icon_detect", "model.yaml"): "YOLO model config",
        os.path.join(weights_path, "icon_caption_florence", "model.safetensors"): "Florence model weights",
        os.path.join(weights_path, "icon_caption_florence", "config.json"): "Florence model config",
        os.path.join(weights_path, "icon_caption_florence", "generation_config.json"): "Florence generation config"
    }
    # isfile rather than exists: a directory with the right name is not a
    # usable weights or config file.
    missing_files = [
        f"{description} at {file_path}"
        for file_path, description in required_files.items()
        if not os.path.isfile(file_path)
    ]
    if missing_files:
        st.warning("Missing required model files:")
        for missing in missing_files:
            st.write(f"- {missing}")
        return False
    return True
# Optional self-check the user can trigger to confirm the weight files exist.
if st.checkbox("Check Model Files"):
    weights_ok = verify_weights_directory()
    if not weights_ok:
        st.error("Some model files are missing. Please ensure all required files are in the weights directory")
    else:
        st.success("All required model files are present")
# General usage notes, always shown below the analysis area.
usage_notes = """
---
### Usage Notes:
- Different models excel at different types of documents
- Processing time and memory requirements vary by model
- Image quality significantly affects results
- Some models may require specific document formats
"""
st.markdown(usage_notes)

# Optional table of indicative performance figures per model.
show_metrics = st.checkbox("Show Performance Metrics")
if show_metrics:
    metrics_table = """
### Model Performance Metrics
| Model | Avg. Processing Time | Memory Usage | Accuracy* |
|-------|---------------------|--------------|-----------|
| Donut | 2-3 seconds | 6-8GB | 85-90% |
| LayoutLMv3 | 3-4 seconds | 12-15GB | 88-93% |
| OmniParser | 2-3 seconds | 8-10GB | 85-90% |
*Accuracy varies based on document type and quality
"""
    st.markdown(metrics_table)

# Footer: a divider followed by the version and platform credits.
st.markdown("---")
footer_text = """
v1.1 - Created with Streamlit
\nPowered by Hugging Face Spaces 🤗
"""
st.markdown(footer_text)

# Optional short guide for choosing between the three models.
show_guide = st.checkbox("Show Model Selection Guide")
if show_guide:
    selection_guide = """
### How to Choose the Right Model
1. **Donut**: Choose for structured documents with clear layouts
2. **LayoutLMv3**: Best for documents with complex layouts and relationships
3. **OmniParser**: Best for UI elements and screen parsing
"""
    st.markdown(selection_guide)