import os
from datetime import datetime
import json

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset, DownloadMode
from huggingface_hub import login

from utilities.data_processing import data_processing
from utilities.my_logger import setup_logger
from utilities.praw_downloader import praw_downloader
from utilities.praw_processor import preprocess_praw_data, preprocess_praw_comment_data

# Set the dataset names and related details from environment variables
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/reddit-{subreddit}"
comment_dataset_name = f"{username}/reddit-comments-{subreddit}"

frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HF_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)

# Dummy row used when creating a new repo; every value must be wrapped in a list
dummy_data = {
    "id": ['id'],
    "content": ["This is a sample post content. Just for demonstration purposes!"],
    "poster": ["sampleUser123"],
    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
    "flair": ["Discussion"],
    "title": ["Sample Post Title: How to Use Hugging Face?"],
    "score": [457],
    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
    "updated": [False],
    "new": [False],
    "nsfw": [False]
}

dummy_comment_data = {
    "id": ['id'],
    "content": ["This is a sample post content. Just for demonstration purposes!"],
    "poster": ["sampleUser123"],
    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
    "flair": ["Discussion"],
    "title": ["Sample Post Title: How to Use Hugging Face?"],
    "ups": [457],
    "score": [457],
    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
    "updated": [False],
    "new": [False],
    "depth": [2],
    "link_id": ["eqrkhgbjeh"],
    "submission_id": ["eqrkhgbjeh"]
}


def load_or_create_dataset():
    """
    Loads an existing dataset from the Hugging Face hub or creates a new one if it doesn't exist.

    This function attempts to load a dataset specified by 'dataset_name'. If the dataset is not found,
    it creates a new dataset with 'dummy_data', pushes it to the Hugging Face hub, and then reloads it.
    After reloading, the dummy data is removed from the dataset.

    Returns:
        dataset (DatasetDict): The loaded or newly created dataset.

    Raises:
        FileNotFoundError: If the dataset cannot be loaded or created.
""" # Load the existing dataset from the Hugging Face hub or create a new one try: logger.debug(f"Trying to download {dataset_name}") dataset = load_dataset(dataset_name, download_mode=DownloadMode.FORCE_REDOWNLOAD) logger.debug("Loading existing dataset") except FileNotFoundError: logger.warning("Creating new dataset") # Creating Initial Repo dataset = DatasetDict() dataset['train'] = Dataset.from_dict(dummy_data) dataset.push_to_hub(repo_id=dataset_name, token=auth_token) # Pulling from Initial Repo dataset = load_dataset(dataset_name) # Remove dummy data del dataset['train'] return dataset def load_or_create_comment_dataset(): # Load the existing dataset from the Hugging Face hub or create a new one try: logger.debug(f"Trying to download {comment_dataset_name}") dataset = load_dataset(comment_dataset_name, download_mode=DownloadMode.FORCE_REDOWNLOAD) logger.debug("Loading existing comment dataset") except FileNotFoundError: logger.warning("Creating new comment dataset") # Creating Initial Repo dataset = DatasetDict() dataset['train'] = Dataset.from_dict(dummy_comment_data) dataset.push_to_hub(repo_id=comment_dataset_name, token=auth_token) # Pulling from Initial Repo dataset = load_dataset(comment_dataset_name) # Remove dummy data del dataset['train'] return dataset def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame: """ Merges two dataframes, sorts them by 'date_utc', and marks new IDs. The function first marks rows from the new dataframe, then concatenates the old and new dataframes. It sorts the resulting dataframe by the 'date_utc' column. Rows from the new dataframe that are not in the old dataframe are marked as 'new'. Args: - old_df (pd.DataFrame): The original dataframe. - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe. Returns: - pd.DataFrame: The merged, sorted, and marked dataframe. """ old_df.drop(columns=['new', 'updated'], inplace=True) # Concatenate old and new dataframes, sort by 'date_utc', and reset index df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True) # Process data accordingly df = data_processing(df) # Identify new rows (present in new_df but not in old_df) df['new'] = df['id'].apply(lambda x: x in set(new_df['id']) - set(old_df['id'])) return df def remove_filtered_rows(df: pd.DataFrame) -> pd.DataFrame: """ Removes rows from the DataFrame where the 'id' is present in filter_ids.json. :param df: Input DataFrame to be filtered. :return: DataFrame with rows containing IDs present in filter_ids.json removed. """ # Load filter IDs from JSON file with open('filter_ids.json', 'r') as file: filter_ids = json.load(file) # Remove the rows with IDs present in filter_ids filtered_df = df[~df['id'].isin(filter_ids)] logger.info(f"Filtered {len(df) - len(filtered_df)} rows from the DataFrame") return filtered_df def get_latest_data(): submissions, comments = praw_downloader() df = preprocess_praw_data(submissions=submissions) df_comments = preprocess_praw_comment_data(comments=comments) return df, df_comments