Spaces:
Sleeping
Sleeping
File size: 5,791 Bytes
741a077 73c784b d1d1d97 741a077 d1d1d97 73c784b d543428 d7f3fa0 73c784b d1d1d97 d7f3fa0 73c784b c3b1fcc a361cda d1d1d97 73c784b d1d1d97 73c784b c6194f8 a361cda 73c784b a874957 73c784b d7f3fa0 73c784b 741a077 a361cda d1d1d97 73c784b a361cda a874957 dcc55fe a361cda dcc55fe d1d1d97 a361cda d543428 dcc55fe d1d1d97 73c784b c6194f8 73c784b a361cda a874957 dcc55fe d1d1d97 73c784b d543428 73c784b a361cda 73c784b d543428 a361cda 741a077 73c784b 741a077 73c784b 741a077 73c784b a874957 741a077 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
from concurrent.futures import ThreadPoolExecutor
import os
import requests
import shutil
import gradio as gr
from zipfile import ZipFile
import logging
from typing import IO
# Configure the root logger once at import time: INFO level, with a
# timestamp and level name prefixed to every message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Base URL of the IIIF server that manifests and images are requested from.
# NOTE(review): the URL in the trailing comment looks like an alternative
# (internal?) host that can be swapped in — confirm before relying on it.
IIIF_URL = "https://lbiiif.riksarkivet.se" # "https://iiifintern.ra.se"
def get_image_ids(batch_id: str, timeout: float = 30.0) -> list[str]:
    """
    A list of image IDs in the given batch

    Requests the manifest for the batch and extracts one image ID per
    manifest item. The ID is the text after the first "!" in the item's
    "id" value, truncated to 14 characters.

    Arguments:
        batch_id: Batch ID whose manifest is requested
        timeout: Seconds to wait for the server before giving up

    Returns:
        The image IDs in manifest order (never empty)

    Raises:
        requests.HTTPError: If the manifest request returns an error status
        ValueError: If no image ID could be extracted from the manifest
    """
    logging.info(f"Fetching image IDs for batch {batch_id}")
    # Without a timeout a stalled server would block the caller forever.
    response = requests.get(
        f"{IIIF_URL}/arkis!{batch_id}/manifest", timeout=timeout
    )
    response.raise_for_status()
    manifest = response.json()

    image_ids = []
    for item in manifest.get("items", []):
        # A missing "id" is handled like any other malformed id: warn and skip.
        item_id = item.get("id", "")
        id_parts = item_id.split("!")
        if len(id_parts) > 1:
            image_ids.append(id_parts[1][:14])
        else:
            logging.warning(f"Unexpected id format: {item_id}")

    if not image_ids:
        raise ValueError("No images found in the manifest.")
    logging.info(f"Found {len(image_ids)} images in batch {batch_id}")
    return image_ids
def download_image(url: str, dest: str, timeout: float = 30.0) -> None:
    """
    Download an image

    The response body is streamed straight to disk instead of being held
    in memory.

    Arguments:
        url: Image url
        dest: Destination file name
        timeout: Seconds to wait for the server before giving up

    Raises:
        Exception: If the server answers with a status code other than 200
    """
    logging.info(f"Downloading image from {url} to {dest}")
    # The context manager releases the streamed connection on every path,
    # including the error path below.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            logging.error(f"Failed to download image from {url}. Status code: {response.status_code}")
            raise Exception(f"Failed to download image from {url}. Status code: {response.status_code}")
        # The raw stream is not content-decoded by default; enable decoding so
        # a gzip/deflate-encoded response is not written to disk compressed.
        response.raw.decode_content = True
        with open(dest, "wb") as out_file:
            shutil.copyfileobj(response.raw, out_file)
    logging.info(f"Successfully downloaded image: {dest}")
def download_image_by_image_id(image_id: str):
    """
    Fetch the full-size JPEG for one image ID and store it on disk

    The first eight characters of the image ID are used as the batch ID.
    A directory with that name is created if needed, and the image is
    saved inside it as "<image_id>.jpg".
    """
    batch_dir = image_id[:8]
    os.makedirs(batch_dir, exist_ok=True)
    target_path = os.path.join(batch_dir, f"{image_id}.jpg")
    source_url = f"{IIIF_URL}/arkis!{image_id}/full/max/0/default.jpg"
    download_image(source_url, target_path)
def rest_download_batch_images(batch_id: str) -> str:
    """
    Download all images of a batch concurrently and zip them

    Downloads are best-effort: a failed image is logged and left out of
    the archive instead of aborting the whole batch.

    Arguments:
        batch_id: Batch ID to download

    Returns:
        File name of the zip archive ("<batch_id>.zip")
    """
    image_ids = get_image_ids(batch_id)

    with ThreadPoolExecutor() as executor:
        downloads = [
            (image_id, executor.submit(download_image_by_image_id, image_id))
            for image_id in image_ids
        ]
    # Leaving the with-block waits for every worker, so all futures are done
    # here. Inspect them so failures are reported instead of silently dropped.
    for image_id, future in downloads:
        error = future.exception()
        if error is not None:
            logging.error(f"Failed to download image {image_id}: {error}")

    # NOTE(review): batch_id is used as a path component here and in the image
    # paths; if it comes from user input, consider validating its format.
    zip_filename = f"{batch_id}.zip"
    with ZipFile(zip_filename, 'w') as zipf:
        for image_id in image_ids:
            img_path = os.path.join(batch_id, f"{image_id}.jpg")
            if os.path.exists(img_path):
                zipf.write(img_path, arcname=os.path.basename(img_path))
            else:
                logging.warning(f"Image {img_path} does not exist and will not be zipped.")
    return zip_filename
