|
import os |
|
import time |
|
from datetime import datetime, timedelta |
|
|
|
import pandas as pd |
|
from datasets import Dataset, DatasetDict, load_dataset |
|
from huggingface_hub import login |
|
|
|
from my_logger import setup_logger |
|
from utilities.pushshift_data import scrape_submissions_by_day, submissions_to_dataframe |
|
from utilities.readme_update import update_readme |
|
|
|
|
|
subreddit = os.environ["SUBREDDIT"] |
|
username = os.environ["USERNAME"] |
|
dataset_name = f"{username}/dataset-creator-{subreddit}" |
|
dataset_readme_path = "README.md" |
|
|
|
|
|
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"] |
|
login(auth_token, add_to_git_credential=True) |
|
|
|
logger = setup_logger(__name__) |
|
|
|
|
|
def main(dataset, date_to_fetch): |
|
""" |
|
Runs the main data processing function to fetch and process subreddit data for the specified date. |
|
|
|
Args: |
|
dataset (datasets.DatasetDict): The Hugging Face dataset to fetch and process subreddit data for. |
|
date_to_fetch (str): The date to fetch subreddit data for, in YYYY-MM-DD format. |
|
|
|
Returns: |
|
most_recent_date (str): The most recent date in the updated dataset. |
|
""" |
|
|
|
logger.info(f"Fetching data for {str(date_to_fetch)}") |
|
submissions = scrape_submissions_by_day(subreddit, str(date_to_fetch)) |
|
df = submissions_to_dataframe(submissions) |
|
logger.debug(f"Data fetched for {str(date_to_fetch)}") |
|
most_recent_date = date_to_fetch |
|
|
|
|
|
if "all_days" in dataset: |
|
logger.debug("Appending data to split 'all_days'") |
|
|
|
old_data = dataset['all_days'].to_pandas() |
|
new_data = pd.concat([old_data, df], ignore_index=True) |
|
if '__index_level_0__' in new_data.columns: |
|
new_data = new_data.drop('__index_level_0__', axis=1) |
|
|
|
|
|
new_data = new_data.drop_duplicates(subset=['id'], keep="first") |
|
|
|
|
|
old_data_most_recent_date = old_data['date'].max() |
|
old_data_most_recent_date = datetime.strptime(old_data_most_recent_date, '%Y-%m-%d').date() |
|
most_recent_date = max(old_data_most_recent_date, most_recent_date) |
|
|
|
if len(old_data) == len(new_data): |
|
logger.warning("Data in hub is much more recent, using that next!") |
|
return most_recent_date |
|
|
|
|
|
dataset["all_days"] = Dataset.from_pandas(new_data) |
|
|
|
|
|
update_readme(dataset_name, subreddit, date_to_fetch) |
|
else: |
|
logger.debug("Creating new split 'all_days'") |
|
dataset["all_days"] = Dataset.from_pandas(df) |
|
|
|
logger.debug("Appended or created split 'all_days'") |
|
|
|
|
|
logger.debug(f"Pushing data for {date_to_fetch} to the Hugging Face hub") |
|
dataset.push_to_hub(dataset_name, token=auth_token) |
|
logger.info(f"Processed and pushed data for {date_to_fetch} to the Hugging Face Hub") |
|
return most_recent_date |
|
|
|
|
|
def run_main_continuously(): |
|
""" |
|
This function runs the given `main_function` continuously, starting from the date specified |
|
in the environment variable "START_DATE" until two days ago. Once it reaches two days ago, |
|
it will wait until tomorrow to start again at the same time as when it started today. |
|
""" |
|
start_date_str = os.environ.get("START_DATE") |
|
start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date() |
|
|
|
|
|
start_time = datetime.now().time() |
|
|
|
|
|
try: |
|
dataset = load_dataset(dataset_name, download_mode="reuse_cache_if_exists", ignore_verifications=True) |
|
logger.debug("Loading existing dataset") |
|
if "__index_level_0__" in dataset["all_days"].column_names: |
|
dataset = dataset.remove_columns(["__index_level_0__"]) |
|
except FileNotFoundError: |
|
logger.warning("Creating new dataset") |
|
dataset = DatasetDict() |
|
|
|
while True: |
|
today = datetime.now().date() |
|
two_days_ago = today - timedelta(days=2) |
|
|
|
if start_date <= two_days_ago: |
|
logger.warning(f"Running main function for date: {start_date}") |
|
most_recent_date = main(dataset, start_date) |
|
start_date = most_recent_date + timedelta(days=1) |
|
else: |
|
tomorrow = today + timedelta(days=1) |
|
now = datetime.now() |
|
start_of_tomorrow = datetime.combine(tomorrow, start_time) |
|
wait_until_tomorrow = (start_of_tomorrow - now).total_seconds() |
|
logger.info(f"Waiting until tomorrow: {wait_until_tomorrow} seconds") |
|
time.sleep(wait_until_tomorrow) |
|
|
|
|
|
if __name__ == '__main__': |
|
run_main_continuously() |
|
|