File size: 5,098 Bytes
26205f8
 
f539398
26205f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f539398
 
 
 
 
bbc8fc2
 
 
d08ca35
bbc8fc2
 
f539398
 
 
 
bbc8fc2
 
 
 
 
d08ca35
 
 
bbc8fc2
 
 
 
 
 
 
 
 
26205f8
 
 
 
 
 
 
 
 
 
d08ca35
bbc8fc2
 
 
 
 
e925d44
bbc8fc2
96403e9
bbc8fc2
0be785e
 
26205f8
 
 
 
 
 
e925d44
eb06eb9
3d02299
 
 
bbc8fc2
 
 
 
 
 
 
 
58b98dc
 
bbc8fc2
 
d08ca35
bbc8fc2
 
 
d08ca35
 
 
 
 
bbc8fc2
 
 
bbcd24f
bbc8fc2
 
 
7f2b4c3
bbc8fc2
 
 
 
 
 
 
 
 
d08ca35
bbc8fc2
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import json
import os
import re
from datetime import datetime
from pathlib import Path

import gradio as gr
from huggingface_hub import CommitScheduler, HfApi
from huggingface_hub.utils import HfHubHTTPError


# Token used by the usage-stats scheduler below; read from the environment
# (None when the HF_TOKEN variable is not set).
HF_TOKEN = os.getenv("HF_TOKEN")

# Local folder holding the usage log; created at import time if missing.
JSON_DATASET_DIR = Path("dataset")
JSON_DATASET_DIR.mkdir(parents=True, exist_ok=True)
# JSON Lines file that save_json() appends one record per clone to.
JSON_DATASET_PATH = JSON_DATASET_DIR / "dataset.jsonl"

# Scheduler bound to the usage-stats dataset repo; save_json() takes
# `scheduler.lock` while it writes to the local log file.
# NOTE(review): `path_in_repo` is given the local *file* path
# ("dataset/dataset.jsonl") while `folder_path` is the directory -- confirm
# this is the intended location inside the dataset repo.
scheduler = CommitScheduler(
    repo_id="librarian-bots/collection_cloner-usage-stats",
    repo_type="dataset",
    folder_path=JSON_DATASET_DIR,
    path_in_repo=str(JSON_DATASET_PATH),
    token=HF_TOKEN,
)


def save_json(source_slug: str, destination_slug: str) -> None:
    """Append one clone record to the local JSON Lines usage log.

    The record holds the source slug, the destination slug and the current
    local time in ISO format. Nothing is written when either value starts
    with ``hf_``: that prefix suggests a user pasted an access token into a
    slug field, and tokens must never end up in the log.

    Args:
        source_slug: Slug of the collection that was cloned.
        destination_slug: Slug of the newly created collection.
    """
    # Reject suspicious values before taking the lock or opening the file so
    # a rejected call has no side effects at all.
    if source_slug.startswith("hf_") or destination_slug.startswith("hf_"):
        return None
    record = {
        "source_collection": source_slug,
        "destination_collection": destination_slug,
        "datetime": datetime.now().isoformat(),
    }
    # Hold the scheduler's lock while appending to the file it manages.
    with scheduler.lock:
        with JSON_DATASET_PATH.open("a", encoding="utf-8") as f:
            json.dump(record, f)
            f.write("\n")


def extract_slug(url: str):
    """Return the collection slug contained in a Hugging Face collection URL.

    The slug is everything after ``https://huggingface.co/collections/`` up
    to (but not including) any query string, fragment or whitespace, with
    trailing slashes removed.

    Args:
        url: Text expected to contain a collection URL.

    Returns:
        The slug as a string, or ``None`` when ``url`` contains no collection
        URL or the URL carries no slug.
    """
    # Stop at '?', '#' or whitespace so URL decorations never leak into the slug.
    pattern = r"https://huggingface\.co/collections/([^?#\s]*)"
    match = re.search(pattern, url)
    if match is None:
        return None
    slug = match.group(1).rstrip("/")
    return slug or None