def download_batch_images(batch_id: str, progress=None):
    """
    Download all images of a batch one after another and zip them

    Arguments:
        batch_id: Batch ID to download
        progress: Optional callable invoked as progress(fraction, desc=...)
            to report how far the download has come

    Returns:
        File name of the zip archive ("<batch_id>.zip")
    """

    def notify(fraction, message):
        # Progress reporting is optional; do nothing when no callback is given.
        if progress is not None:
            progress(fraction, desc=message)

    logging.info(f"Starting download for batch {batch_id}")
    notify(0, f"Starting download for {batch_id}...")

    image_ids = get_image_ids(batch_id)
    total_images = len(image_ids)

    for done, image_id in enumerate(image_ids, start=1):
        download_image_by_image_id(image_id)
        logging.info(f"Downloaded image {image_id}")
        notify(done / total_images, f"Downloading {image_id}...")

    logging.info(f"Zipping downloaded images for batch {batch_id}")
    zip_filename = f"{batch_id}.zip"
    with ZipFile(zip_filename, 'w') as archive:
        for image_id in image_ids:
            img_path = os.path.join(batch_id, f"{image_id}.jpg")
            if not os.path.exists(img_path):
                logging.warning(f"Image {img_path} does not exist and will not be zipped.")
                continue
            archive.write(img_path, arcname=os.path.basename(img_path))

    notify(1, f"Completed {batch_id}")
    logging.info(f"Completed download and zip for batch {batch_id}")
    return zip_filename
def gradio_interface(batch_id_input, progress=gr.Progress()):
    """
    UI handler: download a batch sequentially and return the zip file

    Arguments:
        batch_id_input: Batch ID as entered in the text box
        progress: Gradio progress tracker, forwarded to the download so the
            UI can show per-image progress

    Returns:
        File name of the zip archive

    Raises:
        gr.Error: If anything goes wrong, so the message is shown in the UI
    """
    try:
        # Text box input may carry stray whitespace (or be empty); normalise
        # it before it ends up in URLs and file paths.
        batch_id = (batch_id_input or "").strip()
        if not batch_id:
            raise ValueError("Batch ID must not be empty.")
        return download_batch_images(batch_id, progress=progress)
    except Exception as e:
        # logging.exception keeps the traceback in the log; the UI only gets
        # the short message.
        logging.exception(f"Error processing batch: {e}")
        raise gr.Error(f"Error: {str(e)}") from e
def rest_gradio_interface(batch_id_input: str) -> str:
    """
    API handler: download a batch concurrently and return the zip file

    Arguments:
        batch_id_input: Batch ID supplied by the caller

    Returns:
        File name of the zip archive (a path, not an open file object)

    Raises:
        gr.Error: If anything goes wrong, so the message reaches the client
    """
    try:
        # Normalise the input before it ends up in URLs and file paths.
        batch_id = (batch_id_input or "").strip()
        if not batch_id:
            raise ValueError("Batch ID must not be empty.")
        return rest_download_batch_images(batch_id)
    except Exception as e:
        # logging.exception keeps the traceback in the log; the client only
        # gets the short message.
        logging.exception(f"Error processing batch: {e}")
        raise gr.Error(f"Error: {str(e)}") from e
# Build the Gradio UI: one working tab for downloading a single batch and two
# placeholder tabs.
with gr.Blocks() as app:
    gr.Markdown("# IIIF Downloader")

    with gr.Tab("Download Batch"):
        with gr.Row():
            with gr.Column():
                batch_id_input = gr.Textbox(label="Batch ID", placeholder="Enter batch ID.")
                download_button = gr.Button("Download Images")
                # Hidden trigger that exists only to carry the REST endpoint.
                # Attaching the REST handler to the visible button as well
                # made every click download the batch twice, concurrently,
                # into the same files.
                rest_download_trigger = gr.Button("REST download", visible=False)
            with gr.Column():
                output_file = gr.File(label="Download Zip File")

        # Interactive path: sequential download with progress reporting.
        download_button.click(
            gradio_interface,
            inputs=[batch_id_input],
            outputs=[output_file],
        )
        # Programmatic path: threaded download, exposed under a stable API name.
        rest_download_trigger.click(
            rest_gradio_interface,
            api_name="iiif_rest_download",
            inputs=[batch_id_input],
            outputs=[output_file],
        )

    with gr.Tab("Multiple Batches"):
        gr.Markdown("WIP")
        gr.Markdown("Make it possible to download batches to a huggingface account so it can be used through fastapi")
        gr.Markdown("Will uses threading")

    with gr.Tab("How to use"):
        gr.Markdown("WIP, instructional video")

# Enable request queuing, then start the server.
app.queue()
app.launch()