from __future__ import annotations

import logging
import os
import random
import sys
import tempfile

import gradio as gr
import imageio
import numpy as np
import PIL.Image
import torch
import tqdm.auto
from diffusers import (DDIMPipeline, DDIMScheduler, DDPMPipeline,
                       DiffusionPipeline, PNDMPipeline, PNDMScheduler)

HF_TOKEN = os.environ['HF_TOKEN']

formatter = logging.Formatter(
    '[%(asctime)s] %(name)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S')
stream_handler = logging.StreamHandler(stream=sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(stream_handler)


class Model:
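    """Anime-face diffusion models loaded from diffusers, with optional
    upscaling through the hysts/Real-ESRGAN-anime Gradio Space."""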

    MODEL_NAMES = [
        'ddpm-128-exp000',
    ]

    def __init__(self, device: str | torch.device):
        self.device = torch.device(device)
        self._download_all_models()

        self.model_name = self.MODEL_NAMES[0]
        self.scheduler_type = 'DDIM'
        self.pipeline = self._load_pipeline(self.model_name,
                                            self.scheduler_type)
        self.rng = random.Random()

        self.real_esrgan = gr.Interface.load('spaces/hysts/Real-ESRGAN-anime')

    @staticmethod
    def _load_pipeline(model_name: str,
                       scheduler_type: str) -> DiffusionPipeline:
        repo_id = f'hysts/diffusers-anime-faces-{model_name}'
        if scheduler_type == 'DDPM':
            pipeline = DDPMPipeline.from_pretrained(repo_id,
                                                    use_auth_token=HF_TOKEN)
        elif scheduler_type == 'DDIM':
            pipeline = DDIMPipeline.from_pretrained(repo_id,
                                                    use_auth_token=HF_TOKEN)
            pipeline.scheduler = DDIMScheduler.from_config(
                repo_id, subfolder='scheduler', use_auth_token=HF_TOKEN)
        elif scheduler_type == 'PNDM':
            pipeline = PNDMPipeline.from_pretrained(repo_id,
                                                    use_auth_token=HF_TOKEN)
            pipeline.scheduler = PNDMScheduler.from_config(
                repo_id, subfolder='scheduler', use_auth_token=HF_TOKEN)
        else:
            raise ValueError(f'Unknown scheduler type: {scheduler_type}')
        return pipeline

    def set_pipeline(self, model_name: str, scheduler_type: str) -> None:
        logger.info('--- set_pipeline ---')
        logger.info(f'{model_name=}, {scheduler_type=}')

        if model_name == self.model_name and scheduler_type == self.scheduler_type:
            logger.info('Skipping')
            logger.info('--- done ---')
            return
        self.model_name = model_name
        self.scheduler_type = scheduler_type
        self.pipeline = self._load_pipeline(model_name, scheduler_type)

        logger.info('--- done ---')

    def _download_all_models(self) -> None:
        for name in self.MODEL_NAMES:
            self._load_pipeline(name, 'DDPM')

    def generate(self,
                 seed: int,
                 num_steps: int,
                 num_images: int = 1) -> list[PIL.Image.Image]:
        logger.info('--- generate ---')
        logger.info(f'{seed=}, {num_steps=}')

        torch.manual_seed(seed)
        if self.scheduler_type == 'DDPM':
            res = self.pipeline(batch_size=num_images,
                                torch_device=self.device)['sample']
        elif self.scheduler_type in ['DDIM', 'PNDM']:
            res = self.pipeline(batch_size=num_images,
                                torch_device=self.device,
                                num_inference_steps=num_steps)['sample']
        else:
            raise ValueError(f'Unknown scheduler type: {self.scheduler_type}')

        logger.info('--- done ---')
        return res

    @staticmethod
    def postprocess(sample: torch.Tensor) -> np.ndarray:
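        # Map model outputs from [-1, 1] to uint8 images in NHWC layout.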
        res = (sample / 2 + 0.5).clamp(0, 1)
        res = (res * 255).to(torch.uint8)
        res = res.cpu().permute(0, 2, 3, 1).numpy()
        return res

    @torch.inference_mode()
    def generate_with_video(self, seed: int,
                            num_steps: int) -> tuple[PIL.Image.Image, str]:
        logger.info('--- generate_with_video ---')
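        # DDPM has no shortened sampling schedule here, so run the full 1000
        # steps and write the video at a higher frame rate to keep it short.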
        if self.scheduler_type == 'DDPM':
            num_steps = 1000
            fps = 100
        else:
            fps = 10
        logger.info(f'{seed=}, {num_steps=}')

        model = self.pipeline.unet.to(self.device)
        scheduler = self.pipeline.scheduler
        scheduler.set_timesteps(num_inference_steps=num_steps)
        input_shape = (1, model.config.in_channels, model.config.sample_size,
                       model.config.sample_size)
        torch.manual_seed(seed)

        out_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
        writer = imageio.get_writer(out_file.name, fps=fps)
        sample = torch.randn(input_shape).to(self.device)
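        # Denoise step by step, appending each intermediate sample as a video frame.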
        for t in tqdm.auto.tqdm(scheduler.timesteps):
            out = model(sample, t)['sample']
            sample = scheduler.step(out, t, sample)['prev_sample']
            res = self.postprocess(sample)[0]
            writer.append_data(res)
        writer.close()

        logger.info('--- done ---')
        return PIL.Image.fromarray(res), out_file.name

    def superresolve(self, image: PIL.Image.Image) -> PIL.Image.Image:
        logger.info('--- superresolve ---')

        with tempfile.NamedTemporaryFile(suffix='.png') as f:
            image.save(f.name)
            out_file = self.real_esrgan(f.name)

        logger.info('--- done ---')
        return PIL.Image.open(out_file)

    def run(self, model_name: str, scheduler_type: str, num_steps: int,
            randomize_seed: bool,
            seed: int) -> tuple[PIL.Image.Image, PIL.Image.Image, int, str]:
        self.set_pipeline(model_name, scheduler_type)
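        # Clamp the step count to the range this app supports for PNDM (4-100).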
        if scheduler_type == 'PNDM':
            num_steps = max(4, min(num_steps, 100))
        if randomize_seed:
            seed = self.rng.randint(0, 100000)
        res, filename = self.generate_with_video(seed, num_steps)
        superresolved = self.superresolve(res)
        return superresolved, res, seed, filename

    @staticmethod
    def to_grid(images: list[PIL.Image.Image],
                ncols: int = 2) -> PIL.Image.Image:
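        # Tile the images row-major into an nrows x ncols grid, padding any
        # missing cells with white.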
        images = [np.asarray(image) for image in images]
        nrows = (len(images) + ncols - 1) // ncols
        h, w = images[0].shape[:2]
        if (d := nrows * ncols - len(images)) > 0:
            images += [np.full((h, w, 3), 255, dtype=np.uint8)] * d
        grid = np.asarray(images).reshape(nrows, ncols, h, w, 3).transpose(
            0, 2, 1, 3, 4).reshape(nrows * h, ncols * w, 3)
        return PIL.Image.fromarray(grid)

    def run_simple(self) -> tuple[PIL.Image.Image, PIL.Image.Image]:
        self.set_pipeline(self.MODEL_NAMES[0], 'DDIM')
        seed = self.rng.randint(0, 1000000)
        images = self.generate(seed, num_steps=10, num_images=4)
        superresolved = [self.superresolve(image) for image in images]
        return self.to_grid(superresolved, 2), self.to_grid(images, 2)
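

if __name__ == '__main__':
    # Hedged usage sketch (an assumption, not part of the original Space):
    # build the model on GPU when available and save a 2x2 grid of raw DDIM
    # samples. The output filename is arbitrary.
    demo = Model('cuda' if torch.cuda.is_available() else 'cpu')
    superresolved_grid, raw_grid = demo.run_simple()
    raw_grid.save('samples.png')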