"""Incrementally scrape a subreddit day by day and push it to the Hugging Face Hub.

Configuration is read from the environment: ``SUBREDDIT``, ``USERNAME``,
``HUGGINGFACE_AUTH_TOKEN`` and ``START_DATE`` (``YYYY-MM-DD``).
"""
import os
import time
from datetime import datetime, timedelta

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login

from my_logger import setup_logger
from utilities.pushshift_data import scrape_submissions_by_day, submissions_to_dataframe
from utilities.readme_update import update_readme

# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-{subreddit}"
dataset_readme_path = "README.md"

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)


def main(dataset, date_to_fetch):
    """
    Fetch one day of subreddit submissions, merge it into `dataset` and push it to the hub.

    Args:
        dataset (datasets.DatasetDict): The dataset to update in place. Its
            'all_days' split is extended, or created if it does not exist yet.
        date_to_fetch (datetime.date): The day to fetch submissions for. It is
            compared against a `date` below, so a plain string is not accepted.

    Returns:
        datetime.date: The most recent date covered so far, i.e. the later of
            `date_to_fetch` and the newest 'date' value already in 'all_days'.
    """
    logger.info(f"Fetching data for {str(date_to_fetch)}")
    submissions = scrape_submissions_by_day(subreddit, str(date_to_fetch))
    df = submissions_to_dataframe(submissions)
    logger.debug(f"Data fetched for {str(date_to_fetch)}")
    most_recent_date = date_to_fetch

    # Append DataFrame to split 'all_days' or create new split
    if "all_days" in dataset:
        logger.debug("Appending data to split 'all_days'")

        # Merge the new submissions
        old_data = dataset['all_days'].to_pandas()
        new_data = pd.concat([old_data, df], ignore_index=True)
        if '__index_level_0__' in new_data.columns:
            new_data = new_data.drop('__index_level_0__', axis=1)

        # Drop duplicates just in case
        new_data = new_data.drop_duplicates(subset=['id'], keep="first")

        # Figure out dates when we restart: the hub may already hold days that
        # are newer than the one we were asked to fetch.
        # The 'date' column is parsed as 'YYYY-MM-DD' strings.
        old_data_most_recent_date = old_data['date'].max()
        old_data_most_recent_date = datetime.strptime(old_data_most_recent_date, '%Y-%m-%d').date()
        most_recent_date = max(old_data_most_recent_date, most_recent_date)

        # No row survived de-duplication, so there is nothing new to push.
        if len(old_data) == len(new_data):
            logger.warning("Data in hub is much more recent, using that next!")
            return most_recent_date

        # Convert back to dataset. drop_duplicates leaves gaps in the index, and
        # a non-range index would be stored as an extra '__index_level_0__'
        # column, so restore a plain RangeIndex first.
        new_data = new_data.reset_index(drop=True)
        dataset["all_days"] = Dataset.from_pandas(new_data)

        # Update README
        update_readme(dataset_name, subreddit, date_to_fetch)
    else:
        logger.debug("Creating new split 'all_days'")
        dataset["all_days"] = Dataset.from_pandas(df)

    # Log appending or creating split 'all'
    logger.debug("Appended or created split 'all_days'")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date_to_fetch} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date_to_fetch} to the Hugging Face Hub")
    return most_recent_date


def _load_or_create_dataset():
    """Load the existing dataset from the hub, or return an empty DatasetDict if none is found."""
    try:
        # NOTE(review): `ignore_verifications` is reportedly deprecated in newer
        # `datasets` releases in favour of `verification_mode` - confirm against
        # the pinned version before upgrading.
        dataset = load_dataset(dataset_name, download_mode="reuse_cache_if_exists", ignore_verifications=True)
    except FileNotFoundError:
        logger.warning("Creating new dataset")
        return DatasetDict()

    logger.debug("Loading existing dataset")
    # A dataset without an 'all_days' split is valid here; main() creates the split.
    if "all_days" in dataset and "__index_level_0__" in dataset["all_days"].column_names:
        dataset = dataset.remove_columns(["__index_level_0__"])
    return dataset


def run_main_continuously():
    """
    Run `main` for every day from the "START_DATE" environment variable up to two days ago.

    Once it has caught up to two days ago, it sleeps until tomorrow at the same
    time of day at which this function was started, then checks again. This
    function never returns.

    Raises:
        KeyError: If the "START_DATE" environment variable is not set.
        ValueError: If "START_DATE" is not in YYYY-MM-DD format.
    """
    # Indexing (rather than .get) matches the other required settings and makes
    # a missing variable fail with an error that names it.
    start_date_str = os.environ["START_DATE"]
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()

    # Remember the time of day we started at; daily wake-ups are aligned to it.
    start_time = datetime.now().time()

    # Load the existing dataset from the Hugging Face hub or create a new one
    dataset = _load_or_create_dataset()

    while True:
        # Take a single clock snapshot so `today` and `now` cannot disagree
        # across midnight, which keeps the computed wait positive.
        now = datetime.now()
        today = now.date()
        two_days_ago = today - timedelta(days=2)

        if start_date <= two_days_ago:
            logger.warning(f"Running main function for date: {start_date}")
            most_recent_date = main(dataset, start_date)
            # Resume after whatever the hub already contains.
            start_date = most_recent_date + timedelta(days=1)
        else:
            tomorrow = today + timedelta(days=1)
            start_of_tomorrow = datetime.combine(tomorrow, start_time)
            wait_until_tomorrow = (start_of_tomorrow - now).total_seconds()
            logger.info(f"Waiting until tomorrow: {wait_until_tomorrow} seconds")
            time.sleep(wait_until_tomorrow)


if __name__ == '__main__':
    run_main_continuously()