# NOTE(review): the lines below were Hugging Face Spaces page chrome captured
# by the scraper (status badges, file size, commit hash, gutter line numbers)
# and were not valid Python; kept here as a provenance comment instead.
# Original file: 11,180 bytes, commit e8c2906, 291 lines.
import prawcore.exceptions
from settings import CONFIG_FILE_PATH
import re
from praw.models import MoreComments, Subreddit, ListingGenerator
from typing import Iterator, Any, AnyStr, Union, Dict, List
class RedditContent:
    """
    Collects Reddit content via PRAW.

    Authenticates to Reddit, finds subreddits (popular listing, exact name
    lookup, or fuzzy title/description search), then pulls hot threads and
    filtered, sanitised comments into the nested ``self.contents`` dict,
    keyed by subreddit name.
    """

    # Compiled once for the whole class (previously rebuilt on every
    # sanitise_text call): a broad range of emoji / pictograph code points.
    _EMOJI_RE = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           u"\U00002500-\U00002BEF"  # chinese char
                           u"\U00002702-\U000027B0"
                           u"\U000024C2-\U0001F251"
                           u"\U0001f926-\U0001f937"
                           u"\U00010000-\U0010ffff"
                           u"\u2640-\u2642"
                           u"\u2600-\u2B55"
                           u"\u200d"
                           u"\u23cf"
                           u"\u23e9"
                           u"\u231a"
                           u"\ufe0f"  # dingbats
                           u"\u3030"
                           "]+", re.UNICODE)

    def __init__(self,
                 max_comment_length=2000, min_comment_length=100, min_comment_score=10,
                 thread_limit=20, include_nsfw: bool = False,
                 config_file_path: "str | None" = None,
                 sub_limit=100
                 ):
        """
        :param max_comment_length: skip comments longer than this many characters
        :param min_comment_length: skip comments shorter than this many characters
        :param min_comment_score: skip comments whose score is below this value
        :param thread_limit: max number of hot threads fetched per request
        :param include_nsfw: when True, NSFW subreddits/threads are kept
        :param config_file_path: path to the Reddit credentials config file;
            falls back to ``settings.CONFIG_FILE_PATH`` when None
        :param sub_limit: max number of subreddits returned by a popular query
        """
        self.config_file_path = CONFIG_FILE_PATH if config_file_path is None else config_file_path
        self.thread_limit = thread_limit
        self.sub_limit = sub_limit
        self.include_nsfw = include_nsfw
        # Comment filtering thresholds (used by get_comments)
        self.max_comment_length = max_comment_length
        self.min_comment_length = min_comment_length
        self.min_comment_score = min_comment_score
        # State populated while processing
        self.reddit = None  # praw client, set by authenticate()
        self.subs: "List[Subreddit]" = None  # result of the last subreddit lookup
        self.contents = {}  # {subreddit_name: {title, description, ..., "contents": [threads]}}
        self.controlled = False  # True once control_contents() has run

    def process(self,
                get_popular: bool = True,
                search_query: "str | None" = None,
                exact_search: bool = False,
                fuzzy_search: bool = False,
                subreddits: "List[AnyStr] | None" = None
                ):
        """
        Entry point: authenticate if needed, then run the pipeline.

        :param get_popular: unused; kept for backward compatibility (popular
            subreddits are already the default when neither ``subreddits``
            nor ``search_query`` is given)
        :param search_query: query used to look subreddits up
        :param exact_search: require an exact name match for ``search_query``
        :param fuzzy_search: search titles/descriptions instead of names
        :param subreddits: explicit list of subreddit names to fetch
        """
        if not self.reddit:
            self.authenticate()
        if subreddits:
            from time import sleep
            for sub in subreddits:
                sleep(2)  # throttle between per-subreddit requests
                try:
                    self.pipe(query=sub, fuzzy=False, exact=True)
                except prawcore.exceptions.Forbidden as e:
                    # Private/banned subreddits raise Forbidden; skip them.
                    print(f"Could not get the {sub} subreddit due to:\n{e}")
                    continue
        elif search_query:
            self.pipe(query=search_query, fuzzy=fuzzy_search, exact=exact_search)
        else:  # default value -> gets popular subreddits
            self.pipe(query=None, fuzzy=False, exact=False)
        if not self.controlled:
            # Re-fetch subreddits whose "contents" came back empty.
            self.control_contents()

    def pipe(self, query, fuzzy, exact):
        """Run one search -> subreddits -> threads & comments pass."""
        self.get_subs(query=query, exact=exact, fuzzy=fuzzy)
        self.update_content_subreddits()
        self.update_content_threads_comments()

    def get_subs(self, query: AnyStr, exact: bool = False, fuzzy: bool = False):
        """
        Resolve ``query`` to subreddits and store them on ``self.subs``.

        :param query: subreddit name or search term; None selects popular subs
        :param exact: require an exact name match (ignored on the fuzzy path)
        :param fuzzy: search titles and descriptions instead of names
        :return: the resolved subreddits (also kept on ``self.subs``)
        """
        if query is None:
            # Default: popular subreddits.
            print("Getting the popular subreddits.")
            self.subs = self.get_popular_subreddits()
        elif fuzzy:
            print(f"Getting {query} related subreddits.")
            self.subs = self.search_subreddits(query)
        else:
            print(f"Getting {query} subreddit.")
            self.subs = self.search_subreddits_by_name(query, exact=exact)
        return self.subs

    def search_subreddits(self, query: str):
        """
        Search subreddit titles and descriptions for ``query``.

        :param query: free-text search term
        :return: iterable of matching subreddits
        """
        return self.reddit.subreddits.search(query=query)

    def search_subreddits_by_name(self, query: str, exact: bool = False):
        """Search subreddits by name, honouring the instance NSFW setting."""
        return self.reddit.subreddits.search_by_name(query, exact=exact, include_nsfw=self.include_nsfw)

    def get_subreddits(self, subreddits: "list | str") -> "Subreddit":
        """
        Build one (possibly merged) Subreddit object from name(s).

        https://praw.readthedocs.io/en/stable/code_overview/models/subreddit.html#praw.models.Subreddit
        """
        if isinstance(subreddits, str):
            return self.reddit.subreddit(subreddits)
        # praw merges multiple subreddits with "+" into a single listing.
        return self.reddit.subreddit("+".join(subreddits))

    def get_subreddit_threads(self, subreddits: List[AnyStr]) -> Iterator[Any]:
        """Hot threads (up to ``thread_limit``) of the joined subreddits."""
        return self.get_subreddits(subreddits).hot(limit=self.thread_limit)

    def get_popular_subreddits(self):
        """Popular subreddits, capped at ``sub_limit``."""
        return self.reddit.subreddits.popular(limit=self.sub_limit)

    def update_content_subreddits(self):
        """
        Add one entry per subreddit in ``self.subs`` to ``self.contents``,
        keyed by the subreddit name extracted from its url ("/r/<name>/").
        """
        for sub in self.subs:
            r_url = sub.url.split("/")[2]  # "/r/<name>/" -> "<name>"
            self.contents.update({r_url: {
                "title": sub.title,
                "description": sub.description,
                "display_name": sub.display_name,
                "url": r_url,
                "contents": []
            }})

    def update_content_threads_comments(self, subreddit_names: "List[str] | None" = None):
        """
        Fill the "contents" list of each subreddit entry with its hot
        threads and their filtered comments.

        :param subreddit_names: subreddits to (re)fetch; defaults to every
            subreddit already present in ``self.contents``
        """
        if subreddit_names is None:
            subreddit_names = [sub["display_name"] for sub in self.contents.values()]
        for thread in self.get_subreddit_threads(subreddit_names):
            comments = self.get_comments(thread.id)
            if not comments:
                continue
            # Comment permalinks look like "/r/<name>/comments/..."; element 2
            # is the subreddit name used as the self.contents key.
            # NOTE(review): assumes the permalink name matches the key written
            # by update_content_subreddits (same casing) — verify with praw.
            r_url = comments[0]["url"].split("/")[2]
            self.contents[r_url]["contents"].append({
                "thread_id": thread.id,
                "title": thread.title,
                "self_text": thread.selftext,
                "num_comments": thread.num_comments,
                "comments": comments,
                "is_nsfw": thread.over_18,
                "upvotes": thread.score,
                "thread_url": thread.url,
                "upvote_ratio": thread.upvote_ratio,
            })

    def get_comments(self, post_id, sort_by="top"):
        """
        Fetch, filter and sanitise the comments of one thread.

        :param post_id: id of the submission to fetch
        :param sort_by: comment sort order requested from Reddit ("top", "new", ...)
        :return: list of dicts --> {text, id, url, author, upvotes};
            empty list when the thread is NSFW and NSFW content is excluded
        """
        submission = self.reddit.submission(post_id)
        # Bug fix: previously returned {} here while every other path returns
        # a list; [] keeps the return type consistent for callers.
        if submission.over_18 and not self.include_nsfw:
            return []
        submission.comment_sort = sort_by
        comments = []
        for top_level_comment in submission.comments.list():
            if isinstance(top_level_comment, MoreComments):
                continue  # TODO: include all comments in the comment tree
            filters = [  # If any of the filters are true, skip the comment
                top_level_comment.body in ["[removed]", "[deleted]"],
                top_level_comment.stickied,
                len(top_level_comment.body) < self.min_comment_length,
                len(top_level_comment.body) > self.max_comment_length,
                top_level_comment.author is None,  # must precede .author.name below
                top_level_comment.score < self.min_comment_score
            ]
            if any(filters):
                continue
            sanitised_text = self.sanitise_text(top_level_comment.body)
            # Bug fix: also drop the empty string ("".isspace() is False, so
            # fully-sanitised-away comments used to slip through).
            if not sanitised_text or sanitised_text.isspace():
                continue
            comments.append({
                "text": sanitised_text,
                "id": top_level_comment.id,
                "url": top_level_comment.permalink,
                "author": top_level_comment.author.name,
                "upvotes": top_level_comment.score,
            })
        return comments

    def authenticate(self):
        """Create the authenticated praw client from the config file."""
        from authentication import authenticate_reddit, get_reddit_config
        config = get_reddit_config(self.config_file_path)
        self.reddit = authenticate_reddit(config)

    def control_contents(self):
        """
        When querying multiple subreddits at once, some may come back with
        no threads; retry just those once. Sets ``self.controlled`` so the
        check runs at most once per process() call.
        """
        subs_with_no_contents = [
            subreddit_content["display_name"]
            for subreddit_content in self.contents.values()
            if not subreddit_content["contents"]
        ]
        self.controlled = True
        if subs_with_no_contents:
            self.update_content_threads_comments(subreddit_names=subs_with_no_contents)

    @staticmethod
    def sanitise_text(text: str, no_urls: bool = True, no_special_chars: bool = True, no_emojis: bool = True) -> str:
        r"""Sanitizes the text for tts.
        What gets removed:
            - following characters`^_~@!&;#:-%“”‘"%*/{}[]()\|<>?=+`
            - any http or https links
            - emojis
        "+" and "&" are spelled out as " plus "/" and " before the special
        characters are stripped, so their meaning survives for tts.
        Args:
            text (str): Text to be sanitized
        Returns:
            str: Sanitized text
        """
        if no_urls:
            # NOTE(review): deliberately broad — also strips bare domains
            # such as "example.com", not only http(s) links.
            regex_urls = r"((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*"
            text = re.sub(regex_urls, " ", text)
        if no_special_chars:
            # Bug fix: the original ran these replacements AFTER the regex
            # below had already deleted "+" and "&", so they never matched.
            text = text.replace("+", " plus ").replace("&", " and ")
            # note: not removing apostrophes
            regex_expr = r"\s['|’]|['|’]\s|[\^_~@!&;#:\-%—“”‘\"%\*/{}\[\]\(\)\\|<>=+]"
            text = re.sub(regex_expr, " ", text)
        if no_emojis:
            text = RedditContent._EMOJI_RE.sub('', text)
        # Collapse every whitespace run into a single space.
        return " ".join(str(text).split())

    def write_contents(self, out_file_path: str):
        """Dump ``self.contents`` to ``out_file_path`` as indented JSON."""
        import json
        # Bug fix: force utf-8 so Reddit's unicode text also serialises on
        # platforms whose default encoding cannot represent it.
        with open(out_file_path, "w", encoding="utf-8") as f:
            json.dump(self.contents, f, indent=4)