import logging
import os
import time

import docx2txt
import gradio as gr
import openai
from docx import Document
from PIL import Image, ImageDraw, ImageFont
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

from adobe.pdfservices.operation.auth.service_principal_credentials import ServicePrincipalCredentials
from adobe.pdfservices.operation.exception.exceptions import ServiceApiException, ServiceUsageException, SdkException
from adobe.pdfservices.operation.io.cloud_asset import CloudAsset
from adobe.pdfservices.operation.io.stream_asset import StreamAsset
from adobe.pdfservices.operation.pdf_services import PDFServices
from adobe.pdfservices.operation.pdf_services_media_type import PDFServicesMediaType
from adobe.pdfservices.operation.pdfjobs.jobs.export_pdf_job import ExportPDFJob
from adobe.pdfservices.operation.pdfjobs.params.export_pdf.export_pdf_params import ExportPDFParams
from adobe.pdfservices.operation.pdfjobs.params.export_pdf.export_pdf_target_format import ExportPDFTargetFormat
from adobe.pdfservices.operation.pdfjobs.result.export_pdf_result import ExportPDFResult


logging.basicConfig(level=logging.INFO)

# Read the OpenAI API key from the environment rather than hard-coding the secret in source.
openai.api_key = os.environ.get("OPENAI_API_KEY")

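# Most recent GPT output, kept at module level so the Gradio callbacks can
# iterate on it between button clicks.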
latest_processed_text = ""

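# Wraps the Adobe PDF Services "Export PDF" job: uploads a local PDF and writes
# the converted DOCX to a caller-supplied path.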
class ExportPDFToDOCX:
    def __init__(self, pdf_path):
        self.pdf_path = pdf_path
        # Adobe credentials are read from the environment rather than hard-coded.
        self.credentials = ServicePrincipalCredentials(
            client_id=os.environ.get("PDF_SERVICES_CLIENT_ID"),
            client_secret=os.environ.get("PDF_SERVICES_CLIENT_SECRET")
        )
        self.pdf_services = PDFServices(credentials=self.credentials)

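    # Converts self.pdf_path to DOCX and saves it at output_path. Returns the
    # output path on success, "CORRUPT_DOCUMENT" for unreadable input, or None
    # on any other service error.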
    def process(self, output_path):
        try:
            with open(self.pdf_path, 'rb') as file:
                input_stream = file.read()

            input_asset = self.pdf_services.upload(input_stream=input_stream, mime_type=PDFServicesMediaType.PDF)
            export_pdf_params = ExportPDFParams(target_format=ExportPDFTargetFormat.DOCX)
            export_pdf_job = ExportPDFJob(input_asset=input_asset, export_pdf_params=export_pdf_params)

            location = self.pdf_services.submit(export_pdf_job)
            pdf_services_response = self.pdf_services.get_job_result(location, ExportPDFResult)

            result_asset: CloudAsset = pdf_services_response.get_result().get_asset()
            stream_asset: StreamAsset = self.pdf_services.get_content(result_asset)

            with open(output_path, "wb") as file:
                file.write(stream_asset.get_input_stream())

            return output_path

        except ServiceApiException as e:
            if "CORRUPT_DOCUMENT" in str(e):
                logging.error(f"The input PDF file appears to be corrupted: {e}")
                return "CORRUPT_DOCUMENT"
            else:
                logging.exception(f"Service API Exception encountered while converting PDF: {e}")
                return None
        except (ServiceUsageException, SdkException) as e:
            logging.exception(f"Exception encountered while converting PDF: {e}")
            return None

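# Splits the text into fixed-size character chunks (a rough proxy for the model's
# token limit), asks the chat model to improve each chunk according to the task
# description, and sleeps between requests to stay under rate limits. Uses the
# legacy openai<1.0 ChatCompletion interface, as in the rest of this script.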
def process_with_gpt(text, task_description):
    try:
        max_tokens = 4000
        chunks = [text[i:i + max_tokens] for i in range(0, len(text), max_tokens)]

        processed_chunks = []

        for i, chunk in enumerate(chunks):
            chunk_prompt = f"{task_description}\n\nPart {i+1} of {len(chunks)}:\n{chunk}\n\nPlease improve this text based on the given task description."

            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant that processes and improves documents based on given instructions."},
                    {"role": "user", "content": chunk_prompt}
                ],
                max_tokens=2000,
                n=1,
                temperature=0.7,
            )

            processed_chunk = response.choices[0].message['content'].strip()
            processed_chunks.append(processed_chunk)

            time.sleep(20)

        return " ".join(processed_chunks)
    except Exception as e:
        logging.exception(f"Error in GPT processing: {e}")
        return f"Error occurred during GPT processing: {str(e)}"

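# End-to-end pipeline for a single upload: save the PDF locally, convert it to
# DOCX with Adobe PDF Services, extract the text, and run the GPT cleanup pass.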
def process_pdf(pdf_file, task_description):
    try:
        os.makedirs('adobe_output', exist_ok=True)
        os.makedirs('final_output', exist_ok=True)

        pdf_path = os.path.join('adobe_output', 'input.pdf')
        with open(pdf_path, 'wb') as f:
            f.write(pdf_file if isinstance(pdf_file, bytes) else pdf_file.read())

        exporter = ExportPDFToDOCX(pdf_path)
        docx_path = os.path.join('adobe_output', 'output.docx')
        docx_file = exporter.process(docx_path)

        if docx_file is None:
            return "Error occurred during PDF to DOCX conversion."
        elif docx_file == "CORRUPT_DOCUMENT":
            return "The uploaded PDF file appears to be corrupted. Please check the file and try again."

        text = docx2txt.process(docx_file)

        if not text.strip():
            return "The extracted text is empty. Please check the input PDF file."

        final_text = process_with_gpt(text, task_description)

        return final_text

    except Exception as e:
        logging.exception(f"Error processing PDF: {e}")
        return f"An error occurred while processing the PDF: {str(e)}"

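# First call (with a fresh upload) runs the full PDF pipeline; subsequent calls
# feed the previous output plus the user's feedback back through GPT.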
def iterative_processing(pdf_file, task_description, iteration_feedback):
    global latest_processed_text

    if pdf_file is not None and not latest_processed_text:
        latest_processed_text = process_pdf(pdf_file, task_description)
        return latest_processed_text
    elif latest_processed_text:
        iteration_prompt = f"Previous text:\n\n{latest_processed_text}\n\nRequested changes:\n{iteration_feedback}\n\nPlease apply these changes to the text."
        latest_processed_text = process_with_gpt(latest_processed_text, iteration_prompt)
        return latest_processed_text
    else:
        return "Please upload a PDF file first."

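# Export helpers: write the processed text to DOCX, PDF, or JPG.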
def save_as_docx(text, output_path):
    doc = Document()
    doc.add_paragraph(text)
    doc.save(output_path)
    return output_path

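# Renders the text line by line with ReportLab, starting a new page whenever
# the cursor reaches the bottom margin.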
def save_as_pdf(text, output_path):
    c = canvas.Canvas(output_path, pagesize=letter)
    width, height = letter
    y = height - 50
    for line in text.split('\n'):
        if y < 50:
            c.showPage()
            y = height - 50
        c.drawString(50, y, line)
        y -= 15
    c.save()
    return output_path

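# Draws the text onto a single fixed-size white image; long documents will
# overflow the 800x1000 canvas, since the image is not paginated.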
def save_as_jpg(text, output_path):
    img = Image.new('RGB', (800, 1000), color='white')
    d = ImageDraw.Draw(img)
    font = ImageFont.load_default()
    y = 10
    for line in text.split('\n'):
        d.text((10, y), line, fill='black', font=font)
        y += 20
    img.save(output_path)
    return output_path

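# Gradio callback for the "Process / Iterate" button: delegates to
# iterative_processing and shows the result in the preview textbox.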
def process_and_preview(pdf_file, task_description, iteration_feedback):
    global latest_processed_text

    if pdf_file is not None or latest_processed_text:
        latest_processed_text = iterative_processing(pdf_file, task_description, iteration_feedback)
        return latest_processed_text
    else:
        return "Please upload a PDF file first."

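# Saves the latest processed text in the requested format under ./temp and
# returns the file path, or an error message string if saving fails.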
def save_document(output_format):
    global latest_processed_text

    if not latest_processed_text:
        return "No processed text available. Please process a document first."

    temp_dir = os.path.join(os.getcwd(), 'temp')
    os.makedirs(temp_dir, exist_ok=True)

    try:
        if output_format == 'docx':
            temp_path = os.path.join(temp_dir, 'output.docx')
            save_as_docx(latest_processed_text, temp_path)
        elif output_format == 'pdf':
            temp_path = os.path.join(temp_dir, 'output.pdf')
            save_as_pdf(latest_processed_text, temp_path)
        elif output_format == 'jpg':
            temp_path = os.path.join(temp_dir, 'output.jpg')
            save_as_jpg(latest_processed_text, temp_path)
        else:
            return "Unsupported output format. Please choose docx, pdf, or jpg."

        if os.path.exists(temp_path):
            return temp_path
        else:
            return "Failed to create the output file."

    except Exception as e:
        logging.exception(f"Error saving document: {e}")
        return f"An error occurred while saving the document: {str(e)}"

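# Clears the stored text and resets all UI inputs and outputs.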
def clear_and_reset():
    global latest_processed_text
    latest_processed_text = ""
    return (None, "", "", "", "")


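# Gradio UI: upload a PDF, describe the task, iterate with feedback, then
# export the result as DOCX, PDF, or JPG.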
with gr.Blocks() as iface:
    gr.Markdown("# PDF Cleaner, Improver, and Formatter with GPT-3.5-Turbo")
    gr.Markdown("Upload a PDF file, provide a task description, and iteratively refine the output. You can preview the processed text before saving.")

    with gr.Row():
        with gr.Column():
            pdf_file = gr.File(label="Upload PDF", type="binary")
            task_description = gr.Textbox(label="Task Description",
                                          placeholder="e.g., 'Improve the formatting and clarity of the following document:'")
            iteration_feedback = gr.Textbox(label="Feedback for Iteration",
                                            placeholder="Describe the changes you want to make to the current version.")
            process_button = gr.Button("Process / Iterate")

        with gr.Column():
            output_text = gr.Textbox(label="Processed Text", lines=10)
            output_format = gr.Dropdown(choices=['docx', 'pdf', 'jpg'], label="Output Format")
            save_button = gr.Button("Generate Download Link")
            download_link = gr.HTML(label="Download Link")
            clear_button = gr.Button("Clear and Reset")

    process_button.click(
        process_and_preview,
        inputs=[pdf_file, task_description, iteration_feedback],
        outputs=output_text
    )

    # save_document returns an absolute path on success (never one starting
    # with 'temp/'), so check for an existing file rather than a path prefix.
    def generate_download_link(output_format):
        file_path = save_document(output_format)
        if os.path.isfile(file_path):
            return f'<a href="file/{file_path}" target="_blank" download>Click here to download your file</a>'
        else:
            return file_path

    save_button.click(
        generate_download_link,
        inputs=output_format,
        outputs=download_link
    )

    clear_button.click(
        clear_and_reset,
        inputs=[],
        outputs=[pdf_file, task_description, iteration_feedback, output_text, download_link]
    )


iface.launch()