def clone_collection(
    source_slug, dest_title, token, dest_namespace=None, private=False, exist_ok=False
):
    """Clone a Hub collection into a new collection and report the result.

    Args:
        source_slug: Slug or full ``https://huggingface.co/collections/...``
            URL of the collection to copy.
        dest_title: Title of the collection to create.
        token: Hugging Face access token used for every API call.
        dest_namespace: Namespace to create the collection in. ``None``, a
            blank value or the UI placeholder ``"username"`` all mean "let
            the API pick the default namespace".
        private: Whether the new collection is private. Private clones are
            not recorded in the usage log.
        exist_ok: Forwarded to ``create_collection`` as ``exists_ok``.

    Returns:
        A Markdown string linking the source and the new collection.

    Raises:
        gr.Error: If no source slug was given, or the source collection
            cannot be fetched.
    """
    api = HfApi(token=token)
    source_slug = (source_slug or "").strip()
    # check if formatted as url
    if source_slug.startswith("https://huggingface.co/collections/"):
        source_slug = extract_slug(source_slug)
    if not source_slug:
        raise gr.Error("Please provide a source Collection slug or URL.")
    # get_collection signals a missing/inaccessible collection by raising,
    # so the friendly message has to be produced from the exception.
    try:
        collection = api.get_collection(source_slug)
    except HfHubHTTPError as err:
        raise gr.Error(
            f"Collection {source_slug} does not exist or you do not have access to it."
        ) from err
    description = f"Copied from {collection.title} using https://huggingface.co/spaces/librarian-bots/collection_cloner."
    # The namespace field is optional: a blank value or the untouched
    # "username" placeholder falls back to the API default.
    dest_namespace = (dest_namespace or "").strip()
    if dest_namespace in ("", "username"):
        dest_namespace = None
    new_collection = api.create_collection(
        dest_title,
        namespace=dest_namespace,
        exists_ok=exist_ok,
        private=private,
        description=description,
        token=token,
    )
    for item in collection.items:
        try:
            api.add_collection_item(
                new_collection.slug, item.item_id, item_type=item.item_type
            )
        except HfHubHTTPError as e:
            # Best effort: one failing item must not abort the whole clone.
            # Only a 409 response means the item is already present; report
            # any other failure with its actual cause.
            response = getattr(e, "response", None)
            if getattr(response, "status_code", None) == 409:
                gr.Info(
                    f"Failed to add item {item.item_id} to collection {new_collection.slug} because it already exists in this collection."
                )
            else:
                gr.Warning(
                    f"Failed to add item {item.item_id} to collection {new_collection.slug}: {e}"
                )
    # Usage is only tracked for clones into public collections.
    if not private:
        save_json(collection.slug, new_collection.slug)
    return f"[Collection]({collection.url}) has been cloned into [{new_collection.slug}]({new_collection.url})"


# Page heading rendered at the top of the app.
title = (
    """<h1 style='text-align: center;'> &#129516; Collection Cloner &#129516;</h1>"""
)


# UI layout: intro text, token field, source/destination inputs, and a button
# that runs clone_collection and shows its Markdown result.
with gr.Blocks(css="style.css") as demo:
    gr.HTML(title)
    gr.HTML(
        """<p style='text-align: center;'>
This space allows you to clone a <a href="https://huggingface.co/docs/hub/collections">Collection</a> from the Hugging Face Hub into your own namespace.</p>
<p style='text-align: center;'> You can edit this cloned Collection to your liking!</p>"""
    )
    gr.Markdown(
        """
                **Note**: To track interest in this feature this Space keeps a record of clones which are cloned into public collection. Clones into private Collections are not tracked."""
    )
    gr.Markdown("## Authentication")
    gr.Markdown(
        "Token is required to create a new collection and clone private collections. You can get your token from your [profile page](https://huggingface.co/settings/token)."
    )
    with gr.Row():
        # Password-type textbox so the token is masked while typing.
        token = gr.Textbox(
            label="Token",
            type="password",
        )
    with gr.Column():
        gr.Markdown("## Source Collection")
        source_slug = gr.Textbox(
            label="Source Collection slug or URL",
            placeholder="e.g. username/collection-slug",
        )
        gr.Markdown("## Destination Collection info")

        dest_title = gr.Textbox(
            label="Destination Title",
        )
        # The default value "username" is the sentinel clone_collection
        # treats as "no namespace given".
        dest_namespace = gr.Textbox(
            value="username",
            label="Destination Namespace (optional - defaults to your username)",
            interactive=True,
        )
        with gr.Row():
            private = gr.Checkbox(
                False,
                label="Make new collection private?",
            )
            overwrite = gr.Checkbox(
                False,
                label="Overwrite any collection with same slug as the destination?",
            )
    submit_btn = gr.Button("Clone Collection")
    response = gr.Markdown()
    # Inputs are passed positionally, so their order must match
    # clone_collection's parameter order.
    submit_btn.click(
        clone_collection,
        [
            source_slug,
            dest_title,
            token,
            dest_namespace,
            private,
            overwrite,
        ],
        response,
    )

demo.launch()