Spaces:
Sleeping
Sleeping
from concurrent.futures import ThreadPoolExecutor | |
import os | |
import requests | |
import shutil | |
import gradio as gr | |
from zipfile import ZipFile | |
import logging | |
from typing import IO | |
# Log at INFO level and above; each record is prefixed with a timestamp and its level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Base URL that the manifest and image request URLs in this file are built from.
# The commented-out host is an alternative kept for reference
# (presumably an internal endpoint — confirm before switching).
IIIF_URL = "https://lbiiif.riksarkivet.se" # "https://iiifintern.ra.se"
def get_image_ids(batch_id: str) -> list[str]:
    """Return the image IDs listed in the IIIF manifest of the given batch.

    Arguments:
        batch_id: Batch identifier inserted into the manifest URL.

    Returns:
        One image ID per manifest item whose ``id`` contains a ``!``: the
        first 14 characters of the segment following the first ``!``.

    Raises:
        requests.HTTPError: If the manifest request returns an error status.
        requests.Timeout: If the server does not answer in time.
        ValueError: If no image ID could be extracted from the manifest.
    """
    logging.info(f"Fetching image IDs for batch {batch_id}")
    # Without an explicit timeout `requests` waits forever on a stalled
    # server, which would block the calling worker indefinitely.
    response = requests.get(f"{IIIF_URL}/arkis!{batch_id}/manifest", timeout=30)
    response.raise_for_status()
    manifest = response.json()

    image_ids = []
    for item in manifest.get("items", []):
        id_parts = item["id"].split("!")
        if len(id_parts) > 1:
            image_ids.append(id_parts[1][:14])
        else:
            logging.warning(f"Unexpected id format: {item['id']}")

    if not image_ids:
        raise ValueError("No images found in the manifest.")
    logging.info(f"Found {len(image_ids)} images in batch {batch_id}")
    return image_ids
def download_image(url: str, dest: str) -> None:
    """
    Download an image and write it to a file.

    Arguments:
        url: Image url
        dest: Destination file name

    Raises:
        RuntimeError: If the server answers with a status code other than 200.
        requests.RequestException: On connection errors or timeouts.
    """
    logging.info(f"Downloading image from {url} to {dest}")
    # The context manager releases the streamed connection on every path,
    # including the error path; the timeout prevents hanging on a stalled server.
    with requests.get(url, stream=True, timeout=60) as response:
        if response.status_code != 200:
            logging.error(f"Failed to download image from {url}. Status code: {response.status_code}")
            raise RuntimeError(f"Failed to download image from {url}. Status code: {response.status_code}")
        # `response.raw` hands out the bytes exactly as sent on the wire; ask
        # urllib3 to undo any gzip/deflate Content-Encoding so the file on
        # disk is the actual image.
        response.raw.decode_content = True
        with open(dest, "wb") as out_file:
            shutil.copyfileobj(response.raw, out_file)
    logging.info(f"Successfully downloaded image: {dest}")
def download_image_by_image_id(image_id: str):
    """
    Fetch the full-size JPEG for one image ID.

    The first eight characters of the image ID are taken as the batch ID.
    A directory with that name is created if needed, and the image is
    stored inside it as ``<image_id>.jpg``.
    """
    target_dir = image_id[:8]
    source_url = f"{IIIF_URL}/arkis!{image_id}/full/max/0/default.jpg"
    os.makedirs(target_dir, exist_ok=True)
    download_image(source_url, os.path.join(target_dir, image_id + ".jpg"))
def rest_download_batch_images(batch_id: str) -> str:
    """Download all images of a batch concurrently and zip them.

    Downloads are best-effort: a failed image is logged and left out of
    the archive instead of aborting the whole batch.

    Arguments:
        batch_id: Batch identifier; also the name of the directory the
            images are expected in and the stem of the zip file name.

    Returns:
        Name of the created zip file (``<batch_id>.zip``).

    Raises:
        Whatever ``get_image_ids`` raises when the manifest cannot be read.
    """
    image_ids = get_image_ids(batch_id)

    # Keep the futures: a discarded future hides any exception raised in
    # its worker thread.
    with ThreadPoolExecutor() as executor:
        downloads = [
            (image_id, executor.submit(download_image_by_image_id, image_id))
            for image_id in image_ids
        ]
    # Leaving the `with` block has waited for every download to finish.
    for image_id, future in downloads:
        error = future.exception()
        if error is not None:
            logging.error(f"Failed to download image {image_id}: {error}")

    zip_filename = f"{batch_id}.zip"
    with ZipFile(zip_filename, 'w') as zipf:
        for image_id in image_ids:
            img_path = os.path.join(batch_id, f"{image_id}.jpg")
            if os.path.exists(img_path):
                zipf.write(img_path, arcname=os.path.basename(img_path))
            else:
                logging.warning(f"Image {img_path} does not exist and will not be zipped.")
    return zip_filename
