alvanli committed
Commit 67a3546
Parent(s): ce087e5

add comment ds

Files changed:
- main.py +29 -12
- utilities/praw_downloader.py +23 -4
- utilities/praw_processor.py +19 -0
- utilities/user_defined_functions.py +44 -3
main.py
CHANGED
@@ -6,7 +6,14 @@ import pandas as pd
 import schedule
 from datasets import Dataset
 
-from utilities.user_defined_functions import
+from utilities.user_defined_functions import (
+    get_latest_data,
+    merge_data,
+    load_or_create_dataset,
+    remove_filtered_rows,
+    load_or_create_comment_dataset
+)
+
 from utilities.my_logger import setup_logger
 from utilities.readme_update import update_dataset_readme
 
@@ -14,6 +21,8 @@ from utilities.readme_update import update_dataset_readme
 subreddit = os.environ["SUBREDDIT"]
 username = os.environ["USERNAME"]
 dataset_name = f"{username}/reddit-{subreddit}"
+comment_dataset_name = f"{username}/reddit-comments-{subreddit}"
+
 dataset_readme_path = "README.md"
 
 frequency = os.environ.get("FREQUENCY", '').lower()
@@ -25,14 +34,9 @@ auth_token = os.environ["HF_TOKEN"]
 
 logger = setup_logger(__name__)
 
-
-def main():
+def upload(new_df, dataset, hf_dataset_name):
     date = datetime.now().strftime('%Y-%m-%d')
-
-    dataset = load_or_create_dataset()
-
-    new_df = get_latest_data()
-
+
     # Using dataset from hub
     if 'train' in dataset.keys():
         old_df = dataset['train'].to_pandas() if 'train' in dataset.keys() else pd.DataFrame()
@@ -51,13 +55,26 @@ def main():
     logger.info(f"Adding {new_rows} rows for {date}.")
 
     # Push the augmented dataset to the Hugging Face hub
-    logger.debug(f"Pushing data for {date} to
-    dataset.push_to_hub(
-    logger.info(f"Processed and pushed data for {date} to
-    update_dataset_readme(dataset_name=
+    logger.debug(f"Pushing data for {date} to {hf_dataset_name}")
+    dataset.push_to_hub(hf_dataset_name, token=auth_token)
+    logger.info(f"Processed and pushed data for {date} to {hf_dataset_name}")
+    update_dataset_readme(dataset_name=hf_dataset_name, subreddit=subreddit, new_rows=new_rows)
     logger.info(f"Updated README.")
 
 
+def main():
+    date = datetime.now().strftime('%Y-%m-%d')
+
+    logger.warning(f"Running main function for date: {date}")
+    sub_dataset = load_or_create_dataset()
+    new_df, new_df_comment = get_latest_data()
+
+    upload(new_df, sub_dataset, dataset_name)
+
+    comment_dataset = load_or_create_comment_dataset()
+    upload(new_df_comment, comment_dataset, comment_dataset_name)
+
+
 def schedule_periodic_task():
     """
     Schedule the main task to run at the user-defined frequency
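For orientation, a minimal configuration sketch (not part of the commit): it sets only the environment variables this diff shows being read at module level, with placeholder values. The Reddit API credentials that PRAW needs elsewhere in the pipeline are not covered here.

# Hypothetical setup sketch with placeholder values.
import os

os.environ["SUBREDDIT"] = "some_subreddit"    # used in f"{username}/reddit-{subreddit}"
os.environ["USERNAME"] = "your-hf-username"   # and in f"{username}/reddit-comments-{subreddit}"
os.environ["HF_TOKEN"] = "hf_xxx"             # placeholder token passed to push_to_hub
os.environ["FREQUENCY"] = "daily"             # "daily" or "hourly", validated in user_defined_functions.py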
utilities/praw_downloader.py
CHANGED
@@ -1,6 +1,6 @@
 import os
 from datetime import datetime
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Tuple
 
 import praw
 
@@ -37,8 +37,24 @@ def extract_submission_data(submission: praw.models.Submission) -> Dict[str, Any
         "nsfw": submission.over_18,
     }
 
+def extract_comment_data(comment: praw.models.Comment) -> Dict[str, Any]:
+    """Extract and return relevant data from a given Reddit comment"""
+    return {
+        'content': comment.body,
+        'poster': str(comment.author),
+        'date_utc': datetime.utcfromtimestamp(comment.created_utc).strftime('%Y-%m-%d %H:%M:%S'),
+        'flair': comment.author_flair_text,
+        'ups': comment.ups,
+        'score': comment.score,
+        'permalink': comment.permalink,
+        'depth': comment.depth,
+        'link_id': comment.link_id,
+        'submission_id': comment._submission.id,
+        'id': comment.id
+    }
+
 
-def praw_downloader() -> List[Dict[str, str]]:
+def praw_downloader() -> Tuple[List[Dict[str, str]]]:
     """Main function to extract and save all submissions from the subreddit."""
     reddit = get_reddit_instance()
     subreddit = reddit.subreddit(subreddit_var)
@@ -46,13 +62,16 @@ def praw_downloader() -> List[Dict[str, str]]:
     logger.info(f'Starting to fetch submissions from {os.getenv("SUBREDDIT")}.')
 
     submissions = []
+    comments = []
     for submission in subreddit.new(limit=reddit_pull_limit):  # Set limit=None to get all posts
         # logger.debug(f'Processing post {submission.id} - {submission.title}')
         data = extract_submission_data(submission)
+        for comment in submission.comments.list():
+            comments.append(extract_comment_data(comment))
        submissions.append(data)
 
-    logger.info(f'Finished downloading {len(submissions)} submissions
-    return submissions
+    logger.info(f'Finished downloading {len(submissions)} submissions, {len(comments)} comments')
+    return submissions, comments
 
 
 if __name__ == "__main__":
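One assumption worth flagging in the new loop: extract_comment_data reads attributes such as comment.body and comment.depth, so every item yielded by submission.comments.list() must be a regular Comment. On heavily commented posts, .list() can also return MoreComments placeholders; a common guard (a sketch, not part of this commit) is to expand or drop them first.

# Sketch only: defensive iteration over a submission's comments.
from praw.models import MoreComments

def iter_comments(submission):
    # Expand "load more comments" stubs; limit=0 drops them rather than fetching more.
    submission.comments.replace_more(limit=0)
    for comment in submission.comments.list():
        if isinstance(comment, MoreComments):  # should not occur after replace_more; kept as a guard
            continue
        yield comment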
utilities/praw_processor.py
CHANGED
@@ -33,3 +33,22 @@ def preprocess_praw_data(submissions: List[Dict]) -> pd.DataFrame:
     praw_df['id'] = praw_df.permalink.str.split('/').str[4]
 
     return praw_df
+
+def preprocess_praw_comment_data(comments: List[Dict]) -> pd.DataFrame:
+    """
+    Preprocesses praw comment data into a DataFrame.
+
+    Parameters:
+    - submissions: List of submission dictionaries.
+
+    Returns:
+    - pd.DataFrame: Preprocessed DataFrame.
+    """
+
+    # Convert the submissions list to a DataFrame
+    praw_df = pd.DataFrame(comments)
+
+    # Convert 'date' column to datetime format
+    praw_df.date_utc = pd.to_datetime(praw_df.date_utc)
+
+    return praw_df
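As a quick, self-contained illustration of what the new preprocess_praw_comment_data does, the snippet below (not part of the commit) builds one record shaped like extract_comment_data's output, with fabricated field values, and applies the same date_utc conversion.

# Illustration only: fabricated comment record, same conversion step as the new function.
import pandas as pd

sample_comments = [{
    "content": "Nice write-up!",
    "poster": "some_redditor",
    "date_utc": "2023-10-26 14:30:45",
    "flair": None,
    "ups": 3,
    "score": 3,
    "permalink": "/r/sample/comments/abc123/title/def456/",
    "depth": 0,
    "link_id": "t3_abc123",
    "submission_id": "abc123",
    "id": "def456",
}]

praw_df = pd.DataFrame(sample_comments)
praw_df.date_utc = pd.to_datetime(praw_df.date_utc)  # same conversion as the new function
print(praw_df.date_utc.dtype)                        # datetime64[ns]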
utilities/user_defined_functions.py
CHANGED
@@ -9,12 +9,13 @@ from huggingface_hub import login
 from utilities.data_processing import data_processing
 from utilities.my_logger import setup_logger
 from utilities.praw_downloader import praw_downloader
-from utilities.praw_processor import preprocess_praw_data
+from utilities.praw_processor import preprocess_praw_data, preprocess_praw_comment_data
 
 # Set dataset name, path to README.md, and existing dataset details
 subreddit = os.environ["SUBREDDIT"]
 username = os.environ["USERNAME"]
 dataset_name = f"{username}/reddit-{subreddit}"
+comment_dataset_name = f"{username}/reddit-comments-{subreddit}"
 
 frequency = os.environ.get("FREQUENCY", '').lower()
 if frequency not in ["daily", "hourly"]:
@@ -41,6 +42,23 @@ dummy_data = {
     "nsfw": [False]
 }
 
+dummy_comment_data = {
+    "id": ['id'],
+    "content": ["This is a sample post content. Just for demonstration purposes!"],
+    "poster": ["sampleUser123"],
+    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
+    "flair": ["Discussion"],
+    "title": ["Sample Post Title: How to Use Hugging Face?"],
+    "ups": [457],
+    "score": [457],
+    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
+    "updated": [False],
+    "new": [False],
+    "depth": [2],
+    "link_id": ["eqrkhgbjeh"],
+    "submission_id": ["eqrkhgbjeh"]
+}
+
 
 def load_or_create_dataset():
     """
@@ -77,6 +95,28 @@ def load_or_create_dataset():
     return dataset
 
 
+def load_or_create_comment_dataset():
+    # Load the existing dataset from the Hugging Face hub or create a new one
+    try:
+        logger.debug(f"Trying to download {comment_dataset_name}")
+        dataset = load_dataset(comment_dataset_name, download_mode=DownloadMode.FORCE_REDOWNLOAD)
+        logger.debug("Loading existing comment dataset")
+    except FileNotFoundError:
+        logger.warning("Creating new comment dataset")
+
+        # Creating Initial Repo
+        dataset = DatasetDict()
+        dataset['train'] = Dataset.from_dict(dummy_comment_data)
+        dataset.push_to_hub(repo_id=comment_dataset_name, token=auth_token)
+
+        # Pulling from Initial Repo
+        dataset = load_dataset(comment_dataset_name)
+
+        # Remove dummy data
+        del dataset['train']
+    return dataset
+
+
 def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
     """
     Merges two dataframes, sorts them by 'date_utc', and marks new IDs.
@@ -125,6 +165,7 @@ def remove_filtered_rows(df: pd.DataFrame) -> pd.DataFrame:
 
 
 def get_latest_data():
-    submissions = praw_downloader()
+    submissions, comments = praw_downloader()
     df = preprocess_praw_data(submissions=submissions)
-    return df
+    df_comments = preprocess_praw_comment_data(comments=comments)
+    return df, df_comments
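One behavioural detail of load_or_create_comment_dataset worth keeping in mind: when it has to create the repo, it deletes the dummy 'train' split before returning, so callers receive a DatasetDict that may have no splits at all. The helper below is a small sketch (not part of the commit) of the pattern upload() in main.py uses to handle both cases.

# Sketch only: tolerate a dataset with or without a 'train' split.
import pandas as pd

def existing_rows(dataset) -> pd.DataFrame:
    """Return previously pushed rows if a 'train' split exists, else an empty frame."""
    if "train" in dataset.keys():
        return dataset["train"].to_pandas()
    return pd.DataFrame()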