import copy from dataclasses import dataclass import math from safetensors.torch import load_model import torch from torch import nn from transformers.modeling_utils import ModuleUtilsMixin, PretrainedConfig, PreTrainedModel from transformers.modeling_outputs import ModelOutput from transformers.models.t5.configuration_t5 import T5Config from transformers.models.t5.modeling_t5 import ( T5LayerNorm, T5DenseGatedActDense, ) from typing import Optional ### # Code from from PiotrNawrot/nanoT5/nanoT5/utils/t5_model.py @dataclass class EncoderOutput(ModelOutput): hidden_states: torch.FloatTensor = None attention_mask: torch.FloatTensor = None @dataclass class Seq2SeqLMOutput(ModelOutput): loss: torch.FloatTensor = None logits: torch.FloatTensor = None encoder_outputs: EncoderOutput = None class T5LayerFF(nn.Module): def __init__(self, config: T5Config): super().__init__() assert config.is_gated_act self.DenseReluDense = T5DenseGatedActDense(config) self.layer_norm = T5LayerNorm(config.d_model, eps=config.layer_norm_epsilon) self.dropout = nn.Dropout(config.dropout_rate) def forward(self, hidden_states): forwarded_states = self.layer_norm(hidden_states).type_as(hidden_states) forwarded_states = self.DenseReluDense(forwarded_states) hidden_states = hidden_states + self.dropout(forwarded_states) return hidden_states class T5Attention(nn.Module): def __init__(self, config: T5Config, has_relative_attention_bias=False): super().__init__() self.is_decoder = config.is_decoder self.has_relative_attention_bias = has_relative_attention_bias self.relative_attention_num_buckets = config.relative_attention_num_buckets self.relative_attention_max_distance = config.relative_attention_max_distance self.d_model = config.d_model self.key_value_proj_dim = config.d_kv self.n_heads = config.num_heads self.dropout = config.dropout_rate self.inner_dim = self.n_heads * self.key_value_proj_dim self.q = nn.Linear(self.d_model, self.inner_dim, bias=False) self.k = nn.Linear(self.d_model, self.inner_dim, bias=False) self.v = nn.Linear(self.d_model, self.inner_dim, bias=False) self.o = nn.Linear(self.inner_dim, self.d_model, bias=False) if self.has_relative_attention_bias: self.relative_attention_bias = nn.Embedding(self.relative_attention_num_buckets, self.n_heads) @staticmethod def _relative_position_bucket(relative_position, bidirectional=True, num_buckets=32, max_distance=128): """ Adapted from Mesh Tensorflow: https://github.com/tensorflow/mesh/blob/0cb87fe07da627bf0b7e60475d59f95ed6b5be3d/mesh_tensorflow/transformer/transformer_layers.py#L593 Translate relative position to a bucket number for relative attention. The relative position is defined as memory_position - query_position, i.e. the distance in tokens from the attending position to the attended-to position. If bidirectional=False, then positive relative positions are invalid. We use smaller buckets for small absolute relative_position and larger buckets for larger absolute relative_positions. All relative positions >=max_distance map to the same bucket. All relative positions <=-max_distance map to the same bucket. This should allow for more graceful generalization to longer sequences than the model has been trained on Args: relative_position: an int32 Tensor bidirectional: a boolean - whether the attention is bidirectional num_buckets: an integer max_distance: an integer Returns: a Tensor with the same shape as relative_position, containing int32 values in the range [0, num_buckets) """ relative_buckets = 0 if bidirectional: num_buckets //= 2 relative_buckets += (relative_position > 0).to(torch.long) * num_buckets relative_position = torch.abs(relative_position) else: relative_position = -torch.min(relative_position, torch.zeros_like(relative_position)) # now relative_position is in the range [0, inf) # half of the buckets are for exact increments in positions max_exact = num_buckets // 2 is_small = relative_position < max_exact # The other half of the buckets are for logarithmically bigger bins in positions up to max_distance relative_position_if_large = max_exact + ( torch.log(relative_position.float() / max_exact) / math.log(max_distance / max_exact) * (num_buckets - max_exact) ).to(torch.long) relative_position_if_large = torch.min( relative_position_if_large, torch.full_like(relative_position_if_large, num_buckets - 1) ) relative_buckets += torch.where(is_small, relative_position, relative_position_if_large) return relative_buckets def compute_bias(self, query_length, key_length, device=None): """Compute binned relative position bias""" if device is None: device = self.relative_attention_bias.weight.device context_position = torch.arange(query_length, dtype=torch.long, device=device)[:, None] memory_position = torch.arange(key_length, dtype=torch.long, device=device)[None, :] relative_position = memory_position - context_position # shape (query_length, key_length) relative_position_bucket = self._relative_position_bucket( relative_position, # shape (query_length, key_length) bidirectional=(not self.is_decoder), num_buckets=self.relative_attention_num_buckets, max_distance=self.relative_attention_max_distance, ) values = self.relative_attention_bias(relative_position_bucket) # shape (query_length, key_length, num_heads) values = values.permute([2, 0, 1]).unsqueeze(0) # shape (1, num_heads, query_length, key_length) return values def forward( self, hidden_states, mask=None, key_value_states=None, position_bias=None, ): """ Self-attention (if key_value_states is None) or attention over source sentence (provided by key_value_states). """ # Input is (batch_size, seq_length, dim) # Mask is (batch_size, key_length) (non-causal) or (batch_size, key_length, key_length) batch_size, seq_length = hidden_states.shape[:2] real_seq_length = seq_length key_length = real_seq_length if key_value_states is None else key_value_states.shape[1] def shape(states): """projection""" return states.view(batch_size, -1, self.n_heads, self.key_value_proj_dim).transpose(1, 2) def unshape(states): """reshape""" return states.transpose(1, 2).contiguous().view(batch_size, -1, self.inner_dim) query_states = self.q(hidden_states) if key_value_states is None: key_states, value_states = self.k(hidden_states), self.v(hidden_states) else: key_states, value_states = self.k(key_value_states), self.v(key_value_states) query_states, key_states, value_states = shape(query_states), shape(key_states), shape(value_states) scores = torch.matmul( query_states, key_states.transpose(3, 2) ) # equivalent of torch.einsum("bnqd,bnkd->bnqk", query_states, key_states), compatible with onnx op>9 if position_bias is None: if not self.has_relative_attention_bias: position_bias = torch.zeros( (1, self.n_heads, real_seq_length, key_length), device=scores.device, dtype=scores.dtype ) else: position_bias = self.compute_bias(real_seq_length, key_length, device=scores.device) if mask is not None: # Masking happens here, masked elements in the mask have the value of -inf position_bias = position_bias + mask # (batch_size, n_heads, seq_length, key_length) position_bias_masked = position_bias scores += position_bias_masked attn_weights = nn.functional.softmax(scores.float(), dim=-1).type_as( scores ) # (batch_size, n_heads, seq_length, key_length) attn_weights = nn.functional.dropout( attn_weights, p=self.dropout, training=self.training ) # (batch_size, n_heads, seq_length, key_length) attn_output = unshape(torch.matmul(attn_weights, value_states)) # (batch_size, seq_length, dim) attn_output = self.o(attn_output) return (attn_output, position_bias) class T5LayerSelfAttention(nn.Module): def __init__(self, config, has_relative_attention_bias=False): super().__init__() self.SelfAttention = T5Attention(config, has_relative_attention_bias=has_relative_attention_bias) self.layer_norm = T5LayerNorm(config.d_model, eps=config.layer_norm_epsilon) self.dropout = nn.Dropout(config.dropout_rate) def forward( self, hidden_states, attention_mask=None, position_bias=None, ): normed_hidden_states = self.layer_norm(hidden_states).type_as(hidden_states) attention_output = self.SelfAttention( normed_hidden_states, mask=attention_mask, position_bias=position_bias, ) hidden_states = hidden_states + self.dropout(attention_output[0]) outputs = (hidden_states,) + attention_output[1:] return outputs class T5LayerCrossAttention(nn.Module): def __init__(self, config): super().__init__() self.EncDecAttention = T5Attention(config, has_relative_attention_bias=False) self.layer_norm = T5LayerNorm(config.d_model, eps=config.layer_norm_epsilon) self.dropout = nn.Dropout(config.dropout_rate) def forward( self, hidden_states, key_value_states, attention_mask=None, position_bias=None, ): normed_hidden_states = self.layer_norm(hidden_states) attention_output = self.EncDecAttention( normed_hidden_states, mask=attention_mask, key_value_states=key_value_states, position_bias=position_bias, ) layer_output = hidden_states + self.dropout(attention_output[0]) outputs = (layer_output,) + attention_output[1:] return outputs class T5Block(nn.Module): def __init__(self, config, has_relative_attention_bias=False): super().__init__() self.is_decoder = config.is_decoder self.layer = nn.ModuleList() self.layer.append(T5LayerSelfAttention(config, has_relative_attention_bias=has_relative_attention_bias)) if self.is_decoder: self.layer.append(T5LayerCrossAttention(config)) self.layer.append(T5LayerFF(config)) def forward( self, hidden_states, attention_mask=None, position_bias=None, encoder_hidden_states=None, encoder_attention_mask=None, encoder_decoder_position_bias=None, ): self_attention_outputs = self.layer[0]( hidden_states, attention_mask=attention_mask, position_bias=position_bias, ) hidden_states = self_attention_outputs[0] attention_outputs = self_attention_outputs[1:] # Relative position weights if self.is_decoder and encoder_hidden_states is not None: cross_attention_outputs = self.layer[1]( hidden_states, key_value_states=encoder_hidden_states, attention_mask=encoder_attention_mask, position_bias=encoder_decoder_position_bias, ) hidden_states = cross_attention_outputs[0] # Keep relative position weights attention_outputs = attention_outputs + cross_attention_outputs[1:] # Apply Feed Forward layer hidden_states = self.layer[-1](hidden_states) outputs = (hidden_states,) outputs = outputs + attention_outputs return outputs # hidden-states, (self-attention position bias), (cross-attention position bias) class T5Stack(nn.Module, ModuleUtilsMixin): def __init__(self, config, embed_tokens): super().__init__() assert embed_tokens is not None self.config = config self.embed_tokens = embed_tokens self.is_decoder = config.is_decoder self.block = nn.ModuleList( [T5Block(config, has_relative_attention_bias=bool(i == 0)) for i in range(config.num_layers)] ) self.final_layer_norm = T5LayerNorm(config.d_model, eps=config.layer_norm_epsilon) self.dropout = nn.Dropout(config.dropout_rate) def forward( self, input_ids=None, attention_mask=None, encoder_hidden_states=None, encoder_attention_mask=None, ) -> EncoderOutput: input_shape = input_ids.size() batch_size, seq_length = input_shape inputs_embeds = self.embed_tokens(input_ids) if hasattr(self.config, 'is_bf16') and self.config.is_bf16: inputs_embeds = inputs_embeds.to(torch.bfloat16) # Masking if attention_mask is None: attention_mask = torch.ones(batch_size, seq_length, device=inputs_embeds.device) if self.is_decoder and encoder_attention_mask is None and encoder_hidden_states is not None: encoder_seq_length = encoder_hidden_states.shape[1] encoder_attention_mask = torch.ones( batch_size, encoder_seq_length, device=inputs_embeds.device, dtype=torch.long ) # We can provide a self-attention mask of dimensions [batch_size, from_seq_length, to_seq_length] # ourselves in which case we just need to make it broadcastable to all heads. extended_attention_mask = self.get_extended_attention_mask(attention_mask, input_shape) # If a 2D or 3D attention mask is provided for the cross-attention # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length] if self.is_decoder and encoder_hidden_states is not None: encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size() encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length) if encoder_attention_mask is None: encoder_attention_mask = torch.ones(encoder_hidden_shape, device=inputs_embeds.device) encoder_extended_attention_mask = self.invert_attention_mask(encoder_attention_mask) else: encoder_extended_attention_mask = None position_bias = None encoder_decoder_position_bias = None hidden_states = self.dropout(inputs_embeds) for _, layer_module in enumerate(self.block): layer_outputs = layer_module( hidden_states, attention_mask=extended_attention_mask, position_bias=position_bias, encoder_hidden_states=encoder_hidden_states, encoder_attention_mask=encoder_extended_attention_mask, encoder_decoder_position_bias=encoder_decoder_position_bias, ) hidden_states = layer_outputs[0] # We share the position biases between the layers - the first layer store them position_bias = layer_outputs[1] if self.is_decoder and encoder_hidden_states is not None: encoder_decoder_position_bias = layer_outputs[2] hidden_states = self.final_layer_norm(hidden_states).type_as(hidden_states) hidden_states = self.dropout(hidden_states) return EncoderOutput( hidden_states=hidden_states, attention_mask=attention_mask, ) ### # Code from huggingface/twodgirl # License: apache-2.0 class T5EncoderModel(nn.Module): def __init__(self, config: T5Config): super().__init__() config.is_encoder_decoder = False assert not config.tie_word_embeddings self.config = config self.model_dim = config.d_model self.shared = nn.Embedding(config.vocab_size, config.d_model) encoder_config = copy.deepcopy(config) encoder_config.is_decoder = False self.encoder = T5Stack(encoder_config, self.shared) flux_dev_d_model = 4096 self.last_layer = nn.Sequential( nn.Linear(config.d_model, flux_dev_d_model), nn.ReLU(), nn.Linear(flux_dev_d_model, flux_dev_d_model) ) self.apply(self._init_weights) def forward( self, input_ids: Optional[torch.LongTensor] = None, attention_mask: Optional[torch.FloatTensor] = None ): encoder_outputs = self.encoder( input_ids=input_ids, attention_mask=attention_mask, ) return self.last_layer(encoder_outputs.hidden_states) def get_input_embeddings(self): return self.shared def _init_weights(self, module): factor = self.config.initializer_factor # Used for testing weights initialization if isinstance(module, T5LayerNorm): module.weight.data.fill_(factor * 1.0) elif isinstance(module, T5EncoderModel): module.shared.weight.data.normal_(mean=0.0, std=factor * 1.0) if hasattr(module, "lm_head") and not self.config.tie_word_embeddings: module.lm_head.weight.data.normal_(mean=0.0, std=factor * 1.0) elif isinstance(module, T5DenseGatedActDense): d_ff, d_model = module.wi_0.weight.data.size() module.wi_0.weight.data.normal_(mean=0.0, std=factor * ((d_model) ** -0.5)) module.wi_1.weight.data.normal_(mean=0.0, std=factor * ((d_model) ** -0.5)) module.wo.weight.data.normal_(mean=0.0, std=factor * ((d_ff) ** -0.5)) elif isinstance(module, T5Attention): d_model = self.config.d_model key_value_proj_dim = self.config.d_kv n_heads = self.config.num_heads module.q.weight.data.normal_(mean=0.0, std=factor * ((d_model * key_value_proj_dim) ** -0.5)) module.k.weight.data.normal_(mean=0.0, std=factor * (d_model ** -0.5)) module.v.weight.data.normal_(mean=0.0, std=factor * (d_model ** -0.5)) module.o.weight.data.normal_(mean=0.0, std=factor * ((n_heads * key_value_proj_dim) ** -0.5)) if hasattr(module, "relative_attention_bias"): module.relative_attention_bias.weight.data.normal_(mean=0.0, std=factor * ((d_model) ** -0.5)) class PretrainedTextEncoder(PreTrainedModel): # Call by: # t5 = PretrainedTextEncoder(t5_config, T5EncoderModel(t5_config)).to(dtype=torch.float16) # t5.load_model('text_encoder_2.safetensors') # ... # FluxPipeline.from_pretrained(..., text_encoder_2=t5) def __init__(self, config, model): super().__init__(config, model) self.model = model def load_model(self, filepath): load_model(self.model, filepath) def forward(self, x, output_hidden_states=False): return self.model(x), t5_config = T5Config(d_model=4096 // 2, dd_ff=10240 // 2, num_layers=2, num_heads=32, is_gated_act=True, tie_word_embeddings=False, max_seq_len=512)