File size: 8,385 Bytes
8c649e7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
import torch
import torch.nn.functional as F
from dataclasses import dataclass
#import tiktoken
#tokenizer = tiktoken.get_encoding("gpt2")
import safetensors.torch
# Define the GPTConfig dataclass
@dataclass
class GPTConfig:
    """Hyperparameters that fix the shape of the GPT model."""

    # Rows of the token-embedding table and output width of the LM head.
    vocab_size: int = 50304
    # Number of transformer blocks stacked in the model.
    n_layer: int = 12
    # Attention heads per block; head dim 128 suggested by @Grad62304977.
    n_head: int = 6
    # Width of the embedding / residual stream.
    n_embd: int = 768
# Define the Rotary class
class Rotary(torch.nn.Module):
    """Compute and cache the cos/sin tables used for rotary position embeddings.

    Args:
        dim: Size of the feature dimension being rotated; one inverse
            frequency is generated for every second index in ``range(dim)``.
        base: Base of the geometric progression of inverse frequencies.
    """

    def __init__(self, dim, base=10000):
        super().__init__()
        # Plain tensor attribute (neither a parameter nor a buffer): it is not
        # part of the state_dict and is not moved by Module.to(), so forward()
        # copies it to the input's device when it rebuilds the tables.
        self.inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim))
        self.seq_len_cached = None
        self.cos_cached = None
        self.sin_cached = None

    def forward(self, x):
        """Return the ``(cos, sin)`` tables for the sequence length of ``x``.

        Args:
            x: Tensor whose dimension 1 is the sequence length. Only its
                shape and device are read.

        Returns:
            Two bfloat16 tensors of shape ``(1, seq_len, 1, len(inv_freq))``
            located on ``x.device``.
        """
        seq_len = x.shape[1]
        # The tables depend on the sequence length and must live on the same
        # device as the input; rebuild them when either one changes.
        cache_is_stale = (
            seq_len != self.seq_len_cached
            or self.cos_cached is None
            or self.cos_cached.device != x.device
        )
        if cache_is_stale:
            inv_freq = self.inv_freq.to(x.device)
            t = torch.arange(seq_len, device=x.device, dtype=inv_freq.dtype)
            freqs = torch.outer(t, inv_freq)  # (seq_len, len(inv_freq))
            self.seq_len_cached = seq_len
            self.cos_cached = freqs.cos().bfloat16()
            self.sin_cached = freqs.sin().bfloat16()
        # Singleton batch and head axes are added for broadcasting.
        return self.cos_cached[None, :, None, :], self.sin_cached[None, :, None, :]
def apply_rotary_emb(x, cos, sin):
    """Rotate the two halves of the last dimension of ``x`` by ``cos``/``sin``.

    Args:
        x: 4-dimensional tensor; its last dimension is split at the midpoint.
        cos: Cosine table broadcastable against each half of ``x``.
        sin: Sine table broadcastable against each half of ``x``.

    Returns:
        Tensor with the shape and dtype of ``x`` holding the rotated halves
        concatenated along dimension 3.
    """
    assert x.ndim == 4  # multihead attention
    half = x.shape[3] // 2
    first, second = x[..., :half], x[..., half:]
    rotated = (
        first * cos + second * sin,
        first * (-sin) + second * cos,
    )
    # Cast back in case multiplying by cos/sin promoted the dtype.
    return torch.cat(rotated, 3).type_as(x)
