|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import re
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
from datetime import datetime
|
|
import pymupdf
|
|
import logging
|
|
|
|
|
|
from App_Function_Libraries.DB.DB_Manager import add_media_with_keywords
|
|
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
|
|
|
|
|
|
# Upper bound for PDF size, presumably in megabytes (per the name); not referenced by the
# functions visible in this part of the file — TODO confirm where it is enforced.
MAX_FILE_SIZE_MB = 50
|
|
# Time budget for a conversion, presumably in seconds (per the name); also not referenced by the
# functions visible in this part of the file — TODO confirm where it is enforced.
CONVERSION_TIMEOUT_SECONDS = 300
|
|
|
|
|
|
|
|
|
|
|
|
def extract_text_and_format_from_pdf(pdf_path):
    """
    Extract text from a PDF file and convert it to Markdown, preserving formatting.

    Each page is emitted under a ``## Page N`` heading and terminated by a
    ``---`` rule. Every text block becomes one whitespace-normalised paragraph;
    image blocks are replaced by an ``[Image]`` placeholder. Spans are decorated
    by ``_span_to_markdown`` (heading markers by font size, bold/italic by font
    flags). A line that ends with ``-`` is joined to the next line without the
    hyphen.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        The Markdown text for the whole document.

    Raises:
        Exception: Any error raised while opening or reading the PDF is logged,
            counted in the ``pdf_text_extraction_error`` metric and re-raised.
    """
    try:
        log_counter("pdf_text_extraction_attempt", labels={"file_path": pdf_path})
        start_time = datetime.now()

        # Collect fragments and join once instead of growing a string with "+=".
        parts = []
        with pymupdf.open(pdf_path) as doc:
            for page_num, page in enumerate(doc, 1):
                parts.append(f"## Page {page_num}\n\n")
                for block in page.get_text("dict")["blocks"]:
                    if block["type"] == 0:  # text block
                        paragraph = ""
                        for line in block["lines"]:
                            line_text = "".join(
                                _span_to_markdown(span) + " " for span in line["spans"]
                            ).rstrip()
                            if line_text.endswith('-'):
                                # Hyphenated line break: drop the hyphen and glue to the next line.
                                line_text = line_text[:-1]
                            else:
                                line_text += " "
                            paragraph += line_text

                        if paragraph:
                            paragraph = re.sub(r'\s+', ' ', paragraph).strip()
                            parts.append(paragraph + "\n\n")
                    elif block["type"] == 1:  # image block
                        parts.append("[Image]\n\n")
                parts.append("\n---\n\n")

        markdown_text = "".join(parts)

        # Re-join any word that is still split by a hyphen followed by a newline.
        markdown_text = re.sub(r'(\w+)-\s*\n(\w+)', r'\1\2', markdown_text)

        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()
        log_histogram("pdf_text_extraction_duration", processing_time, labels={"file_path": pdf_path})
        log_counter("pdf_text_extraction_success", labels={"file_path": pdf_path})

        return markdown_text
    except Exception as e:
        logging.error(f"Error extracting text and formatting from PDF: {str(e)}")
        log_counter("pdf_text_extraction_error", labels={"file_path": pdf_path, "error": str(e)})
        raise


# Bit values of a PyMuPDF span's "flags" field, per the PyMuPDF documentation:
# bit 0 = superscript, bit 1 = italic, bit 2 = serifed, bit 3 = monospaced, bit 4 = bold.
_SPAN_FLAG_ITALIC = 1 << 1
_SPAN_FLAG_BOLD = 1 << 4


def _span_to_markdown(span):
    """Return the span's text wrapped in Markdown heading/bold/italic markers."""
    text = span["text"]
    font_size = span["size"]
    font_flags = span["flags"]

    # Larger fonts are treated as headings.
    if font_size > 20:
        text = f"# {text}"
    elif font_size > 16:
        text = f"## {text}"
    elif font_size > 14:
        text = f"### {text}"

    # Bold is bit 4; bit 0 (tested previously) marks superscript text instead.
    if font_flags & _SPAN_FLAG_BOLD:
        text = f"**{text}**"
    if font_flags & _SPAN_FLAG_ITALIC:
        text = f"*{text}*"

    return text
|
|
|
|
|
|
def extract_metadata_from_pdf(pdf_path):
    """
    Extract metadata from a PDF file using PyMuPDF.

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        The document's metadata dictionary. An empty dict is returned when the
        document exposes no metadata or when any error occurs; errors are
        logged and counted in ``pdf_metadata_extraction_error``, never raised.
    """
    try:
        log_counter("pdf_metadata_extraction_attempt", labels={"file_path": pdf_path})
        with pymupdf.open(pdf_path) as doc:
            metadata = doc.metadata
        log_counter("pdf_metadata_extraction_success", labels={"file_path": pdf_path})
        # PyMuPDF documents ``Document.metadata`` as "a dictionary or None";
        # normalise to a dict so callers can always use ``.get``.
        return metadata or {}
    except Exception as e:
        logging.error(f"Error extracting metadata from PDF: {str(e)}")
        log_counter("pdf_metadata_extraction_error", labels={"file_path": pdf_path, "error": str(e)})
        return {}
