import os import cv2 import numpy as np from PIL import Image, ImageDraw import ffmpeg import gradio as gr from tqdm import tqdm import zstandard as zstd import brotli import torch import torchvision.transforms as transforms from torch.nn import functional as F import cupy as cp import io import mimetypes from pydub import AudioSegment from PyPDF2 import PdfFileReader, PdfFileWriter import docx import openpyxl class GPUAcceleratedCompressionToolkit: def __init__(self): self.supported_formats = { 'image': ['.jpg', '.jpeg', '.png', '.bmp', '.tiff'], 'video': ['.mp4', '.avi', '.mov', '.mkv'], 'audio': ['.mp3', '.wav', '.ogg', '.flac'], 'document': ['.txt', '.pdf', '.doc', '.docx'], 'spreadsheet': ['.xlsx', '.xls', '.csv'] } self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") def detect_file_type(self, file_path): mime_type, _ = mimetypes.guess_type(file_path) if mime_type: type_category = mime_type.split('/')[0] if type_category in ['image', 'video', 'audio']: return type_category elif type_category == 'application': if mime_type in ['application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document']: return 'document' elif mime_type in ['application/vnd.ms-excel', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet']: return 'spreadsheet' return 'other' def compress_image(self, input_path, output_path, compression_level, use_gpu=False, output_format='original'): img = Image.open(input_path) original_format = img.format if img.format else 'JPEG' if output_format == 'original': save_format = original_format else: save_format = output_format.upper() quality = int(compression_level) if use_gpu: tensor = transforms.ToTensor()(img).unsqueeze(0).to(self.device) compressed = F.interpolate(tensor, scale_factor=quality/100, mode='bilinear', align_corners=False) compressed = F.interpolate(compressed, size=tensor.shape[2:], mode='bilinear', align_corners=False) result = transforms.ToPILImage()(compressed.squeeze(0).cpu()) result.save(output_path, format=save_format, quality=quality) else: img.save(output_path, format=save_format, quality=quality) def compress_video(self, input_path, output_path, compression_level, use_gpu=False, output_format=None): if use_gpu: vcodec = 'h264_nvenc' else: vcodec = 'libx264' crf = int(100 - compression_level) # Invert the scale for CRF if output_format is None: output_format = os.path.splitext(input_path)[1][1:] ( ffmpeg .input(input_path) .output(output_path, vcodec=vcodec, crf=str(crf), acodec='aac', **{'preset': 'slow'}) .overwrite_output() .run(capture_stdout=True, capture_stderr=True) ) def compress_audio(self, input_path, output_path, compression_level, output_format=None): if output_format is None: output_format = os.path.splitext(input_path)[1][1:] bitrate = f"{int(compression_level * 3.2)}k" # Scale compression_level to bitrate audio = AudioSegment.from_file(input_path) audio.export(output_path, format=output_format, bitrate=bitrate) def compress_document(self, input_path, output_path, compression_level, output_format=None): if output_format is None: output_format = os.path.splitext(input_path)[1][1:] if output_format == 'pdf': with open(input_path, 'rb') as file: reader = PdfFileReader(file) writer = PdfFileWriter() for page in range(reader.getNumPages()): page = reader.getPage(page) page.compressContentStreams() # This is CPU intensive! writer.addPage(page) with open(output_path, 'wb') as output_file: writer.write(output_file) elif output_format in ['doc', 'docx']: doc = docx.Document(input_path) doc.save(output_path) else: # For other document types, use generic file compression self.compress_file_gpu(input_path, output_path, compression_level) def compress_spreadsheet(self, input_path, output_path, compression_level, output_format=None): if output_format is None: output_format = os.path.splitext(input_path)[1][1:] wb = openpyxl.load_workbook(input_path) wb.save(output_path) def compress_file_gpu(self, input_path, output_path, compression_level): level = int(compression_level / 10) # Scale compression_level to Zstandard level with open(input_path, 'rb') as f_in: data = f_in.read() d_data = cp.asarray(bytearray(data)) cctx = zstd.ZstdCompressor(level=level) d_compressed = cp.asarray(bytearray(cctx.compress(d_data.get()))) compressed = d_compressed.get().tobytes() with open(output_path, 'wb') as f_out: f_out.write(compressed) def batch_compress_gpu(self, input_files, output_dir, use_gpu, output_format, compression_level): if not os.path.exists(output_dir): os.makedirs(output_dir) results = [] for file in tqdm(input_files): input_path = file.name file_type = self.detect_file_type(input_path) # Determine output format and path if output_format == 'original': _, ext = os.path.splitext(input_path) output_path = os.path.join(output_dir, f"compressed_{os.path.basename(input_path)}") else: output_path = os.path.join(output_dir, f"compressed_{os.path.splitext(os.path.basename(input_path))[0]}.{output_format}") if file_type == 'image': self.compress_image(input_path, output_path, compression_level, use_gpu=use_gpu, output_format=output_format) elif file_type == 'video': self.compress_video(input_path, output_path, compression_level, use_gpu=use_gpu, output_format=output_format if output_format != 'original' else None) elif file_type == 'audio': self.compress_audio(input_path, output_path, compression_level, output_format=output_format if output_format != 'original' else None) elif file_type == 'document': self.compress_document(input_path, output_path, compression_level, output_format=output_format if output_format != 'original' else None) elif file_type == 'spreadsheet': self.compress_spreadsheet(input_path, output_path, compression_level, output_format=output_format if output_format != 'original' else None) else: self.compress_file_gpu(input_path, output_path, compression_level) results.append(output_path) return results def real_time_preview_gpu(self, input_path, compression_level, use_gpu=False): file_type = self.detect_file_type(input_path) if file_type == 'image': img = Image.open(input_path) if use_gpu: tensor = transforms.ToTensor()(img).unsqueeze(0).to(self.device) tensor = F.interpolate(tensor, size=(300, 300), mode='bilinear', align_corners=False) compressed = F.interpolate(tensor, scale_factor=compression_level/100, mode='bilinear', align_corners=False) compressed = F.interpolate(compressed, size=tensor.shape[2:], mode='bilinear', align_corners=False) result = transforms.ToPILImage()(compressed.squeeze(0).cpu()) else: img = img.resize((300, 300)) buffer = io.BytesIO() img.save(buffer, format='JPEG', quality=int(compression_level)) buffer.seek(0) result = Image.open(buffer) return result elif file_type == 'video': video = cv2.VideoCapture(input_path) ret, frame = video.read() if ret: if use_gpu: d_frame = cp.asarray(frame) d_frame = cp.resize(d_frame, (300, 300)) _, d_buffer = cv2.imencode('.jpg', cp.asnumpy(d_frame), [cv2.IMWRITE_JPEG_QUALITY, int(compression_level)]) else: frame = cv2.resize(frame, (300, 300)) _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, int(compression_level)]) return Image.open(io.BytesIO(buffer.tobytes())) elif file_type in ['audio', 'document', 'spreadsheet', 'other']: placeholder = Image.new('RGB', (300, 300), color='lightgray') draw = ImageDraw.Draw(placeholder) draw.text((10, 150), f"{file_type.capitalize()} Preview\nNot Available", fill='black') return placeholder return None def gradio_interface(toolkit): def process_files(files, use_gpu, output_format, compression_level): output_dir = "compressed_output" return toolkit.batch_compress_gpu(files, output_dir, use_gpu, output_format, compression_level) def update_preview(file, compression_level, use_gpu): if file is None: return None return toolkit.real_time_preview_gpu(file.name, compression_level, use_gpu) iface = gr.Interface( fn=process_files, inputs=[ gr.File(label="Input Files", file_count="multiple"), gr.Checkbox(label="Use GPU Acceleration"), gr.Dropdown( choices=["original", "jpg", "png", "mp4", "mp3", "pdf", "docx", "xlsx"], label="Output Format", value="original" ), gr.Slider(1, 100, 50, step=1, label="Compression Level") ], outputs=gr.File(label="Compressed Files", file_count="multiple"), title="GPU-Accelerated Compression Toolkit", description="Drag and drop files for compression of images, videos, audio, documents, spreadsheets, and other files. File type is automatically detected.", allow_flagging="never" ) preview = gr.Interface( fn=update_preview, inputs=[ gr.File(label="Input File"), gr.Slider(1, 100, 50, step=1, label="Compression Level"), gr.Checkbox(label="Use GPU Acceleration") ], outputs=gr.Image(label="Preview"), title="Real-time Compression Preview", live=True, allow_flagging="never" ) return gr.TabbedInterface([iface, preview], ["Compress", "Preview"]) if __name__ == "__main__": toolkit = GPUAcceleratedCompressionToolkit() interface = gradio_interface(toolkit) interface.launch(share=True)