import gc
import os
import time
from datetime import datetime

import pandas as pd
import schedule
from datasets import Dataset

from utilities.user_defined_functions import (
    get_latest_data,
    merge_data,
    load_or_create_dataset,
    remove_filtered_rows,
    load_or_create_comment_dataset,
)
from utilities.my_logger import setup_logger
from utilities.readme_update import update_dataset_readme

# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/reddit-{subreddit}"
comment_dataset_name = f"{username}/reddit-comments-{subreddit}"
dataset_readme_path = "README.md"

# Fail fast at import time if the run frequency is missing or unsupported.
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HF_TOKEN"]

logger = setup_logger(__name__)


def upload(new_df: pd.DataFrame, dataset, hf_dataset_name: str) -> None:
    """Merge ``new_df`` into ``dataset['train']`` and push the result to the hub.

    When ``dataset`` already has a ``'train'`` split, its rows are combined
    with ``new_df`` via ``merge_data``. Otherwise ``new_df`` becomes the first
    version of the split, with ``new`` set to True and ``updated`` set to
    False on every row. The frame is then passed through
    ``remove_filtered_rows``, stored back as ``dataset['train']``, pushed to
    ``hf_dataset_name`` under the ``year_<current year>`` subset, and the
    dataset README is updated with the number of rows added.

    Args:
        new_df: Freshly fetched rows. In the new-dataset case this frame is
            modified in place (the ``new``/``updated`` columns are added)
            rather than copied.
        dataset: Mapping of split name to dataset; must support ``in``,
            item access/assignment and ``push_to_hub``.
        hf_dataset_name: Hub repository id to push to.
    """
    # Take a single timestamp so the log date and the yearly subset name
    # cannot disagree when the call straddles midnight on New Year's Eve.
    now = datetime.now()
    date = now.strftime('%Y-%m-%d')
    subset = f"year_{now.year}"

    if 'train' in dataset:
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    else:
        # New dataset: every incoming row is new and none has been updated.
        df = new_df
        df['new'] = True
        df['updated'] = False
        new_rows = len(new_df)

    # NOTE: new_rows is computed before filtering, so it does not account for
    # any rows that remove_filtered_rows drops.
    df = remove_filtered_rows(df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date} to {hf_dataset_name}")
    dataset.push_to_hub(hf_dataset_name, subset, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to {hf_dataset_name}")

    # Update README
    update_dataset_readme(dataset_name=hf_dataset_name, subreddit=subreddit, new_rows=new_rows)
    logger.info("Updated README.")


def main() -> None:
    """Fetch the latest data and upload the submission and comment datasets.

    The submission dataset is processed first and released before the comment
    dataset is loaded, so that both are not held in memory at the same time.
    """
    date = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {date}")

    sub_dataset = load_or_create_dataset()
    new_df, new_df_comment = get_latest_data()
    upload(new_df, sub_dataset, dataset_name)

    # Drop the submission data and force a collection before loading the
    # comment dataset to keep peak memory down.
    del sub_dataset, new_df
    gc.collect()

    comment_dataset = load_or_create_comment_dataset()
    upload(new_df_comment, comment_dataset, comment_dataset_name)


def schedule_periodic_task() -> None:
    """
    Schedule the main task to run at the user-defined frequency
    and then poll the scheduler forever (this function never returns).
    """
    if frequency == 'hourly':
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        schedule.every().hour.at(":00").do(main)
    elif frequency == 'daily':
        start_time = '05:00'
        # NOTE(review): no timezone is passed to .at(), so the job presumably
        # fires at 05:00 in the process's local time; the "UTC+00" in the log
        # message is only accurate if the host clock runs in UTC - confirm.
        logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
        schedule.every().day.at(start_time).do(main)

    # Check once per second whether a scheduled job is due.
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_periodic_task()