|
|
|
|
|
|
def process_and_ingest_pdf(file, title, author, keywords):
    """
    Convert an uploaded PDF to Markdown and store it with ``add_media_with_keywords``.

    The file at ``file.name`` is copied into a temporary directory, converted
    with ``extract_text_and_format_from_pdf`` and its metadata read with
    ``extract_metadata_from_pdf``.

    Args:
        file: Object whose ``name`` attribute is the path of the uploaded PDF,
            or None when nothing was uploaded.
        title: Title to store. When empty, the PDF metadata title is used, and
            if that is empty too, the file's base name without extension.
        author: Author to store. When empty, the PDF metadata author is used,
            falling back to ``'Unknown'``.
        keywords: Extra comma-separated keywords appended after the default
            ``pdf_file,markdown_converted`` pair; may be empty.

    Returns:
        A status message: a prompt to select a file, a success message, or an
        error message. Exceptions are logged and counted, not raised.
    """
    if file is None:
        log_counter("pdf_ingestion_error", labels={"error": "No file uploaded"})
        return "Please select a PDF file to upload."

    try:
        log_counter("pdf_ingestion_attempt", labels={"file_name": file.name})
        start_time = datetime.now()

        with tempfile.TemporaryDirectory() as temp_dir:
            # Work on a private copy so the uploaded file itself is never touched.
            temp_path = os.path.join(temp_dir, "temp.pdf")
            shutil.copy(file.name, temp_path)

            markdown_text = extract_text_and_format_from_pdf(temp_path)

            # Guard against a None result so the ``.get`` calls below are safe.
            metadata = extract_metadata_from_pdf(temp_path) or {}

            # Metadata keys can be present with empty/None values, so fall back
            # on falsy values rather than only on missing keys.
            if not title:
                title = metadata.get('title') or os.path.splitext(os.path.basename(file.name))[0]
            if not author:
                author = metadata.get('author') or 'Unknown'

            if not keywords:
                keywords = 'pdf_file,markdown_converted'
            else:
                keywords = f'pdf_file,markdown_converted,{keywords}'

            # Only add the subject when it actually has content; otherwise the
            # keyword list would gain an empty entry or the literal "None".
            subject = metadata.get('subject')
            if subject:
                keywords += f",{subject}"

            add_media_with_keywords(
                url=file.name,
                title=title,
                media_type='document',
                content=markdown_text,
                keywords=keywords,
                prompt='No prompt for PDF files',
                summary='No summary for PDF files',
                transcription_model='None',
                author=author,
                ingestion_date=datetime.now().strftime('%Y-%m-%d')
            )

        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()
        log_histogram("pdf_ingestion_duration", processing_time, labels={"file_name": file.name})
        log_counter("pdf_ingestion_success", labels={"file_name": file.name})

        return f"PDF file '{title}' by {author} ingested successfully and converted to Markdown."
    except Exception as e:
        logging.error(f"Error ingesting PDF file: {str(e)}")
        log_counter("pdf_ingestion_error", labels={"file_name": file.name, "error": str(e)})
        return f"Error ingesting PDF file: {str(e)}"
|
|
|
|
|
|
def process_and_cleanup_pdf(file, title, author, keywords):
    """
    Run ``process_and_ingest_pdf`` for an uploaded PDF and record processing metrics.

    Args:
        file: Object whose ``name`` attribute identifies the uploaded PDF, or
            None when nothing was uploaded.
        title: Passed through to ``process_and_ingest_pdf``.
        author: Passed through to ``process_and_ingest_pdf``.
        keywords: Passed through to ``process_and_ingest_pdf``.

    Returns:
        The message returned by ``process_and_ingest_pdf``, a "no file" notice,
        or ``"Error: ..."`` if the call raised. The success counter is recorded
        whenever the call returns without raising.
    """
    if file is None:
        log_counter("pdf_processing_error", labels={"error": "No file uploaded"})
        return "No file uploaded. Please upload a PDF file."

    try:
        log_counter("pdf_processing_attempt", labels={"file_name": file.name})

        began_at = datetime.now()
        outcome = process_and_ingest_pdf(file, title, author, keywords)
        elapsed_seconds = (datetime.now() - began_at).total_seconds()

        log_histogram(
            "pdf_processing_duration",
            elapsed_seconds,
            labels={"file_name": file.name},
        )
        log_counter("pdf_processing_success", labels={"file_name": file.name})
    except Exception as e:
        logging.error(f"Error in processing and cleanup: {str(e)}")
        log_counter("pdf_processing_error", labels={"file_name": file.name, "error": str(e)})
        return f"Error: {str(e)}"
    else:
        return outcome
|
|
|
|
|
|
|
|
|
|
|