# derek-thomas's picture
# derek-thomas HF staff
# Adding more logging for new rows
# b65cbe6
# raw
# history blame
# 2.68 kB
import os
import time
from datetime import datetime, timedelta
import pandas as pd
import schedule
from datasets import DatasetDict, load_dataset, Dataset
from huggingface_hub import login
from utilities.data_collator import merge_and_filter_data
from utilities.my_logger import setup_logger
from utilities.readme_update import update_readme
# Runtime configuration is read from the environment when this module is imported;
# a missing variable raises KeyError immediately.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
# Hub repository id that get_dataset() loads from and main() pushes to.
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
# Path to README.md (not referenced by the other code visible in this file).
dataset_readme_path = "README.md"

# Authenticate with Hugging Face using an auth token.
# The same token is passed again to push_to_hub() in main().
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

# Module-level logger shared by every function below.
logger = setup_logger(__name__)
def get_dataset():
    """
    Load the existing dataset from the Hugging Face hub, or start a new one.

    Returns:
        The object returned by `load_dataset(dataset_name, ...)`, with the
        "__index_level_0__" column removed when the "train" split has it, or
        an empty `DatasetDict` when `load_dataset` raises `FileNotFoundError`.
    """
    try:
        # NOTE(review): `ignore_verifications` is deprecated in newer `datasets`
        # releases in favour of `verification_mode` — confirm against the pinned version.
        dataset = load_dataset(dataset_name, download_mode="reuse_cache_if_exists", ignore_verifications=True)
        logger.debug("Loading existing dataset")
        # main() tolerates a dataset without a "train" split, so check for the split
        # here as well instead of raising KeyError on the lookup.
        # "__index_level_0__" is presumably a leftover pandas index column — verify.
        if "train" in dataset and "__index_level_0__" in dataset["train"].column_names:
            dataset = dataset.remove_columns(["__index_level_0__"])
    except FileNotFoundError:
        logger.warning("Creating new dataset")
        dataset = DatasetDict()
    return dataset
def main():
    """
    Refresh the dataset for the current date and publish it.

    Loads the stored dataset, passes its "train" split (or an empty frame) to
    `merge_and_filter_data`, stores the result as the new "train" split, calls
    `update_readme` with the row-count difference, and pushes the dataset to
    the Hugging Face hub.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {today}")

    dataset = get_dataset()

    # Start from the stored "train" split when there is one, otherwise from an empty frame
    if 'train' in dataset.keys():
        previous_df = dataset['train'].to_pandas()
    else:
        previous_df = pd.DataFrame()

    # Get latest data merged with the historic data
    merged_df = merge_and_filter_data(old_df=previous_df)
    dataset['train'] = Dataset.from_pandas(merged_df, preserve_index=False)

    # Row-count difference between merged and previous data, reported to the README and the log
    added_rows = len(merged_df) - len(previous_df)
    update_readme(dataset_name=dataset_name, subreddit=subreddit, latest_date=today, new_rows=added_rows)
    logger.info(f"Adding {added_rows} rows for {today}.")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {today} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {today} to the Hugging Face Hub")
def schedule_daily_task(start_time='05:00'):
    """
    Schedule `main` to run every day at `start_time`, then run pending jobs forever.

    Args:
        start_time: Time of day handed to `schedule.every().day.at(...)`, as an
            'HH:MM' string. Defaults to '05:00'. For a quick manual check, pass
            a time about a minute ahead, e.g.
            `(datetime.now() + timedelta(minutes=1)).strftime('%H:%M')`.

    This function does not return: after registering the job it loops
    indefinitely, checking for due jobs once per second.
    NOTE(review): an exception raised by `main` presumably propagates out of
    `schedule.run_pending()` and ends the loop — confirm against the
    `schedule` documentation.
    """
    logger.info(f'Scheduling tasks to run every day at: {start_time}')
    schedule.every().day.at(start_time).do(main)

    # Poll once a second so a due job starts promptly without busy-waiting
    while True:
        schedule.run_pending()
        time.sleep(1)
# Start the blocking scheduler loop only when executed as a script, not on import
if __name__ == "__main__":
    schedule_daily_task()