Spaces:
Runtime error
Runtime error
File size: 7,256 Bytes
4f8ad24 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
import os.path
import re
from typing import Optional, Iterator, List, Tuple, Union, Literal
from hbutils.system import urlsplit
from requests.auth import HTTPBasicAuth
from .web import NoURL, WebDataSource
from ..config.meta import __TITLE__, __VERSION__
from ..utils import get_requests_session, srequest
_DanbooruSiteTyping = Literal['konachan', 'yandere', 'danbooru', 'safebooru', 'lolibooru']
class DanbooruLikeSource(WebDataSource):
def __init__(self, tags: List[str], min_size: Optional[int] = 800, download_silent: bool = True,
username: Optional[str] = None, api_key: Optional[str] = None,
site_name: Optional[str] = 'danbooru', site_url: Optional[str] = 'https://danbooru.donmai.us/',
group_name: Optional[str] = None):
WebDataSource.__init__(self, group_name or site_name, None, download_silent)
self.session = get_requests_session(headers={
"User-Agent": f"{__TITLE__}/{__VERSION__}",
'Content-Type': 'application/json; charset=utf-8',
})
self.auth = HTTPBasicAuth(username, api_key) if username and api_key else None
self.site_name, self.site_url = site_name, site_url
self.tags = tags
self.min_size = min_size
def _get_data_from_raw(self, raw):
return raw
def _select_url(self, data):
if self.min_size is not None and "media_asset" in data and "variants" in data["media_asset"]:
variants = data["media_asset"]["variants"]
width, height, url = None, None, None
for item in variants:
if 'width' in item and 'height' in item and \
item['width'] >= self.min_size and item['height'] >= self.min_size:
if url is None or item['width'] < width:
width, height, url = item['width'], item['height'], item['url']
if url is not None:
return url
if 'file_url' not in data:
raise NoURL
return data['file_url']
def _get_tags(self, data):
return re.split(r'\s+', data["tag_string"])
def _iter_data(self) -> Iterator[Tuple[Union[str, int], str, dict]]:
page = 1
while True:
resp = srequest(self.session, 'GET', f'{self.site_url}/posts.json', params={
"format": "json",
"limit": "100",
"page": str(page),
"tags": ' '.join(self.tags),
}, auth=self.auth)
resp.raise_for_status()
page_items = self._get_data_from_raw(resp.json())
if not page_items:
break
for data in page_items:
try:
url = self._select_url(data)
except NoURL:
continue
_, ext_name = os.path.splitext(urlsplit(url).filename)
filename = f'{self.group_name}_{data["id"]}{ext_name}'
meta = {
self.site_name: data,
'group_id': f'{self.group_name}_{data["id"]}',
'filename': filename,
'tags': {key: 1.0 for key in self._get_tags(data)}
}
yield data['id'], url, meta
page += 1
class DanbooruSource(DanbooruLikeSource):
def __init__(self, tags: List[str],
min_size: Optional[int] = 800, download_silent: bool = True,
username: Optional[str] = None, api_key: Optional[str] = None,
group_name: Optional[str] = None):
DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
'danbooru', 'https://danbooru.donmai.us/', group_name)
class SafebooruSource(DanbooruLikeSource):
def __init__(self, tags: List[str],
min_size: Optional[int] = 800, download_silent: bool = True,
username: Optional[str] = None, api_key: Optional[str] = None,
group_name: Optional[str] = None):
DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
'safebooru', 'https://safebooru.donmai.us', group_name)
class ATFBooruSource(DanbooruLikeSource):
def __init__(self, tags: List[str],
min_size: Optional[int] = 800, download_silent: bool = True,
username: Optional[str] = None, api_key: Optional[str] = None,
group_name: Optional[str] = None):
DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
'danbooru', 'https://booru.allthefallen.moe', group_name)
class E621LikeSource(DanbooruLikeSource):
def __init__(self, tags: List[str],
min_size: Optional[int] = 800, download_silent: bool = True,
username: Optional[str] = None, api_key: Optional[str] = None,
site_name: Optional[str] = 'e621', site_url: Optional[str] = 'https://e621.net/',
group_name: Optional[str] = None):
DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
site_name, site_url, group_name or site_name)
def _get_data_from_raw(self, raw):
return raw['posts']
def _select_url(self, data):
urls = []
urls.append((data['file']['url'], data['file']['width'], data['file']['height']))
urls.append((data['preview']['url'], data['preview']['width'], data['preview']['height']))
if 'sample' in data and data['sample']['has']:
urls.append((data['sample']['url'], data['sample']['width'], data['sample']['height']))
if self.min_size is not None:
f_url, f_width, f_height = None, None, None
for url, width, height in urls:
if width >= self.min_size and height >= self.min_size:
if f_url is None or width < f_width:
f_url, f_width, f_height = url, width, height
if f_url is not None:
return f_url
return urls[0][0]
def _get_tags(self, data):
tags = []
for value in data['tags'].values():
tags.extend(value)
return tags
class E621Source(E621LikeSource):
def __init__(self, tags: List[str],
min_size: Optional[int] = 800, download_silent: bool = True,
username: Optional[str] = None, api_key: Optional[str] = None,
group_name: Optional[str] = 'e621'):
E621LikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
'e621', 'https://e621.net/', group_name)
class E926Source(E621LikeSource):
def __init__(self, tags: List[str],
min_size: Optional[int] = 800, download_silent: bool = True,
username: Optional[str] = None, api_key: Optional[str] = None,
group_name: Optional[str] = 'e926'):
E621LikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
'e926', 'https://e926.net/', group_name)
|