def download_batch_images(batch_id: str, progress=None):
    """Download every image of a batch one after another and zip them.

    Arguments:
        batch_id: Batch identifier; also the directory the images are
            looked up in and the stem of the zip file name.
        progress: Optional callable invoked as ``progress(fraction, desc=...)``
            to report progress; skipped when ``None``.

    Returns:
        Name of the created zip file (``<batch_id>.zip``).
    """

    def report(fraction, message):
        # Progress reporting is optional; do nothing when no callback was given.
        if progress is not None:
            progress(fraction, desc=message)

    logging.info(f"Starting download for batch {batch_id}")
    report(0, f"Starting download for {batch_id}...")

    image_ids = get_image_ids(batch_id)
    total_images = len(image_ids)
    for position, image_id in enumerate(image_ids, start=1):
        download_image_by_image_id(image_id)
        logging.info(f"Downloaded image {image_id}")
        report(position / total_images, f"Downloading {image_id}...")

    logging.info(f"Zipping downloaded images for batch {batch_id}")
    zip_filename = f"{batch_id}.zip"
    with ZipFile(zip_filename, 'w') as archive:
        for image_id in image_ids:
            img_path = os.path.join(batch_id, f"{image_id}.jpg")
            if not os.path.exists(img_path):
                logging.warning(f"Image {img_path} does not exist and will not be zipped.")
                continue
            # Store the file flat in the archive, without the batch directory.
            archive.write(img_path, arcname=os.path.basename(img_path))

    report(1, f"Completed {batch_id}")
    logging.info(f"Completed download and zip for batch {batch_id}")
    return zip_filename
def gradio_interface(batch_id_input, progress=gr.Progress()):
    """Gradio handler: download a batch sequentially with progress reporting.

    Returns the name of the zip file produced by ``download_batch_images``.
    Any exception is logged and re-raised as ``gr.Error`` carrying the
    original message.
    """
    try:
        return download_batch_images(batch_id_input, progress=progress)
    except Exception as err:
        message = str(err)
        logging.error(f"Error processing batch: {message}")
        raise gr.Error(f"Error: {message}")
def rest_gradio_interface(batch_id_input: str) -> str:
    """Gradio handler for the REST endpoint: download a batch with threads.

    Arguments:
        batch_id_input: Batch identifier to download.

    Returns:
        Name of the zip file produced by ``rest_download_batch_images``
        (a path string, not an open file object).

    Raises:
        gr.Error: Wrapping the message of any exception raised while
            downloading or zipping.
    """
    try:
        zip_file = rest_download_batch_images(batch_id_input)
        return zip_file
    except Exception as e:
        logging.error(f"Error processing batch: {e}")
        raise gr.Error(f"Error: {str(e)}")
# Build the UI: one working tab for downloading a single batch and two
# placeholder tabs.
with gr.Blocks() as app:
    gr.Markdown("# IIIF Downloader")
    with gr.Tab("Download Batch"):
        with gr.Row():
            with gr.Column():
                batch_id_input = gr.Textbox(label="Batch ID", placeholder="Enter batch ID.")
                download_button = gr.Button("Download Images")
                # Invisible trigger that exists only to expose the threaded
                # downloader as a named API endpoint.
                rest_trigger = gr.Button("REST download", visible=False)
            with gr.Column():
                output_file = gr.File(label="Download Zip File")
        # The visible button runs only the progress-reporting downloader.
        download_button.click(
            gradio_interface,
            inputs=[batch_id_input],
            outputs=[output_file],
        )
        # Binding this handler to the visible button as well would make one
        # click run both downloaders at once, writing the same image files
        # and the same zip concurrently. The endpoint name, inputs and
        # outputs are unchanged for API clients.
        rest_trigger.click(
            rest_gradio_interface,
            api_name="iiif_rest_download",
            inputs=[batch_id_input],
            outputs=[output_file],
        )
    with gr.Tab("Multiple Batches"):
        gr.Markdown("WIP")
        gr.Markdown("Make it possible to download batches to a huggingface account so it can be used through fastapi")
        gr.Markdown("Will uses threading")
    with gr.Tab("How to use"):
        gr.Markdown("WIP, instructional video")

# Enable request queuing, then start the server.
app.queue()
app.launch()