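"""Periodically pull the latest posts and comments for a subreddit and
push them to two Hugging Face Hub datasets (one for posts, one for
comments), on an hourly or daily schedule set via the FREQUENCY
environment variable."""
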
import os
import time
from datetime import datetime

import pandas as pd
import schedule
from datasets import Dataset

from utilities.user_defined_functions import (
    get_latest_data,
    merge_data,
    load_or_create_dataset,
    remove_filtered_rows,
    load_or_create_comment_dataset
)

from utilities.my_logger import setup_logger
from utilities.readme_update import update_dataset_readme

# Dataset names (posts and comments) and README path, derived from environment variables
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/reddit-{subreddit}"
comment_dataset_name = f"{username}/reddit-comments-{subreddit}"

dataset_readme_path = "README.md"

frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HF_TOKEN"]

logger = setup_logger(__name__)

def upload(new_df, dataset, hf_dataset_name):
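    """Merge new_df into the existing Hub dataset (or start a fresh one),
    drop filtered rows, push the result to hf_dataset_name, and update
    the dataset's README with the number of rows added."""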
    date = datetime.now().strftime('%Y-%m-%d')
    
    # Dataset already exists on the Hub: merge the new rows into the old data
    if 'train' in dataset.keys():
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    # No existing dataset: every row is new and none have been updated
    else:
        df = new_df
        df['new'] = True
        df['updated'] = False
        new_rows = len(new_df)
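    # Remove rows that match the filtering rules before building the dataset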
    df = remove_filtered_rows(df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date} to {hf_dataset_name}")
    dataset.push_to_hub(hf_dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to {hf_dataset_name}")
    # Update the dataset README with the number of rows added
    update_dataset_readme(dataset_name=hf_dataset_name, subreddit=subreddit, new_rows=new_rows)
    logger.info("Updated README.")


def main():
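    """Fetch the latest posts and comments, then upload each to its
    respective Hub dataset."""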
    date = datetime.now().strftime('%Y-%m-%d')
    
    logger.info(f"Running main function for date: {date}")
    sub_dataset = load_or_create_dataset()
    new_df, new_df_comment = get_latest_data()
    
    upload(new_df, sub_dataset, dataset_name)
    
    comment_dataset = load_or_create_comment_dataset()
    upload(new_df_comment, comment_dataset, comment_dataset_name)


def schedule_periodic_task():
    """
    Schedule the main task to run at the user-defined frequency
    """
    if frequency == 'hourly':
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        schedule.every().hour.at(":00").do(main)
    elif frequency == 'daily':
        start_time = '05:00'
        logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
        schedule.every().day.at(start_time).do(main)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_periodic_task()