import glob
import json
import os
import re
import subprocess
from typing import List

import requests
import pandas as pd
from bs4 import BeautifulSoup
from markdown import markdown
import nbformat
from nbconvert import MarkdownExporter
from nbconvert.preprocessors import Preprocessor, ClearOutputPreprocessor
from tqdm import tqdm
# when True, every generated GitHub URL is fetched to verify that it resolves
VALIDATE_URLS = False
def download_repositories(repo_urls_file: str, repo_dir: str):
    """
    Clones the Hugging Face repositories listed in repo_urls_file into repo_dir.
    """
    if not os.path.exists(repo_dir):
        os.makedirs(repo_dir)
    with open(repo_urls_file, "r") as f:
        repositories_urls = json.load(f)["urls"]
    print(f'Downloading {len(repositories_urls)} repositories')
    for url in repositories_urls:
        try:
            # check=True is needed for a failed clone to raise CalledProcessError,
            # and capturing output makes e.stderr available in the handler below
            subprocess.run(
                ["git", "clone", url],
                cwd=repo_dir,
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            print("Command failed with error:", e.stderr)
class EmptyCellPreprocessor(Preprocessor):
    # nbconvert preprocessor that turns cells with empty source into raw cells,
    # so they are not rendered as empty code blocks in the exported markdown
    def preprocess_cell(self, cell, resources, index):
        if cell.source.strip() == '':
            cell.source = ''
            cell.cell_type = 'raw'
        return cell, resources
def convert_notebook_to_txt(filename: str):
    """
    Converts a notebook to a markdown file.
    """
    with open(filename) as f:
        notebook = nbformat.read(f, as_version=4)

    # id validation error fix
    for cell in notebook['cells']:
        cell['id'] = str(cell['id'])

    clear_output = ClearOutputPreprocessor()
    notebook, resources = clear_output.preprocess(notebook, {})

    exporter = MarkdownExporter()
    exporter.register_preprocessor(EmptyCellPreprocessor, enabled=True)
    output_notebook_text, resources = exporter.from_notebook_node(notebook)

    new_filename = filename.replace('.ipynb', '_ipynb.md')
    with open(new_filename, 'w') as f:
        f.write(output_notebook_text)
    return new_filename
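# Illustrative usage (the path below is hypothetical):
#   md_path = convert_notebook_to_txt("./datasets/huggingface_repositories/notebooks/demo.ipynb")
#   # md_path == "./datasets/huggingface_repositories/notebooks/demo_ipynb.md"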
def extract_files_from_directories(
    repo_urls_file: str,
    repo_dir: str,
    docs_dir: str,
    files_extensions: List[str]
) -> None:
    """
    Reads files with the given extensions (markdown, MDX and notebook files) from
    the repositories directory, filters out non-English files, and adds the source
    GitHub URL as the first line of each file.
    The resulting files are saved in docs_dir.
    """
    languages = pd.read_csv("language-codes.csv").loc[:, "alpha2"].tolist()
    languages.remove("en")

    files = [
        filename
        for extension in files_extensions
        for filename in glob.glob(repo_dir + f"**/*{extension}", recursive=True)
    ]
    print(f'Used extensions: {", ".join(files_extensions)}')
    print(f'Found {len(files)} files')

    repo_urls = []
    with open(repo_urls_file, "r") as f:
        repo_urls = json.load(f)["urls"]
    # filter out the files that are not in English: a file is kept only if no
    # component of its path is a non-English language code
    filtered_files = []
    for filename in files:
        sep_file = filename.split("/")
        for seq in sep_file:
            if seq in languages:
                break
        else:
            filtered_files.append(filename)
    print(f'Found {len(filtered_files)} files in English')
    # generate a GitHub URL for a file based on its name and a list of possible repository URLs
    def get_github_url(filename: str, repo_urls: List[str], repo_dir: str) -> str:
        source = filename.replace(repo_dir, '')
        repo_name, file_path = source.split('/', 1)

        repo_url_prefix = None
        for repo_url in repo_urls:
            if repo_name == repo_url.split('/')[-1]:
                repo_url_prefix = repo_url
                break
        if not repo_url_prefix:
            raise ValueError(f"Repo URL not found for {repo_name}")

        url = f'{repo_url_prefix}/blob/main/{file_path}'
        if VALIDATE_URLS:
            try:
                response = requests.get(url)
                response.raise_for_status()
            except requests.RequestException:
                print(f'filename: {filename}')
                print(f'repo: {repo_name}, file: {file_path}')
                print(f'url: {url}')
                raise
        return url
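    # Illustrative mapping (hypothetical repository and path), assuming
    # repo_dir == "./datasets/huggingface_repositories/":
    #   "./datasets/huggingface_repositories/transformers/docs/source/en/index.mdx"
    #   -> "https://github.com/huggingface/transformers/blob/main/docs/source/en/index.mdx"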
    # creates a valid filename by replacing certain characters and removing the repo_dir path
    def create_filename_from_path(filename: str, repo_dir: str) -> str:
        filename = filename.replace(repo_dir, '')
        chars_to_replace = ['/', '{', '}', '-', '.']
        filename = ''.join(['_' if c in chars_to_replace else c for c in filename])
        return filename
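    # Illustrative mapping (hypothetical path), assuming the same repo_dir as above:
    #   "./datasets/huggingface_repositories/transformers/docs/source/en/index.mdx"
    #   -> "transformers_docs_source_en_index_mdx"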
    # copy the files with the source added in the first line
    if not os.path.exists(docs_dir):
        os.makedirs(docs_dir)

    copied_files = []
    for filename in tqdm(filtered_files):
        source_url = get_github_url(filename, repo_urls, repo_dir)
        data = f"source: {source_url}\n\n"
        # convert jupyter notebooks to txt files
        try:
            if filename.endswith('.ipynb'):
                filename = convert_notebook_to_txt(filename)
            # rename and copy files
            with open(filename, 'r') as f:
                data += f.read()
            output_filename = docs_dir + create_filename_from_path(filename, repo_dir)
            with open(output_filename, 'w') as f:
                f.write(data)
            if not os.path.isfile(output_filename):
                raise ValueError(f"Failed to create the output file: {output_filename}")
            copied_files.append(output_filename)
        except Exception as ex:
            print(f'Failed to copy file {filename}: {ex}')

    print(f'Successfully copied {len(set(copied_files))}/{len(filtered_files)} files')
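# Each produced file starts with a "source:" header followed by the original
# content, e.g. (illustrative, using the hypothetical URL from above):
#
#   source: https://github.com/huggingface/transformers/blob/main/docs/source/en/index.mdx
#
#   <original markdown content>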
def markdown_cleaner(data: str):
    """
    Clean markdown text.

    Args:
        data (str): The markdown text to be cleaned.

    Returns:
        str: The cleaned markdown text.
    """
    soupped = BeautifulSoup(markdown(data), "html.parser")
    raw_text = ''.join(soupped.find_all(string=True))
    clean_text = re.sub(r"<!--.*?-->", "", raw_text, flags=re.DOTALL)
    # remove any special tokens e.g. <|endoftext|>
    clean_text = re.sub(r"<\|endoftext\|>", "", clean_text, flags=re.DOTALL)
    # keep only ASCII alphanumerics and whitespace (drops punctuation and non-English characters)
    clean_text = re.sub(r"[^a-zA-Z0-9\s]", "", clean_text, flags=re.DOTALL)
    return "\n".join([t for t in clean_text.split("\n") if t])
if __name__ == '__main__':
    # repo_urls_file = "./datasets/hf_repositories_urls.json"
    repo_urls_file = "./datasets/hf_repositories_urls_scraped.json"
    repo_dir = "./datasets/huggingface_repositories/"
    docs_dir = "./datasets/huggingface_docs/"

    download_repositories(repo_urls_file, repo_dir)
    extract_files_from_directories(
        repo_urls_file, repo_dir, docs_dir,
        files_extensions=['.md', '.mdx', '.ipynb']
    )