# Define the CausalSelfAttention class
class CausalSelfAttention(torch.nn.Module):
    """Causal multi-head self-attention with QK-norm, rotary embeddings and value mixing.

    Args:
        config: Object providing ``n_head`` and ``n_embd``; ``n_embd`` must be
            divisible by ``n_head``.
    """

    def __init__(self, config):
        super().__init__()
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.head_dim = self.n_embd // self.n_head
        assert self.n_embd % self.n_head == 0

        def square_projection():
            # Every projection in this layer maps n_embd -> n_embd without bias.
            return torch.nn.Linear(self.n_embd, self.n_embd, bias=False)

        self.c_q = square_projection()
        self.c_k = square_projection()
        self.c_v = square_projection()
        # output projection
        self.c_proj = square_projection()
        self.c_proj.weight.data.zero_()  # zero init suggested by @Grad62304977
        self.rotary = Rotary(self.head_dim)
        # Learnable weight used to blend this layer's values with v1.
        self.lamb = torch.nn.Parameter(torch.tensor(0.5))  # @Grad62304977

    def forward(self, x, v1=None):
        """Attend over ``x`` and return the projected output together with ``v1``.

        Args:
            x: Input of shape (batch, sequence length, n_embd).
            v1: Values to blend into this layer's values. When ``None`` (the
                first block), this layer's own values are used and returned so
                that subsequent blocks can access them.

        Returns:
            ``(y, v1)`` where ``y`` has the shape of ``x``.
        """
        batch, seq_len, _ = x.size()
        per_head = (batch, seq_len, self.n_head, self.head_dim)
        q = self.c_q(x).view(per_head)
        k = self.c_k(x).view(per_head)
        v = self.c_v(x).view(per_head)
        if v1 is None:
            # First block: expose these values to the blocks that follow.
            v1 = v
        v = (1 - self.lamb) * v + self.lamb * v1.view_as(v)  # @Grad62304977

        cos, sin = self.rotary(q)
        # QK norm suggested by @Grad62304977
        q = F.rms_norm(q, (q.size(-1),))
        k = F.rms_norm(k, (k.size(-1),))
        q = apply_rotary_emb(q, cos, sin)
        k = apply_rotary_emb(k, cos, sin)

        # scaled_dot_product_attention expects the head axis before the sequence axis.
        attended = F.scaled_dot_product_attention(
            q.transpose(1, 2),
            k.transpose(1, 2),
            v.transpose(1, 2),
            is_causal=True,
        )
        # re-assemble all head outputs side by side
        merged = attended.transpose(1, 2).contiguous().view_as(x)
        return self.c_proj(merged), v1
# Define the MLP class
class MLP(torch.nn.Module):
    """Feed-forward network: expand to 4x width, squared ReLU, project back.

    Args:
        config: Object providing ``n_embd``, the input and output width.
    """

    def __init__(self, config):
        super().__init__()
        width = config.n_embd
        hidden = 4 * width
        self.c_fc = torch.nn.Linear(width, hidden, bias=False)
        self.c_proj = torch.nn.Linear(hidden, width, bias=False)
        self.c_proj.weight.data.zero_()  # zero init suggested by @Grad62304977

    def forward(self, x):
        """Apply the feed-forward network to the last dimension of ``x``."""
        # https://arxiv.org/abs/2109.08668v2; ~1-2% better than GELU;
        # suggested by @SKYLINEZ007 and @Grad62304977
        activated = F.relu(self.c_fc(x)).square()
        return self.c_proj(activated)
# Define the Block class
class Block(torch.nn.Module):
    """One transformer layer: attention then MLP, each added residually.

    Args:
        config: Configuration object forwarded to the attention and MLP
            sub-modules.
    """

    def __init__(self, config):
        super().__init__()
        self.attn = CausalSelfAttention(config)
        self.mlp = MLP(config)
        # Mixing weights for (current stream, x0); starts as the identity on x.
        self.lambdas = torch.nn.Parameter(torch.tensor([1., 0.]))

    def forward(self, x, v1, x0):
        """Run the block.

        Args:
            x: Current hidden states.
            v1: Values passed through to the attention layer (``None`` lets the
                attention layer supply its own).
            x0: Tensor blended into ``x`` with the second learned weight.

        Returns:
            ``(x, v1)``: the updated hidden states and the ``v1`` returned by
            the attention layer.
        """
        stream_weight, x0_weight = self.lambdas[0], self.lambdas[1]
        mixed = stream_weight * x + x0_weight * x0
        attn_out, v1 = self.attn(F.rms_norm(mixed, (mixed.size(-1),)), v1)
        after_attn = mixed + attn_out
        mlp_out = self.mlp(F.rms_norm(after_attn, (after_attn.size(-1),)))
        return after_attn + mlp_out, v1
