import os.path
import re
from typing import Optional, Iterator, List, Tuple, Union, Literal

from hbutils.system import urlsplit
from requests.auth import HTTPBasicAuth

from .web import NoURL, WebDataSource
from ..config.meta import __TITLE__, __VERSION__
from ..utils import get_requests_session, srequest

# Type alias naming the danbooru-like sites this module recognises (unused locally).
_DanbooruSiteTyping = Literal['konachan', 'yandere', 'danbooru', 'safebooru', 'lolibooru']


class DanbooruLikeSource(WebDataSource):
    def __init__(self, tags: List[str], min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 site_name: Optional[str] = 'danbooru', site_url: Optional[str] = 'https://danbooru.donmai.us/',
                 group_name: Optional[str] = None):
        WebDataSource.__init__(self, group_name or site_name, None, download_silent)
        self.session = get_requests_session(headers={
            "User-Agent": f"{__TITLE__}/{__VERSION__}",
            'Content-Type': 'application/json; charset=utf-8',
        })
        # HTTP basic auth is only attached when both a username and an API key are given.
        self.auth = HTTPBasicAuth(username, api_key) if username and api_key else None
        self.site_name, self.site_url = site_name, site_url
        self.tags = tags
        self.min_size = min_size

    def _get_data_from_raw(self, raw):
        # Hook for subclasses: pull the list of post dicts out of the raw JSON response.
        return raw

    def _select_url(self, data):
        # Prefer the smallest media-asset variant whose width and height both reach
        # ``min_size``; otherwise fall back to the post's original ``file_url``.
        if self.min_size is not None and "media_asset" in data and "variants" in data["media_asset"]:
            variants = data["media_asset"]["variants"]
            width, height, url = None, None, None
            for item in variants:
                if 'width' in item and 'height' in item and \
                        item['width'] >= self.min_size and item['height'] >= self.min_size:
                    if url is None or item['width'] < width:
                        width, height, url = item['width'], item['height'], item['url']
            if url is not None:
                return url

        if 'file_url' not in data:
            raise NoURL
        return data['file_url']
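
    # Illustrative shape of a ``media_asset.variants`` entry consumed above. The field
    # names follow the public danbooru API; the concrete values are made up:
    #
    #     {"type": "720x720", "url": "https://cdn.donmai.us/720x720/....webp",
    #      "width": 720, "height": 480, "file_ext": "webp"}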

    def _get_tags(self, data):
        # Danbooru-like sites expose tags as a single space-separated string.
        return re.split(r'\s+', data["tag_string"])

    def _iter_data(self) -> Iterator[Tuple[Union[str, int], str, dict]]:
        # Walk the paginated ``posts.json`` endpoint until an empty page comes back.
        page = 1
        while True:
            resp = srequest(self.session, 'GET', f'{self.site_url}/posts.json', params={
                "format": "json",
                "limit": "100",
                "page": str(page),
                "tags": ' '.join(self.tags),
            }, auth=self.auth)
            resp.raise_for_status()
            page_items = self._get_data_from_raw(resp.json())
            if not page_items:
                break

            for data in page_items:
                try:
                    url = self._select_url(data)
                except NoURL:
                    # Posts without any usable URL are skipped.
                    continue

                _, ext_name = os.path.splitext(urlsplit(url).filename)
                filename = f'{self.group_name}_{data["id"]}{ext_name}'
                meta = {
                    self.site_name: data,
                    'group_id': f'{self.group_name}_{data["id"]}',
                    'filename': filename,
                    'tags': {key: 1.0 for key in self._get_tags(data)}
                }
                yield data['id'], url, meta

            page += 1


class DanbooruSource(DanbooruLikeSource):
    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = None):
        DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
                                    'danbooru', 'https://danbooru.donmai.us/', group_name)


class SafebooruSource(DanbooruLikeSource):
    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = None):
        DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
                                    'safebooru', 'https://safebooru.donmai.us', group_name)


class ATFBooruSource(DanbooruLikeSource):
    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = None):
        DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
                                    'danbooru', 'https://booru.allthefallen.moe', group_name)


class E621LikeSource(DanbooruLikeSource):
    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 site_name: Optional[str] = 'e621', site_url: Optional[str] = 'https://e621.net/',
                 group_name: Optional[str] = None):
        DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
                                    site_name, site_url, group_name or site_name)

    def _get_data_from_raw(self, raw):
        # e621-style responses wrap the post list in a top-level ``posts`` key.
        return raw['posts']

    def _select_url(self, data):
        # Collect the candidate URLs (original file, preview, and sample when present),
        # then pick the smallest one that satisfies ``min_size``.
        urls = []
        urls.append((data['file']['url'], data['file']['width'], data['file']['height']))
        urls.append((data['preview']['url'], data['preview']['width'], data['preview']['height']))
        if 'sample' in data and data['sample']['has']:
            urls.append((data['sample']['url'], data['sample']['width'], data['sample']['height']))

        if self.min_size is not None:
            f_url, f_width, f_height = None, None, None
            for url, width, height in urls:
                if width >= self.min_size and height >= self.min_size:
                    if f_url is None or width < f_width:
                        f_url, f_width, f_height = url, width, height
            if f_url is not None:
                return f_url

        # Fall back to the original file URL when nothing meets the size requirement.
        return urls[0][0]
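
    # Illustrative shape of the e621-style ``tags`` mapping consumed by ``_get_tags``
    # below. Category names follow the public e621 API; the concrete tags are made up:
    #
    #     {"general": ["1girl", "solo"], "artist": ["someartist"], "character": [],
    #      "copyright": [], "species": [], "meta": ["hi_res"], ...}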

    def _get_tags(self, data):
        # Flatten every tag category into a single list of tag names.
        tags = []
        for value in data['tags'].values():
            tags.extend(value)
        return tags


class E621Source(E621LikeSource):
    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = 'e621'):
        E621LikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
                                'e621', 'https://e621.net/', group_name)


class E926Source(E621LikeSource):
    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = 'e926'):
        E621LikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
                                'e926', 'https://e926.net/', group_name)
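

# Minimal usage sketch (illustrative only, not part of the module itself). It drives the
# internal ``_iter_data`` generator directly; in normal use these sources are consumed
# through the iteration machinery of the ``WebDataSource`` parent class, which is not
# shown in this file. Because of the relative imports above, this module must be run as
# part of its package (e.g. ``python -m <package>.<module>``) for the block to execute.
if __name__ == '__main__':
    demo_source = DanbooruSource(['1girl', 'solo'], min_size=800)
    for post_id, post_url, post_meta in demo_source._iter_data():
        print(post_id, post_url, post_meta['filename'])
        break  # stop after the first matching post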