Spaces:
Sleeping
Sleeping
import os | |
import logging | |
from typing import List, Optional | |
import gradio as gr | |
from git_commd import GitCommandWrapper | |
HF_TOKEN = os.environ["HF_TOKEN"] if "HF_TOKEN" in os.environ else "" | |
WiseModel_TOKEN = os.environ["WM_TOKEN"] if "WM_TOKEN" in os.environ else "" | |
GIT_USER = os.environ["GIT_USER"] if "GIT_USER" in os.environ else "" | |
GIT_EMAIL = os.environ["GIT_EMAIL"] if "GIT_EMAIL" in os.environ else "" | |
def get_cache_dir(): | |
from random_word import RandomWords | |
r = RandomWords() | |
return r.get_random_word() | |
def check_disk(): | |
import os | |
return os.system("df -h /") | |
def init_git_user(username: str, email: str): | |
if not username or not email: | |
raise gr.Error( | |
"Please config your git username and email in the environment variables." | |
) | |
os.system(f"git config --global user.email {email}") | |
os.system(f"git config --global user.name {username}") | |
def pull_from_wisemodel( | |
token: str, url: str, repo_name: str, cache_dir, branch: Optional[str] = None | |
): | |
print("pull_from_wisemodel start") | |
print(cache_dir) | |
os.makedirs(cache_dir, exist_ok=True) | |
# os.system("cd "+cache_dir) | |
gitCmd = GitCommandWrapper() | |
isLfs = gitCmd.is_lfs_installed() | |
if not isLfs: | |
gitCmd.git_lfs_install() | |
gitCmd.clone(cache_dir, token, url, repo_name, branch) | |
print("pull_from_wisemodel end") | |
return f"Pulled {branch} to temp folder {cache_dir}: {url}" | |
def push_to_wiseModel( | |
token: str, | |
url: str, | |
repo_name: str, | |
cache_dir: str, | |
hf_repo_id: str, | |
branch: Optional[str] = None, | |
): | |
print("push_to_wiseModel start") | |
gitCmd = GitCommandWrapper() | |
isLfs = gitCmd.is_lfs_installed() | |
if not isLfs: | |
gitCmd.git_lfs_install() | |
gitCmd.clone(cache_dir, token, url, repo_name, branch) | |
repo_dir = f"./{cache_dir}/{repo_name}" | |
os.makedirs(repo_dir, exist_ok=True) | |
source_dir = f"./{cache_dir}/hf/{hf_repo_id}" | |
move_file(source_dir, repo_dir) | |
# move_file(source_dir, repo_dir, excludes=[".gitattributes", ".git"]) | |
gitCmd.add(repo_dir, all_files=True) | |
gitCmd.commit(repo_dir, message="commit from hf to wisemodel") | |
gitCmd.push(repo_dir, token, url, branch, branch) | |
print("push_to_wiseModel end") | |
return f"Pushed {branch} to {url}" | |
def move_file(source: str, destination: str, excludes: list[str] = []): | |
import shutil | |
try: | |
# move all files in the source directory to the destination directory | |
# list all files in the source directory | |
files = os.listdir(source) | |
for file in files: | |
if file in excludes: | |
continue | |
# if file already exists in the destination directory, remove it | |
if os.path.exists(f"{destination}/{file}"): | |
print(f"Removing {file} from {destination}") | |
# incase is a directory, remove it recursively | |
if os.path.isdir(f"{destination}/{file}"): | |
# force remove the directory and all its contents | |
shutil.rmtree(f"{destination}/{file}", ignore_errors=True) | |
else: | |
os.remove(f"{destination}/{file}") | |
print(f"Moving {file} to {destination}") | |
# move each file to destination Directory, if it already exists, it will be replaced | |
shutil.move(f"{source}/{file}", destination) | |
except Exception as e: | |
logging.exception(e) | |
return f"Error moving files from {source} to {destination}, {e.args[0]}" | |
return "file moved" | |
def remove_file(cache_dir, repo_name): | |
import os | |
try: | |
os.remove(f"{cache_dir}/{repo_name}") | |
except: | |
return "" | |
return "README.md file removed" | |
def push_to_hf(cache_dir, WiseModel_repo_name, hf_repo_id): | |
from huggingface_hub import HfApi | |
if not HF_TOKEN: | |
raise gr.Error("Please enter your HF_TOKEN") | |
print("push_to_hf start") | |
api = HfApi(token=HF_TOKEN) # Token is not persisted on the machine. | |
output = api.upload_folder( | |
folder_path=f"{cache_dir}/{WiseModel_repo_name}", | |
repo_id=hf_repo_id, | |
repo_type="model", | |
) | |
print("push_to_hf end") | |
return f"Pushed to {hf_repo_id}" | |
def pull_from_hf(cache_dir, hf_repo_id): | |
from huggingface_hub import HfApi | |
if not HF_TOKEN: | |
raise gr.Error("Please enter your HF_TOKEN") | |
print("pull_from_hf start") | |
api = HfApi(token=HF_TOKEN) # Token is not persisted on the machine. | |
output = api.snapshot_download( | |
repo_id=hf_repo_id, | |
repo_type="model", | |
local_dir=cache_dir + "/hf/" + hf_repo_id, | |
local_dir_use_symlinks=False, | |
) | |
print(f"pull_from_hf end, output: {output}") | |
return f"Pulled from {hf_repo_id}" | |
def handle(wisemodel_link, hf_repo_id): | |
cache_dir = get_cache_dir() | |
wiseModel_repo_url = ( | |
wisemodel_link.replace(".git", "") | |
.replace("git", "") | |
.replace("clone", "") | |
.replace(" ", "") | |
) | |
wiseModel_repo_info = ( | |
wisemodel_link.replace(".git", "") | |
.replace("git", "") | |
.replace("clone", "") | |
.replace(" ", "") | |
.split("/") | |
) | |
print(wiseModel_repo_info) | |
wisemodel_repo_name = wiseModel_repo_info[-1] | |
stages = [ | |
(check_disk, (), {}), | |
# # Run all the sanity checks on README.md | |
# (pull_from_wisemodel, (WiseModel_TOKEN,wiseModel_repo_url,wisemodel_repo_name, cache_dir,"main"), {}), | |
# (remove_file, (wisemodel_repo_name, cache_dir), {}), | |
# (check_disk, (), {}), | |
# (push_to_hf, (cache_dir, wisemodel_repo_name, hf_repo_id), {}), | |
# (check_disk, (), {}), | |
(init_git_user, (GIT_USER, GIT_EMAIL), {}), | |
(pull_from_hf, (cache_dir, hf_repo_id), {}), | |
(remove_file, (hf_repo_id, cache_dir), {}), | |
(check_disk, (), {}), | |
( | |
push_to_wiseModel, | |
( | |
WiseModel_TOKEN, | |
wiseModel_repo_url, | |
wisemodel_repo_name, | |
cache_dir, | |
hf_repo_id, | |
"main", | |
), | |
{}, | |
), | |
(check_disk, (), {}), | |
] | |
results = [] | |
errors = [] | |
for func, args, kwargs in stages: | |
try: | |
results.append(str(func(*args, **kwargs))) | |
except Exception as e: | |
logging.exception(e) | |
errors.append(str(e)) | |
if errors: | |
break | |
return "\n\n".join(results), "\n\n".join(errors) | |
with gr.Blocks() as demo: | |
gr.Markdown( | |
""" | |
# HF-to-wisemodel/从Huggingface上拉取模型、数据集等到wisemodel | |
# 这是一个示例Space,实际使用请参考下面说明,先Duplicate一个私有的space | |
- 这个space可以把已经发布在Huggingface上的模型拉取到wisemodel上。 | |
- This space uploads model from Huggingface to wisemodel. | |
- **请确认您是repo的拥有者或者有权限操作!** | |
- **Please make sure that you're the owner of the repo or have permission from the owner to do so!** | |
# 如何使用这个空间? | |
# How to use this Space? | |
- 点击右上角settings后面的“…”按钮,选择“Duplicate this Space”创建一个私有的space,同时输入wisemodel的token(必填),以及Huggingface的token(必填),确保有相应repo写入的权限。你还需要输入你的git用户名和邮箱。 | |
- Duplicate this Space and providing WiseModel token (mandatory) and your read/write HF token (mandatory).you also need to provide your git username and email to push the model to WiseModel. | |
- 在wiseModel上创建一个空的repo,这一步需要手动完成,Space不会为您创建一个空的repo。 | |
- Create your target model repo on WiseModel. This step is not automated. | |
- 在刚刚自己创建的私有space里填写相应的信息,wisemodel的git clone链接,以及Huggingface的repo名称。 | |
- In your own private Space, fill in information below. | |
- 点击submit按钮,然后可以通过logs按钮查看进度。 | |
- Click submit then watch for output in container log for progress. | |
- Create README.md file (since the metadata is not compatible with HF) | |
""" | |
) | |
wisemodel_link = gr.Textbox( | |
label="Copy the git download link from the model detail page of wisemodel(从wisemodel上获取该模型的完整git clone链接) " | |
) | |
hf_repo_id = gr.Textbox( | |
label="Source HF Model Repo ID (case sensitive). \nPlease make sure that this model has already been created" | |
) | |
with gr.Row(): | |
button = gr.Button("Submit", variant="primary") | |
clear = gr.Button("Clear") | |
error = gr.Textbox(label="Error") | |
output = gr.Textbox(label="Output") | |
button.click(handle, [wisemodel_link, hf_repo_id], [output, error]) | |
if __name__ == "__main__": | |
demo.launch(debug=True) | |