File size: 7,256 Bytes
4f8ad24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import os.path
import re
from typing import Optional, Iterator, List, Tuple, Union, Literal

from hbutils.system import urlsplit
from requests.auth import HTTPBasicAuth

from .web import NoURL, WebDataSource
from ..config.meta import __TITLE__, __VERSION__
from ..utils import get_requests_session, srequest

# Literal type listing booru site-name strings.
# NOTE(review): nothing in the visible part of this module references this alias — confirm whether it is
# still used elsewhere before relying on it or removing it.
_DanbooruSiteTyping = Literal['konachan', 'yandere', 'danbooru', 'safebooru', 'lolibooru']


class DanbooruLikeSource(WebDataSource):
    """
    Web data source that pages through the ``posts.json`` endpoint of a Danbooru-style site.

    Sites with a different response layout can be supported by overriding
    :meth:`_get_data_from_raw`, :meth:`_select_url` and :meth:`_get_tags`.
    """

    def __init__(self, tags: List[str], min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 site_name: Optional[str] = 'danbooru', site_url: Optional[str] = 'https://danbooru.donmai.us/',
                 group_name: Optional[str] = None):
        """
        :param tags: Search tags; they are joined with single spaces into the ``tags`` query parameter.
        :param min_size: Minimum width and height a variant needs in order to be picked instead of the
            original file. ``None`` disables variant selection.
        :param download_silent: Passed through to ``WebDataSource.__init__``.
        :param username: User name for HTTP basic auth; only used when ``api_key`` is given as well.
        :param api_key: API key for HTTP basic auth; only used when ``username`` is given as well.
        :param site_name: Key under which the raw post is stored in each item's metadata; also the
            fallback for ``group_name``.
        :param site_url: Base URL of the site, with or without a trailing slash.
        :param group_name: Group name handed to ``WebDataSource.__init__``; defaults to ``site_name``.
        """
        # NOTE(review): ``self.group_name`` (read in ``_iter_data``) is presumably set by this call — confirm.
        WebDataSource.__init__(self, group_name or site_name, None, download_silent)
        self.session = get_requests_session(headers={
            "User-Agent": f"{__TITLE__}/{__VERSION__}",
            'Content-Type': 'application/json; charset=utf-8',
        })
        # Authenticate only when both credentials are present.
        self.auth = HTTPBasicAuth(username, api_key) if username and api_key else None
        self.site_name, self.site_url = site_name, site_url
        self.tags = tags
        self.min_size = min_size

    def _get_data_from_raw(self, raw):
        """Return the list of post dicts contained in the decoded JSON response ``raw``."""
        return raw

    def _select_url(self, data):
        """
        Choose the download URL for one post.

        When ``min_size`` is set, the narrowest variant whose width and height both reach ``min_size``
        is preferred; otherwise (or when no variant qualifies) the post's ``file_url`` is used.

        :param data: Post dict as returned by the site.
        :return: URL of the image to download.
        :raises NoURL: If the post offers no usable URL.
        """
        if self.min_size is not None and "media_asset" in data and "variants" in data["media_asset"]:
            large_enough = [
                item for item in data["media_asset"]["variants"]
                if item.get('url') is not None and 'width' in item and 'height' in item
                and item['width'] >= self.min_size and item['height'] >= self.min_size
            ]
            if large_enough:
                # min() keeps the first of equally wide variants, like a strict '<' scan would.
                return min(large_enough, key=lambda item: item['width'])['url']

        # A missing or null file_url means there is nothing to download for this post.
        file_url = data.get('file_url')
        if file_url is None:
            raise NoURL

        return file_url

    def _get_tags(self, data):
        """
        Return the tags of a post as a list of strings.

        ``str.split()`` is used so that an empty or whitespace-padded ``tag_string`` does not produce
        empty-string tags.
        """
        return data["tag_string"].split()

    def _iter_data(self) -> Iterator[Tuple[Union[str, int], str, dict]]:
        """
        Iterate over all posts matching ``self.tags``, requesting 100 posts per page.

        Paging stops at the first page that contains no posts. Posts for which :meth:`_select_url`
        raises ``NoURL`` are skipped.

        :return: Iterator of ``(post id, download url, metadata dict)`` tuples.
        """
        # Drop trailing slashes so a base URL such as 'https://danbooru.donmai.us/' does not yield '//posts.json'.
        base_url = self.site_url.rstrip('/')
        page = 1
        while True:
            resp = srequest(self.session, 'GET', f'{base_url}/posts.json', params={
                "format": "json",
                "limit": "100",
                "page": str(page),
                "tags": ' '.join(self.tags),
            }, auth=self.auth)
            resp.raise_for_status()
            page_items = self._get_data_from_raw(resp.json())
            if not page_items:
                break

            for data in page_items:
                try:
                    url = self._select_url(data)
                except NoURL:
                    continue

                # Keep the extension of the remote file in the local filename.
                _, ext_name = os.path.splitext(urlsplit(url).filename)
                filename = f'{self.group_name}_{data["id"]}{ext_name}'
                meta = {
                    self.site_name: data,
                    'group_id': f'{self.group_name}_{data["id"]}',
                    'filename': filename,
                    'tags': {key: 1.0 for key in self._get_tags(data)}
                }
                yield data['id'], url, meta

            page += 1


