Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
import math | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from typing import Optional | |
from michelangelo.models.modules.checkpoint import checkpoint | |
def init_linear(l, stddev): | |
nn.init.normal_(l.weight, std=stddev) | |
if l.bias is not None: | |
nn.init.constant_(l.bias, 0.0) | |
class MultiheadAttention(nn.Module): | |
def __init__( | |
self, | |
*, | |
device: torch.device, | |
dtype: torch.dtype, | |
n_ctx: int, | |
width: int, | |
heads: int, | |
init_scale: float, | |
qkv_bias: bool, | |
flash: bool = False | |
): | |
super().__init__() | |
self.n_ctx = n_ctx | |
self.width = width | |
self.heads = heads | |
self.c_qkv = nn.Linear(width, width * 3, bias=qkv_bias, device=device, dtype=dtype) | |
self.c_proj = nn.Linear(width, width, device=device, dtype=dtype) | |
self.attention = QKVMultiheadAttention(device=device, dtype=dtype, heads=heads, n_ctx=n_ctx, flash=flash) | |
init_linear(self.c_qkv, init_scale) | |
init_linear(self.c_proj, init_scale) | |
def forward(self, x): | |
x = self.c_qkv(x) | |
x = checkpoint(self.attention, (x,), (), True) | |
x = self.c_proj(x) | |
return x | |
class QKVMultiheadAttention(nn.Module): | |
def __init__(self, *, device: torch.device, dtype: torch.dtype, heads: int, n_ctx: int, flash: bool = False): | |
super().__init__() | |
self.device = device | |
self.dtype = dtype | |
self.heads = heads | |
self.n_ctx = n_ctx | |
self.flash = flash | |
def forward(self, qkv): | |
bs, n_ctx, width = qkv.shape | |
attn_ch = width // self.heads // 3 | |
scale = 1 / math.sqrt(math.sqrt(attn_ch)) | |
qkv = qkv.view(bs, n_ctx, self.heads, -1) | |
q, k, v = torch.split(qkv, attn_ch, dim=-1) | |
if self.flash: | |
out = F.scaled_dot_product_attention(q, k, v) | |
else: | |
weight = torch.einsum( | |
"bthc,bshc->bhts", q * scale, k * scale | |
) # More stable with f16 than dividing afterwards | |
wdtype = weight.dtype | |
weight = torch.softmax(weight.float(), dim=-1).type(wdtype) | |
out = torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1) | |
return out | |
class ResidualAttentionBlock(nn.Module): | |
def __init__( | |
self, | |
*, | |
device: torch.device, | |
dtype: torch.dtype, | |
n_ctx: int, | |
width: int, | |
heads: int, | |
init_scale: float = 1.0, | |
qkv_bias: bool = True, | |
flash: bool = False, | |
use_checkpoint: bool = False | |
): | |
super().__init__() | |
self.use_checkpoint = use_checkpoint | |
self.attn = MultiheadAttention( | |
device=device, | |
dtype=dtype, | |
n_ctx=n_ctx, | |
width=width, | |
heads=heads, | |
init_scale=init_scale, | |
qkv_bias=qkv_bias, | |
flash=flash | |
) | |
self.ln_1 = nn.LayerNorm(width, device=device, dtype=dtype) | |
self.mlp = MLP(device=device, dtype=dtype, width=width, init_scale=init_scale) | |
self.ln_2 = nn.LayerNorm(width, device=device, dtype=dtype) | |
def _forward(self, x: torch.Tensor): | |
x = x + self.attn(self.ln_1(x)) | |
x = x + self.mlp(self.ln_2(x)) | |
return x | |
def forward(self, x: torch.Tensor): | |
return checkpoint(self._forward, (x,), self.parameters(), self.use_checkpoint) | |
class MultiheadCrossAttention(nn.Module): | |
def __init__( | |
self, | |
*, | |
device: torch.device, | |
dtype: torch.dtype, | |
width: int, | |
heads: int, | |
init_scale: float, | |
qkv_bias: bool = True, | |
flash: bool = False, | |
n_data: Optional[int] = None, | |
data_width: Optional[int] = None, | |
): | |
super().__init__() | |
self.n_data = n_data | |
self.width = width | |
self.heads = heads | |
self.data_width = width if data_width is None else data_width | |
self.c_q = nn.Linear(width, width, bias=qkv_bias, device=device, dtype=dtype) | |
self.c_kv = nn.Linear(self.data_width, width * 2, bias=qkv_bias, device=device, dtype=dtype) | |
self.c_proj = nn.Linear(width, width, device=device, dtype=dtype) | |
self.attention = QKVMultiheadCrossAttention( | |
device=device, dtype=dtype, heads=heads, n_data=n_data, flash=flash | |
) | |
init_linear(self.c_q, init_scale) | |
init_linear(self.c_kv, init_scale) | |
init_linear(self.c_proj, init_scale) | |
def forward(self, x, data): | |
x = self.c_q(x) | |
data = self.c_kv(data) | |
x = checkpoint(self.attention, (x, data), (), True) | |
x = self.c_proj(x) | |
return x | |
class QKVMultiheadCrossAttention(nn.Module): | |
def __init__(self, *, device: torch.device, dtype: torch.dtype, heads: int, | |
flash: bool = False, n_data: Optional[int] = None): | |
super().__init__() | |
self.device = device | |
self.dtype = dtype | |
self.heads = heads | |
self.n_data = n_data | |
self.flash = flash | |
def forward(self, q, kv): | |
_, n_ctx, _ = q.shape | |
bs, n_data, width = kv.shape | |
attn_ch = width // self.heads // 2 | |
scale = 1 / math.sqrt(math.sqrt(attn_ch)) | |
q = q.view(bs, n_ctx, self.heads, -1) | |
kv = kv.view(bs, n_data, self.heads, -1) | |
k, v = torch.split(kv, attn_ch, dim=-1) | |
if self.flash: | |
out = F.scaled_dot_product_attention(q, k, v) | |
else: | |
weight = torch.einsum( | |
"bthc,bshc->bhts", q * scale, k * scale | |
) # More stable with f16 than dividing afterwards | |
wdtype = weight.dtype | |
weight = torch.softmax(weight.float(), dim=-1).type(wdtype) | |
out = torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1) | |
return out | |
class ResidualCrossAttentionBlock(nn.Module): | |
def __init__( | |
self, | |
*, | |
device: Optional[torch.device], | |
dtype: Optional[torch.dtype], | |
n_data: Optional[int] = None, | |
width: int, | |
heads: int, | |
data_width: Optional[int] = None, | |
init_scale: float = 0.25, | |
qkv_bias: bool = True, | |
flash: bool = False | |
): | |
super().__init__() | |
if data_width is None: | |
data_width = width | |
self.attn = MultiheadCrossAttention( | |
device=device, | |
dtype=dtype, | |
n_data=n_data, | |
width=width, | |
heads=heads, | |
data_width=data_width, | |
init_scale=init_scale, | |
qkv_bias=qkv_bias, | |
flash=flash, | |
) | |
self.ln_1 = nn.LayerNorm(width, device=device, dtype=dtype) | |
self.ln_2 = nn.LayerNorm(data_width, device=device, dtype=dtype) | |
self.mlp = MLP(device=device, dtype=dtype, width=width, init_scale=init_scale) | |
self.ln_3 = nn.LayerNorm(width, device=device, dtype=dtype) | |
def forward(self, x: torch.Tensor, data: torch.Tensor): | |
x = x + self.attn(self.ln_1(x), self.ln_2(data)) | |
x = x + self.mlp(self.ln_3(x)) | |
return x | |
class MLP(nn.Module): | |
def __init__(self, *, | |
device: Optional[torch.device], | |
dtype: Optional[torch.dtype], | |
width: int, | |
init_scale: float): | |
super().__init__() | |
self.width = width | |
self.c_fc = nn.Linear(width, width * 4, device=device, dtype=dtype) | |
self.c_proj = nn.Linear(width * 4, width, device=device, dtype=dtype) | |
self.gelu = nn.GELU() | |
init_linear(self.c_fc, init_scale) | |
init_linear(self.c_proj, init_scale) | |
def forward(self, x): | |
return self.c_proj(self.gelu(self.c_fc(x))) | |
class Transformer(nn.Module): | |
def __init__( | |
self, | |
*, | |
device: Optional[torch.device], | |
dtype: Optional[torch.dtype], | |
n_ctx: int, | |
width: int, | |
layers: int, | |
heads: int, | |
init_scale: float = 0.25, | |
qkv_bias: bool = True, | |
flash: bool = False, | |
use_checkpoint: bool = False | |
): | |
super().__init__() | |
self.n_ctx = n_ctx | |
self.width = width | |
self.layers = layers | |
self.resblocks = nn.ModuleList( | |
[ | |
ResidualAttentionBlock( | |
device=device, | |
dtype=dtype, | |
n_ctx=n_ctx, | |
width=width, | |
heads=heads, | |
init_scale=init_scale, | |
qkv_bias=qkv_bias, | |
flash=flash, | |
use_checkpoint=use_checkpoint | |
) | |
for _ in range(layers) | |
] | |
) | |
def forward(self, x: torch.Tensor): | |
for block in self.resblocks: | |
x = block(x) | |
return x | |