# NOTE: the following metadata was scraped from the Hugging Face file viewer
# (author "alvanli", commit 67a3546 "add comment ds", 3.03 kB, raw/history/blame
# links) and is not part of the program; kept here as a comment so the file parses.
import os
import time
from datetime import datetime
import pandas as pd
import schedule
from datasets import Dataset
from utilities.user_defined_functions import (
get_latest_data,
merge_data,
load_or_create_dataset,
remove_filtered_rows,
load_or_create_comment_dataset
)
from utilities.my_logger import setup_logger
from utilities.readme_update import update_dataset_readme
# Set dataset name, path to README.md, and existing dataset details
# Required env vars: SUBREDDIT, USERNAME, HF_TOKEN; optional: FREQUENCY.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
# Hub repo ids: one dataset for submissions, one for comments.
dataset_name = f"{username}/reddit-{subreddit}"
comment_dataset_name = f"{username}/reddit-comments-{subreddit}"
dataset_readme_path = "README.md"
# Fail fast at import time on a bad schedule setting (missing var -> '' -> error).
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")
# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HF_TOKEN"]
logger = setup_logger(__name__)
def upload(new_df, dataset, hf_dataset_name):
    """Merge new rows into *dataset*, push it to the HF Hub, and update its README.

    Args:
        new_df: DataFrame of freshly scraped rows to add.
        dataset: DatasetDict loaded from (or newly created for) the Hub repo.
        hf_dataset_name: Fully-qualified repo id, e.g. "user/reddit-sub".
    """
    date = datetime.now().strftime('%Y-%m-%d')
    if 'train' in dataset:
        # Existing dataset on the hub: merge the new rows into the old frame.
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    else:
        # Brand-new dataset. Copy so the flag columns below don't mutate the
        # caller's DataFrame (the original assigned into new_df in place).
        df = new_df.copy()
        df['new'] = True
        df['updated'] = False
        new_rows = len(new_df)
    # NOTE(review): new_rows is counted before filtering, so the README figure
    # may overstate the rows actually kept — confirm this is intended.
    df = remove_filtered_rows(df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)
    logger.info(f"Adding {new_rows} rows for {date}.")
    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date} to {hf_dataset_name}")
    dataset.push_to_hub(hf_dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to {hf_dataset_name}")
    update_dataset_readme(dataset_name=hf_dataset_name, subreddit=subreddit, new_rows=new_rows)
    logger.info("Updated README.")
def main():
    """Fetch the latest submissions and comments, then push each to its Hub repo."""
    run_date = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {run_date}")
    submission_ds = load_or_create_dataset()
    submissions_df, comments_df = get_latest_data()
    upload(submissions_df, submission_ds, dataset_name)
    # Comments go to their own dedicated dataset repo.
    comment_ds = load_or_create_comment_dataset()
    upload(comments_df, comment_ds, comment_dataset_name)
def schedule_periodic_task():
    """
    Schedule the main task to run at the user-defined frequency
    """
    if frequency == 'hourly':
        # Fire at minute :00 of every hour.
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        schedule.every().hour.at(":00").do(main)
    elif frequency == 'daily':
        daily_start = '05:00'
        logger.info(f'Scheduling tasks to run every day at: {daily_start} UTC+00')
        schedule.every().day.at(daily_start).do(main)
    # Block forever, polling the scheduler once per second.
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == "__main__":
    # Entry point: registers the job and blocks forever in the scheduler loop.
    schedule_periodic_task()