class DanbooruSource(DanbooruLikeSource):
    """Danbooru-like source preconfigured for ``https://danbooru.donmai.us/`` under the site name ``danbooru``."""

    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = None):
        """
        Forward all arguments to ``DanbooruLikeSource.__init__`` with the site name and URL fixed.
        """
        DanbooruLikeSource.__init__(
            self,
            tags=tags,
            min_size=min_size,
            download_silent=download_silent,
            username=username,
            api_key=api_key,
            site_name='danbooru',
            site_url='https://danbooru.donmai.us/',
            group_name=group_name,
        )


class SafebooruSource(DanbooruLikeSource):
    """Danbooru-like source preconfigured for ``https://safebooru.donmai.us`` under the site name ``safebooru``."""

    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = None):
        """
        Forward all arguments to ``DanbooruLikeSource.__init__`` with the site name and URL fixed.
        """
        DanbooruLikeSource.__init__(
            self,
            tags=tags,
            min_size=min_size,
            download_silent=download_silent,
            username=username,
            api_key=api_key,
            site_name='safebooru',
            site_url='https://safebooru.donmai.us',
            group_name=group_name,
        )


class ATFBooruSource(DanbooruLikeSource):
    """Danbooru-like source preconfigured for ``https://booru.allthefallen.moe``."""

    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = None):
        """
        Forward all arguments to ``DanbooruLikeSource.__init__`` with the site name and URL fixed.
        """
        # NOTE(review): the site name is 'danbooru' even though the URL points at a different site, so the
        # metadata key and the default group name are 'danbooru' — confirm this is intentional.
        DanbooruLikeSource.__init__(
            self,
            tags=tags,
            min_size=min_size,
            download_silent=download_silent,
            username=username,
            api_key=api_key,
            site_name='danbooru',
            site_url='https://booru.allthefallen.moe',
            group_name=group_name,
        )


class E621LikeSource(DanbooruLikeSource):
    """
    Variant of ``DanbooruLikeSource`` for sites that use the e621 response layout.

    Posts are read from the ``posts`` key of the response, image URLs come from the ``file``,
    ``preview`` and ``sample`` entries of a post, and tags are grouped by category in a dict.
    """

    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 site_name: Optional[str] = 'e621', site_url: Optional[str] = 'https://e621.net/',
                 group_name: Optional[str] = None):
        """
        Forward all arguments to ``DanbooruLikeSource.__init__``; ``group_name`` falls back to ``site_name``.
        """
        DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key,
                                    site_name, site_url, group_name or site_name)

    def _get_data_from_raw(self, raw):
        """Return the list of post dicts stored under the ``posts`` key of the decoded response."""
        return raw['posts']

    def _select_url(self, data):
        """
        Choose the download URL for one post.

        Candidates are the original file, the preview and (when the post has one) the sample. When
        ``min_size`` is set, the narrowest candidate whose width and height both reach ``min_size``
        wins; otherwise, or when nothing qualifies, the original file URL is used.

        :param data: Post dict as returned by the site.
        :return: URL of the image to download.
        :raises NoURL: If the chosen fallback URL is null, so the caller skips the post instead of
            failing while parsing a ``None`` URL.
        """
        candidates = [
            (data[key]['url'], data[key]['width'], data[key]['height'])
            for key in ('file', 'preview')
        ]
        if 'sample' in data and data['sample']['has']:
            candidates.append((data['sample']['url'], data['sample']['width'], data['sample']['height']))

        if self.min_size is not None:
            # NOTE(review): the API can apparently report null URLs (e.g. for posts it will not serve to the
            # current user) — confirm against the site documentation. Such candidates are never selected.
            large_enough = [
                (url, width, height) for url, width, height in candidates
                if url is not None and width is not None and height is not None
                and width >= self.min_size and height >= self.min_size
            ]
            if large_enough:
                # min() keeps the first of equally wide candidates, like a strict '<' scan would.
                return min(large_enough, key=lambda candidate: candidate[1])[0]

        # Fall back to the original file, which is always the first candidate.
        file_url = candidates[0][0]
        if file_url is None:
            raise NoURL

        return file_url

    def _get_tags(self, data):
        """Return the tags of all categories of a post, flattened into a single list."""
        return [tag for category_tags in data['tags'].values() for tag in category_tags]


class E621Source(E621LikeSource):
    """E621-like source preconfigured for ``https://e621.net/`` under the site name ``e621``."""

    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = 'e621'):
        """
        Forward all arguments to ``E621LikeSource.__init__`` with the site name and URL fixed.
        """
        E621LikeSource.__init__(
            self,
            tags=tags,
            min_size=min_size,
            download_silent=download_silent,
            username=username,
            api_key=api_key,
            site_name='e621',
            site_url='https://e621.net/',
            group_name=group_name,
        )


class E926Source(E621LikeSource):
    """E621-like source preconfigured for ``https://e926.net/`` under the site name ``e926``."""

    def __init__(self, tags: List[str],
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 username: Optional[str] = None, api_key: Optional[str] = None,
                 group_name: Optional[str] = 'e926'):
        """
        Forward all arguments to ``E621LikeSource.__init__`` with the site name and URL fixed.
        """
        E621LikeSource.__init__(
            self,
            tags=tags,
            min_size=min_size,
            download_silent=download_silent,
            username=username,
            api_key=api_key,
            site_name='e926',
            site_url='https://e926.net/',
            group_name=group_name,
        )