# Define the GPT class
class GPT(torch.nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.transformer = torch.nn.ModuleDict(dict(
wte = torch.nn.Embedding(config.vocab_size, config.n_embd),
h = torch.nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
))
self.lm_head = torch.nn.Linear(config.n_embd, config.vocab_size, bias=False)
self.lm_head.weight.data.zero_() # @Grad62304977
def forward(self, idx, targets=None, return_logits=True):
# forward the GPT model itself
x = self.transformer.wte(idx) # token embeddings of shape (b, t, n_embd)
x = F.rms_norm(x, (x.size(-1),)) # @Grad62304977
x0 = x
v1 = None
for block in self.transformer.h:
x, v1 = block(x, v1, x0)
x = F.rms_norm(x, (x.size(-1),))
if targets is not None:
# if we are given some desired targets also calculate the loss
logits = self.lm_head(x)
logits = 30 * torch.tanh(logits / 30) # @Grad62304977
logits = logits.float() # use tf32/fp32 for logits
loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
else:
# inference-time mini-optimization: only forward the lm_head on the very last position
logits = self.lm_head(x[:, [-1], :]) # note: using list [-1] to preserve the time dim
logits = 30 * torch.tanh(logits / 30) # @Grad62304977
logits = logits.float() # use tf32/fp32 for logits
loss = None
# there are performance reasons why not returning logits is prudent, if not needed
if not return_logits:
logits = None
return logits, loss
def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
"""
Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
the sequence max_new_tokens times, feeding the predictions back into the model each time.
Most likely you'll want to make sure to be in model.eval() mode of operation for this.
"""
for _ in range(max_new_tokens):
# if the sequence context is growing too long we must crop it at block_size
#idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
# forward the model to get the logits for the index in the sequence
logits, _ = self(idx)
# pluck the logits at the final step and scale by desired temperature
logits = logits[:, -1, :] / temperature
# optionally crop the logits to only the top k options
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits[logits < v[:, [-1]]] = -float('Inf')
# apply softmax to convert logits to (normalized) probabilities
probs = F.softmax(logits, dim=-1)
# sample from the distribution
idx_next = torch.multinomial(probs, num_samples=1)
# append sampled index to the running sequence and continue
idx = torch.cat((idx, idx_next), dim=1)
return idx
# Load the trained parameters
def load_checkpoint(model, checkpoint_path):
    """Load trained weights from a checkpoint file into ``model`` in place.

    The checkpoint must be a mapping whose ``'model'`` entry maps parameter
    names to tensors. A leading ``"_orig_mod."`` is stripped from every name
    (presumably left behind by saving a compiled/wrapped model) before the
    weights are handed to ``model.load_state_dict``.

    Args:
        model: Module (or any object with ``load_state_dict``) to populate.
        checkpoint_path: Path or file-like object accepted by ``torch.load``.
    """
    # NOTE(review): torch.load unpickles the file. Depending on the installed
    # torch version's ``weights_only`` default this can execute arbitrary
    # code, so only load checkpoints that come from a trusted source.
    checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))
    state_dict = {
        name.removeprefix("_orig_mod."): tensor
        for name, tensor in checkpoint['model'].items()
    }
    model.load_state_dict(state_dict)
# Run LLM inference
#def run_inference(model, input_ids):
# input_ids = torch.tensor(input_ids).unsqueeze(0)
# return model.generate(input_ids, 50)
# Main function
def main(checkpoint_path='state_step003200.pt',
         output_path="nanogpt-speedrun-baseline.safetensors"):
    """Convert a training checkpoint into a safetensors file.

    Builds a GPT with the default ``GPTConfig``, loads the checkpoint weights
    into it, switches the model to eval mode and writes it out with
    ``safetensors.torch.save_model``.

    Args:
        checkpoint_path: Checkpoint file to read; replace with your own path.
        output_path: Destination of the safetensors file.
    """
    model = GPT(GPTConfig())
    load_checkpoint(model, checkpoint_path)
    model.eval()
    safetensors.torch.save_model(model, output_path)


if __name__ == '__main__':
    main()
|