|
import os |
|
import time |
|
from datetime import datetime |
|
|
|
import pandas as pd |
|
import schedule |
|
from datasets import Dataset |
|
|
|
from utilities.user_defined_functions import ( |
|
get_latest_data, |
|
merge_data, |
|
load_or_create_dataset, |
|
remove_filtered_rows, |
|
load_or_create_comment_dataset |
|
) |
|
|
|
from utilities.my_logger import setup_logger |
|
from utilities.readme_update import update_dataset_readme |
|
|
|
|
|
subreddit = os.environ["SUBREDDIT"] |
|
username = os.environ["USERNAME"] |
|
dataset_name = f"{username}/reddit-{subreddit}" |
|
comment_dataset_name = f"{username}/reddit-comments-{subreddit}" |
|
|
|
dataset_readme_path = "README.md" |
|
|
|
frequency = os.environ.get("FREQUENCY", '').lower() |
|
if frequency not in ["daily", "hourly"]: |
|
raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'") |
|
|
|
|
|
auth_token = os.environ["HF_TOKEN"] |
|
|
|
logger = setup_logger(__name__) |
|
|
|
def upload(new_df, dataset, hf_dataset_name): |
|
date = datetime.now().strftime('%Y-%m-%d') |
|
|
|
|
|
if 'train' in dataset.keys(): |
|
old_df = dataset['train'].to_pandas() if 'train' in dataset.keys() else pd.DataFrame() |
|
df = merge_data(old_df=old_df, new_df=new_df) |
|
new_rows = len(df) - len(old_df) |
|
|
|
else: |
|
df = new_df |
|
df['new'] = True |
|
df['updated'] = False |
|
new_rows = len(new_df) |
|
df = remove_filtered_rows(df) |
|
dataset['train'] = Dataset.from_pandas(df, preserve_index=False) |
|
|
|
|
|
logger.info(f"Adding {new_rows} rows for {date}.") |
|
|
|
|
|
logger.debug(f"Pushing data for {date} to {hf_dataset_name}") |
|
dataset.push_to_hub(hf_dataset_name, token=auth_token) |
|
logger.info(f"Processed and pushed data for {date} to {hf_dataset_name}") |
|
update_dataset_readme(dataset_name=hf_dataset_name, subreddit=subreddit, new_rows=new_rows) |
|
logger.info(f"Updated README.") |
|
|
|
|
|
def main(): |
|
date = datetime.now().strftime('%Y-%m-%d') |
|
|
|
logger.warning(f"Running main function for date: {date}") |
|
sub_dataset = load_or_create_dataset() |
|
new_df, new_df_comment = get_latest_data() |
|
|
|
upload(new_df, sub_dataset, dataset_name) |
|
|
|
comment_dataset = load_or_create_comment_dataset() |
|
upload(new_df_comment, comment_dataset, comment_dataset_name) |
|
|
|
|
|
def schedule_periodic_task(): |
|
""" |
|
Schedule the main task to run at the user-defined frequency |
|
""" |
|
if frequency == 'hourly': |
|
logger.info(f'Scheduling tasks to run every hour at the top of the hour') |
|
schedule.every().hour.at(":00").do(main) |
|
elif frequency == 'daily': |
|
start_time = '05:00' |
|
logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00') |
|
schedule.every().day.at(start_time).do(main) |
|
|
|
while True: |
|
schedule.run_pending() |
|
time.sleep(1) |
|
|
|
|
|
if __name__ == "__main__": |
|
schedule_periodic_task() |
|
|