import os
import time
from datetime import datetime, timedelta

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login

from my_logger import setup_logger
from utilities.pushshift_data import scrape_submissions_by_day, submissions_to_dataframe
from utilities.readme_update import update_readme

# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-{subreddit}"
dataset_readme_path = "README.md"

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)

def main(dataset, date_to_fetch):
    """
    Fetches subreddit submissions for a single day, merges them into the dataset,
    and pushes the result to the Hugging Face Hub.

    Args:
        dataset (datasets.DatasetDict): The Hugging Face dataset to append the subreddit data to.
        date_to_fetch (datetime.date): The date to fetch subreddit data for.

    Returns:
        most_recent_date (datetime.date): The most recent date present in the updated dataset.
    """
    # Call scrape_submissions_by_day with the requested date
    logger.info(f"Fetching data for {str(date_to_fetch)}")
    submissions = scrape_submissions_by_day(subreddit, str(date_to_fetch))
    df = submissions_to_dataframe(submissions)
    logger.debug(f"Data fetched for {str(date_to_fetch)}")
    most_recent_date = date_to_fetch

    # Append the DataFrame to split 'all_days', or create that split if it doesn't exist yet
    if "all_days" in dataset:
        logger.debug("Appending data to split 'all_days'")
        # Merge the new submissions with the existing rows
        old_data = dataset['all_days'].to_pandas()
        new_data = pd.concat([old_data, df], ignore_index=True)
        if '__index_level_0__' in new_data.columns:
            new_data = new_data.drop('__index_level_0__', axis=1)

        # Drop duplicates in case a day is re-fetched after a restart
        new_data = new_data.drop_duplicates(subset=['id'], keep="first")

        # Work out the most recent date already in the hub so a restart resumes from there
        old_data_most_recent_date = old_data['date'].max()
        old_data_most_recent_date = datetime.strptime(old_data_most_recent_date, '%Y-%m-%d').date()
        most_recent_date = max(old_data_most_recent_date, most_recent_date)

        if len(old_data) == len(new_data):
            logger.warning("Data in the hub is already more recent; resuming from its latest date!")
            return most_recent_date

        # Convert back to a Dataset
        dataset["all_days"] = Dataset.from_pandas(new_data)

        # Update README
        update_readme(dataset_name, subreddit, date_to_fetch)
    else:
        logger.debug("Creating new split 'all_days'")
        dataset["all_days"] = Dataset.from_pandas(df)
    logger.debug("Appended or created split 'all_days'")

    # Push the augmented dataset to the Hugging Face Hub
    logger.debug(f"Pushing data for {date_to_fetch} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date_to_fetch} to the Hugging Face Hub")
    return most_recent_date

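# Illustrative usage of main() (a sketch, not part of the original flow): to backfill
# a single day by hand, the pieces above could be combined like this, assuming the
# SUBREDDIT, USERNAME, and HUGGINGFACE_AUTH_TOKEN environment variables are set:
#
#     from datetime import date
#     ds = get_dataset()
#     latest = main(ds, date(2023, 1, 1))  # returns the most recent date now in the dataset
#     next_day = latest + timedelta(days=1)
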
def run_main_continuously():
    """
    Runs `main` continuously, starting from the date given in the environment variable
    "START_DATE" and working forward until two days ago. Once it has caught up to two
    days ago, it waits until tomorrow and resumes at the same time of day as this run
    started.
    """
    start_date_str = os.environ.get("START_DATE")
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()

    # Time of day at which the loop should resume each day
    start_time = datetime.now().time()

    dataset = get_dataset()
    while True:
        today = datetime.now().date()
        two_days_ago = today - timedelta(days=2)

        if start_date <= two_days_ago:
            # Still catching up: process the next pending day
            logger.warning(f"Running main function for date: {start_date}")
            most_recent_date = main(dataset, start_date)
            start_date = most_recent_date + timedelta(days=1)
        else:
            # Caught up: sleep until the same time of day tomorrow
            tomorrow = today + timedelta(days=1)
            now = datetime.now()
            start_of_tomorrow = datetime.combine(tomorrow, start_time)
            wait_until_tomorrow = (start_of_tomorrow - now).total_seconds()
            logger.info(f"Waiting until tomorrow: {wait_until_tomorrow} seconds")
            time.sleep(wait_until_tomorrow)

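# Worked example of the wait computation in run_main_continuously() (values are made
# up for illustration): if the loop first started at 09:00 and catches up at 17:30,
# start_of_tomorrow is tomorrow at 09:00, so wait_until_tomorrow is roughly
# 15.5 hours expressed in seconds.
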
def get_dataset():
    # Load the existing dataset from the Hugging Face Hub, or create a new one
    try:
        dataset = load_dataset(dataset_name, download_mode="reuse_cache_if_exists", ignore_verifications=True)
        logger.debug("Loading existing dataset")
        if "__index_level_0__" in dataset["all_days"].column_names:
            dataset = dataset.remove_columns(["__index_level_0__"])
    except FileNotFoundError:
        logger.warning("Creating new dataset")
        dataset = DatasetDict()
    return dataset

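# Illustrative shape of the object returned by get_dataset() once at least one day has
# been pushed (the column names are assumptions inferred from the code above, which
# relies on 'id' and 'date' columns existing):
#
#     DatasetDict({
#         "all_days": Dataset({features: ["id", "date", ...], num_rows: ...})
#     })
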
if __name__ == '__main__':
    run_main_continuously()