import gradio as gr
from typing import Dict
import logging
import tempfile
import io
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from pdf2image import convert_from_bytes
from PIL import Image
import pytesseract
import docx2txt
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import docx
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class AdvancedDocProcessor:
    def __init__(self):
        # Initialize BART model for text cleaning and summarization
        self.bart_tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large-cnn")
        self.bart_model = AutoModelForSeq2SeqLM.from_pretrained("facebook/bart-large-cnn")
        # Initialize T5 model for text generation tasks
        self.t5_tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base")
        self.t5_model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-base")
        # Initialize pipeline for named entity recognition
        self.ner_pipeline = pipeline("ner", model="dbmdz/bert-large-cased-finetuned-conll03-english")
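    # Note: all three models are loaded eagerly at startup and run on CPU by default
    # (no device is specified); the first run downloads the weights from the Hugging Face Hub.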
    def extract_text(self, file_content: bytes, file_type: str) -> str:
        """Extract text from various file types."""
        try:
            if file_type == "application/pdf":
                return self.extract_text_from_pdf(file_content)
            elif file_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                return self.extract_text_from_docx(file_content)
            elif file_type == "text/plain":
                return file_content.decode('utf-8')
            else:
                raise ValueError(f"Unsupported file type: {file_type}")
        except Exception as e:
            logger.error(f"Error extracting text: {str(e)}")
            return ""
    def extract_text_from_pdf(self, pdf_content: bytes) -> str:
        """Extract text from PDF using OCR."""
        try:
            images = convert_from_bytes(pdf_content, timeout=60)  # Cap PDF rasterization at 60 seconds
            text = ""
            for image in images:
                text += pytesseract.image_to_string(image)
            return text
        except Exception as e:
            logger.error(f"Error extracting text from PDF: {str(e)}")
            return ""
    def extract_text_from_docx(self, docx_content: bytes) -> str:
        """Extract text from a DOCX file."""
        try:
            return docx2txt.process(io.BytesIO(docx_content))
        except Exception as e:
            logger.error(f"Error extracting text from DOCX: {str(e)}")
            return ""
    def clean_and_summarize_text(self, text: str) -> str:
        """Clean and summarize the text using BART."""
        try:
            chunk_size = 1024
            chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
            summarized_chunks = []
            for chunk in chunks:
                inputs = self.bart_tokenizer([chunk], max_length=1024, return_tensors="pt", truncation=True)
                summary_ids = self.bart_model.generate(inputs["input_ids"], num_beams=4, max_length=150, early_stopping=True)
                summarized_chunks.append(self.bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True))
            return " ".join(summarized_chunks)
        except Exception as e:
            logger.error(f"Error cleaning and summarizing text: {str(e)}")
            return text
    def process_with_t5(self, text: str, prompt: str) -> str:
        """Process the text with T5 based on the given prompt."""
        try:
            chunk_size = 512
            chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
            processed_chunks = []
            for chunk in chunks:
                input_text = f"{prompt} {chunk}"
                inputs = self.t5_tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True)
                outputs = self.t5_model.generate(
                    **inputs,
                    max_length=150,
                    num_return_sequences=1,
                    do_sample=True,
                    temperature=0.7
                )
                processed_chunks.append(self.t5_tokenizer.decode(outputs[0], skip_special_tokens=True))
            return " ".join(processed_chunks)
        except Exception as e:
            logger.error(f"Error processing with T5: {str(e)}")
            return f"Error processing text: {str(e)}"
    def extract_entities(self, text: str) -> str:
        """Extract named entities from the text."""
        try:
            # Keep chunks small enough to stay under the NER model's 512-token input limit
            chunk_size = 1000
            chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
            all_entities = []
            for chunk in chunks:
                entities = self.ner_pipeline(chunk)
                all_entities.extend(entities)
            unique_entities = set((ent['word'], ent['entity']) for ent in all_entities)
            return "\n".join([f"{word} ({entity})" for word, entity in unique_entities])
        except Exception as e:
            logger.error(f"Error extracting entities: {str(e)}")
            return "Error extracting entities"
    def process_document(self, file_content: bytes, file_type: str, prompt: str) -> Dict[str, str]:
        """Run the full pipeline: extract raw text, summarize it with BART,
        apply the user prompt with T5, and extract named entities from the raw text."""
        raw_text = self.extract_text(file_content, file_type)
        cleaned_text = self.clean_and_summarize_text(raw_text)
        processed_text = self.process_with_t5(cleaned_text, prompt)
        entities = self.extract_entities(raw_text)
        return {
            "cleaned": cleaned_text,
            "processed": processed_text,
            "entities": entities
        }
def create_gradio_interface():
    processor = AdvancedDocProcessor()
    def process_and_display(file, prompt, output_format):
        def processing_task():
            file_content = file
            file_type = infer_file_type(file_content)
            results = processor.process_document(file_content, file_type, prompt)
            if output_format == "txt":
                output_path = save_as_txt(results)
            elif output_format == "docx":
                output_path = save_as_docx(results)
            else:  # pdf
                output_path = save_as_pdf(results)
            return (f"Cleaned and Summarized Text:\n{results['cleaned']}\n\n"
                    f"Processed Text:\n{results['processed']}\n\n"
                    f"Extracted Entities:\n{results['entities']}"), output_path
        with ThreadPoolExecutor() as executor:
            future = executor.submit(processing_task)
            try:
                return future.result(timeout=300)  # 5 minutes timeout
            except TimeoutError:
                return "Processing timed out after 5 minutes.", None
    iface = gr.Interface(
        fn=process_and_display,
        inputs=[
            gr.File(label="Upload Document (PDF, DOCX, or TXT)", type="binary"),
            gr.Textbox(label="Enter your prompt for processing", lines=3),
            gr.Radio(["txt", "docx", "pdf"], label="Output Format", value="txt")
        ],
        outputs=[
            gr.Textbox(label="Processing Results", lines=30),
            gr.File(label="Download Processed Document")
        ],
        title="Advanced Document Processing Tool",
        description="Upload a document (PDF, DOCX, or TXT) and enter a prompt to process and analyze the text using state-of-the-art NLP models.",
    )
    return iface
def infer_file_type(file_content: bytes) -> str:
    """Infer the file type from the byte content."""
    if file_content.startswith(b'%PDF'):
        return "application/pdf"
    elif file_content.startswith(b'PK\x03\x04'):
        # DOCX files are ZIP archives, so they begin with the ZIP magic bytes
        return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    else:
        return "text/plain"
def save_as_txt(results: Dict[str, str]) -> str:
    with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') as temp_file:
        for key, value in results.items():
            temp_file.write(f"{key.upper()}:\n{value}\n\n")
        return temp_file.name
def save_as_docx(results: Dict[str, str]) -> str:
    doc = docx.Document()
    for key, value in results.items():
        doc.add_heading(key.capitalize(), level=1)
        doc.add_paragraph(value)
    with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp:
        doc.save(tmp.name)
        return tmp.name
def save_as_pdf(results: Dict[str, str]) -> str:
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
        doc = SimpleDocTemplate(tmp.name, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []
        for key, value in results.items():
            story.append(Paragraph(key.capitalize(), styles['Heading1']))
            story.append(Paragraph(value, styles['BodyText']))
            story.append(Spacer(1, 12))
        doc.build(story)
        return tmp.name
# Launch the Gradio app
if __name__ == "__main__":
    iface = create_gradio_interface()
    iface.launch()
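# Illustrative sketch of using the processor programmatically, without the Gradio UI
# ("sample.pdf" and the prompt below are placeholder values, not part of the app):
#
#     processor = AdvancedDocProcessor()
#     with open("sample.pdf", "rb") as f:
#         content = f.read()
#     results = processor.process_document(content, infer_file_type(content), "List the key points:")
#     print(results["processed"])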