File size: 5,791 Bytes
741a077
73c784b
 
 
 
 
d1d1d97
741a077
 
d1d1d97
 
73c784b
d543428
d7f3fa0
73c784b
 
d1d1d97
d7f3fa0
73c784b
 
c3b1fcc
 
 
 
 
 
 
 
a361cda
 
d1d1d97
 
73c784b
 
 
 
 
 
 
 
d1d1d97
73c784b
c6194f8
 
 
 
 
 
a361cda
73c784b
 
a874957
73c784b
 
 
 
 
 
 
d7f3fa0
73c784b
 
 
741a077
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a361cda
d1d1d97
73c784b
a361cda
a874957
dcc55fe
a361cda
 
 
 
dcc55fe
d1d1d97
a361cda
d543428
dcc55fe
 
d1d1d97
73c784b
 
 
 
c6194f8
 
 
 
73c784b
a361cda
a874957
dcc55fe
d1d1d97
73c784b
 
d543428
73c784b
a361cda
 
73c784b
d543428
a361cda
741a077
 
 
 
 
 
 
 
 
73c784b
 
741a077
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73c784b
741a077
 
 
73c784b
 
a874957
741a077
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
from concurrent.futures import ThreadPoolExecutor
import os
import requests
import shutil
import gradio as gr
from zipfile import ZipFile
import logging
from typing import IO


# Configure the root logger: INFO and above, each record rendered as
# "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Base URL that the manifest and image request URLs in this module are built on.
# The URL in the trailing comment is an alternative endpoint kept for reference
# (presumably an internal host - TODO confirm).
IIIF_URL = "https://lbiiif.riksarkivet.se"  # "https://iiifintern.ra.se"

def get_image_ids(batch_id: str, timeout: float = 30) -> list[str]:
    """
    Return the list of image IDs in the given batch
    The IDs are read from the batch manifest at
    ``{IIIF_URL}/arkis!{batch_id}/manifest``. For every entry in the
    manifest's "items" list, the image ID is the first 14 characters of
    the part of the entry's "id" that follows the first "!".
    Arguments:
        batch_id: Batch ID used to build the manifest URL
        timeout: Seconds to wait for the server before giving up
    Raises:
        requests.HTTPError: The manifest request returned an error status
        ValueError: No image ID could be extracted from the manifest
    """
    logging.info(f"Fetching image IDs for batch {batch_id}")
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.get(f"{IIIF_URL}/arkis!{batch_id}/manifest", timeout=timeout)
    response.raise_for_status()
    manifest = response.json()

    image_ids = []
    for item in manifest.get("items", []):
        raw_id = item.get("id")
        id_parts = raw_id.split("!") if isinstance(raw_id, str) else []
        # An entry with no "id", no "!" separator, or nothing after the
        # separator cannot yield a usable image ID: report it and move on.
        if len(id_parts) > 1 and id_parts[1]:
            image_ids.append(id_parts[1][:14])
        else:
            logging.warning(f"Unexpected id format: {raw_id}")

    if not image_ids:
        raise ValueError("No images found in the manifest.")
    logging.info(f"Found {len(image_ids)} images in batch {batch_id}")
    return image_ids

def download_image(url: str, dest: str, timeout: float = 60) -> None:
    """
    Download an image
    Arguments:
        url: Image url
        dest: Destination file name
        timeout: Seconds to wait for the server before giving up
    Raises:
        Exception: The server answered with a status code other than 200
    """
    logging.info(f"Downloading image from {url} to {dest}")
    # The context manager releases the connection on every path, including
    # the error path below; the timeout keeps a stalled server from
    # blocking the caller forever.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            logging.error(f"Failed to download image from {url}. Status code: {response.status_code}")
            raise Exception(f"Failed to download image from {url}. Status code: {response.status_code}")
        with open(dest, "wb") as out_file:
            # iter_content undoes gzip/deflate content encoding, whereas
            # copying response.raw would write the still-encoded bytes.
            for chunk in response.iter_content(chunk_size=64 * 1024):
                out_file.write(chunk)
    logging.info(f"Successfully downloaded image: {dest}")

def download_image_by_image_id(image_id: str):
    """
    Fetch the full-size JPEG for one image ID
    The first 8 characters of the image ID are used as the batch ID. A
    directory with that name is created if needed, and the image is
    stored inside it as ``<image_id>.jpg``.
    """
    target_dir = image_id[:8]
    os.makedirs(target_dir, exist_ok=True)
    download_image(
        f"{IIIF_URL}/arkis!{image_id}/full/max/0/default.jpg",
        os.path.join(target_dir, f"{image_id}.jpg"),
    )

def rest_download_batch_images(batch_id: str) -> str:
    """
    Download all images of a batch in parallel and zip them
    The downloads are best-effort: a failed download is logged and the
    image is left out of the archive, the remaining images are still
    zipped.
    Arguments:
        batch_id: Batch whose images should be downloaded
    Returns:
        File name of the created zip archive (``<batch_id>.zip``)
    """
    image_ids = get_image_ids(batch_id)

    with ThreadPoolExecutor() as executor:
        downloads = [
            (image_id, executor.submit(download_image_by_image_id, image_id))
            for image_id in image_ids
        ]

    # Leaving the executor block waits for every download. An exception
    # raised in a worker is only stored on its future, so it has to be read
    # explicitly or the failure goes unnoticed.
    for image_id, future in downloads:
        error = future.exception()
        if error is not None:
            logging.error(f"Failed to download image {image_id}: {error}")

    zip_filename = f"{batch_id}.zip"
    with ZipFile(zip_filename, 'w') as zipf:
        for image_id in image_ids:
            img_path = os.path.join(batch_id, f"{image_id}.jpg")
            if os.path.exists(img_path):
                zipf.write(img_path, arcname=os.path.basename(img_path))
            else:
                logging.warning(f"Image {img_path} does not exist and will not be zipped.")

    return zip_filename


def download_batch_images(batch_id: str, progress=None):
    """
    Download the images of a batch one after another and zip them
    Arguments:
        batch_id: Batch whose images should be downloaded
        progress: Optional callable invoked as ``progress(fraction, desc=...)``
            to report how far the download has come
    Returns:
        File name of the created zip archive (``<batch_id>.zip``)
    """
    def report(fraction, message):
        # Forward a progress update only when a callback was supplied.
        if progress is not None:
            progress(fraction, desc=message)

    logging.info(f"Starting download for batch {batch_id}")
    report(0, f"Starting download for {batch_id}...")

    image_ids = get_image_ids(batch_id)
    total_images = len(image_ids)

    for finished, image_id in enumerate(image_ids, start=1):
        download_image_by_image_id(image_id)
        logging.info(f"Downloaded image {image_id}")
        report(finished / total_images, f"Downloading {image_id}...")

    logging.info(f"Zipping downloaded images for batch {batch_id}")
    archive_name = f"{batch_id}.zip"
    with ZipFile(archive_name, 'w') as archive:
        for image_id in image_ids:
            image_path = os.path.join(batch_id, f"{image_id}.jpg")
            if not os.path.exists(image_path):
                logging.warning(f"Image {image_path} does not exist and will not be zipped.")
                continue
            archive.write(image_path, arcname=os.path.basename(image_path))

    report(1, f"Completed {batch_id}")

    logging.info(f"Completed download and zip for batch {batch_id}")
    return archive_name

def gradio_interface(batch_id_input, progress=gr.Progress()):
    """
    UI handler: download a batch with progress reporting
    Returns the file name of the zip archive. Any failure is logged and
    re-raised as ``gr.Error``.
    """
    try:
        archive_path = download_batch_images(batch_id_input, progress=progress)
    except Exception as exc:
        logging.error(f"Error processing batch: {exc}")
        raise gr.Error(f"Error: {str(exc)}")
    return archive_path
    
def rest_gradio_interface(batch_id_input: str) -> str:
    """
    API handler: download a batch with parallel downloads
    Arguments:
        batch_id_input: Batch whose images should be downloaded
    Returns:
        File name of the created zip archive (a path, not an open file)
    Raises:
        gr.Error: Any failure while downloading or zipping; the original
            exception is attached as the cause
    """
    try:
        return rest_download_batch_images(batch_id_input)
    except Exception as e:
        logging.error(f"Error processing batch: {e}")
        raise gr.Error(f"Error: {str(e)}") from e


# Gradio UI: one tab that downloads a batch as a zip file, plus two
# placeholder tabs.
with gr.Blocks() as app:
    gr.Markdown("# IIIF Downloader")

    with gr.Tab("Download Batch"):

        with gr.Row():
            with gr.Column():
                batch_id_input = gr.Textbox(label="Batch ID", placeholder="Enter batch ID.")
                download_button = gr.Button("Download Images")
                # Hidden button that exists only to carry the API endpoint
                # below. Binding both handlers to the visible button made a
                # single click run two downloads of the same batch at once,
                # both writing the same image files and the same zip file.
                rest_download_trigger = gr.Button("Download Images (REST)", visible=False)

            with gr.Column():
                output_file = gr.File(label="Download Zip File")

        # Interactive download with progress reporting.
        download_button.click(
            gradio_interface,
            inputs=[batch_id_input],
            outputs=[output_file]
        )

        # Threaded download, exposed under the API name "iiif_rest_download".
        rest_download_trigger.click(
            rest_gradio_interface,
            api_name="iiif_rest_download",
            inputs=[batch_id_input],
            outputs=[output_file]
        )

    with gr.Tab("Multiple Batches"):
        gr.Markdown("WIP")
        gr.Markdown("Make it possible to download batches to a huggingface account so it can be used through fastapi")
        gr.Markdown("Will uses threading")

    with gr.Tab("How to use"):
        gr.Markdown("WIP, instructional video")


app.queue()
app.launch()