|
import sys |
|
import time |
|
from pathlib import Path |
|
from typing import Any, Literal, Optional |
|
|
|
import lightning as L |
|
import torch |
|
from lightning.fabric.plugins import BitsandbytesPrecision |
|
from lightning.fabric.strategies import FSDPStrategy |
|
|
|
from tsai_gpt.model import GPT, Block, Config |
|
from tsai_gpt.tokenizer import Tokenizer |
|
from tsai_gpt.utils import (get_default_supported_precision, gptq_quantization, |
|
load_checkpoint) |
|
|
|
L.seed_everything(1234) |
|
|
|
|
|
def multinomial_num_samples_1(probs: torch.Tensor) -> torch.Tensor: |
|
if torch._dynamo.is_compiling(): |
|
|
|
distribution = torch.empty_like(probs).exponential_(1) |
|
return torch.argmax(probs / distribution, dim=-1, keepdim=True) |
|
return torch.multinomial(probs, num_samples=1) |
|
|
|
|
|
def sample(
    logits: torch.Tensor, temperature: float = 1.0, top_k: Optional[int] = None
) -> torch.Tensor:
    """Pick the next token id from model logits.

    Only ``logits[0, -1]`` is used (presumably the first batch element at the
    last sequence position -- confirm against the model's output layout).

    Args:
        logits: Model output; indexed as ``logits[0, -1]``.
        temperature: Divides the logits before the softmax. Any value that is
            not ``> 0`` selects greedy decoding (plain argmax).
        top_k: If given, every logit outside the ``top_k`` largest is set to
            ``-inf`` before sampling. Values larger than the vocabulary are clamped.

    Returns:
        Tensor holding the chosen index, with a trailing dimension of size 1.
    """
    last_logits = logits[0, -1]

    if top_k is not None:
        k = min(top_k, last_logits.size(-1))
        top_values, top_indices = torch.topk(last_logits, k)
        # Start from an all -inf tensor and write back only the k kept entries,
        # so exactly the indices returned by topk survive.
        masked = torch.full_like(last_logits, float("-inf"))
        last_logits = masked.scatter_(-1, top_indices, top_values)

    if not (temperature > 0.0):
        # Greedy decoding.
        return torch.argmax(last_logits, dim=-1, keepdim=True)

    probs = torch.nn.functional.softmax(last_logits / temperature, dim=-1)
    return multinomial_num_samples_1(probs)
|
|
|
|
|
def next_token(
    model: GPT, input_pos: torch.Tensor, x: torch.Tensor, **kwargs: Any
) -> torch.Tensor:
    """Run one forward pass and sample the following token.

    Args:
        model: Callable model, invoked as ``model(x, input_pos)``.
        input_pos: Position indices passed through to the model.
        x: Input token ids passed through to the model.
        **kwargs: Forwarded unchanged to ``sample`` (e.g. ``temperature``, ``top_k``).

    Returns:
        The sampled token, cast to the same tensor type as ``x``.
    """
    sampled = sample(model(x, input_pos), **kwargs)
    return sampled.type_as(x)
|
|
|
|
|
@torch.inference_mode()
def generate(
    model: GPT,
    prompt: torch.Tensor,
    max_returned_tokens: int,
    *,
    temperature: float = 1.0,
    top_k: Optional[int] = None,
    eos_id: Optional[int] = None,
) -> torch.Tensor:
    """Takes a conditioning sequence (prompt) as input and continues to generate as many tokens as requested.

    The implementation of this function is modified from A. Karpathy's nanoGPT.

    Args:
        model: The model to use.
        prompt: Tensor of shape (T) with indices of the prompt sequence.
        max_returned_tokens: The maximum number of tokens to return (given plus generated).
        temperature: Scales the predicted logits by 1 / temperature.
        top_k: If specified, only sample among the tokens with the k highest probabilities.
        eos_id: If specified, stop generating any more token once the <eos> token is triggered.

    Returns:
        1-D tensor containing the prompt followed by the generated tokens. When
        generation stops on ``eos_id``, the <eos> token is the last element.

    Raises:
        AssertionError: If ``max_returned_tokens`` is not larger than the prompt length.
        NotImplementedError: If ``model.max_seq_length`` is smaller than
            ``max_returned_tokens - 1``.
    """
    T = prompt.size(0)
    assert max_returned_tokens > T
    if model.max_seq_length < max_returned_tokens - 1:
        raise NotImplementedError(
            f"max_seq_length {model.max_seq_length} needs to be >= {max_returned_tokens - 1}"
        )

    device = prompt.device
    tokens = [prompt]

    # First step: feed the whole prompt at positions 0..T-1.
    token = next_token(
        model,
        torch.arange(0, T, device=device),
        prompt.view(1, -1),
        temperature=temperature,
        top_k=top_k,
    ).clone()
    tokens.append(token)
    # The very first generated token can already be <eos>; honour it here,
    # otherwise generation would run on past the end of the sequence.
    if eos_id is not None and token == eos_id:
        return torch.cat(tokens)

    # Remaining steps: feed only the most recent token, one position at a time.
    input_pos = torch.tensor([T], device=device)
    for _ in range(2, max_returned_tokens - T + 1):
        token = next_token(
            model, input_pos, token.view(1, -1), temperature=temperature, top_k=top_k
        ).clone()
        tokens.append(token)
        if eos_id is not None and token == eos_id:
            break
        # Advance the position in place for the next step.
        input_pos.add_(1)
    return torch.cat(tokens)
|
|
|
|
|
""" |
|
quantize (Optional[Literal["bnb.nf4", "bnb.nf4-dq", "bnb.fp4", "bnb.fp4-dq", "bnb.int8", "gptq.int4"]], optional): quantization method to use. Defaults to None. |
|
- "bnb.nf4", "bnb.nf4-dq", "bnb.fp4", "bnb.fp4-dq": 4-bit quantization bitsandbytes |
|
- "bnb.int8": 8-bit quantization bitsandbytes |
|
- "gptq.int4": 4-bit quantization GPTQ |
|
for more details see: https://github.com/facebookresearch/bitsandbytes, https://github.com/Lightning-AI/lit-gpt/blob/main/tutorials/quantize.md |
|
strategy (str, optional): Fabric strategy setting. Defaults to "auto". |
|
devices (int, optional): number of devices to be used. Defaults to 1. |
|
precision (Optional[str], optional): Fabric precision setting. Defaults to None. |
|
""" |
|
|
|
# ---------------------------------------------------------------------------
# Module-level inference setup: runs once at import time and leaves
# `fabric`, `tokenizer`, `config` and `model` ready for use.
# ---------------------------------------------------------------------------

# Checkpoint file handed to `load_checkpoint` below.
chptk_path: str = "saved_model/last-iter-015000-ckpt.pth"
# Path handed to `Tokenizer`.
tokenizer_path: str = "tokenizer_Llama-2-7b-chat-hf"
# Optional quantization mode; "bnb.*" selects a bitsandbytes plugin,
# "gptq.int4" enables the GPTQ context manager at model construction.
quantize: Optional[
    Literal["bnb.nf4", "bnb.nf4-dq", "bnb.fp4", "bnb.fp4-dq", "bnb.int8", "gptq.int4"]
] = None
# Fabric strategy; the string "fsdp" is replaced by an FSDPStrategy instance below.
strategy: str = "auto"
# Number of devices passed to Fabric.
devices: int = 1
# Fabric precision; None falls back to `get_default_supported_precision`.
precision: Optional[str] = None

precision = precision or get_default_supported_precision(training=False)
plugins = None
if quantize is not None:
    if devices > 1:
        # `NotImplemented` is a non-callable constant; the exception type is
        # `NotImplementedError`.
        raise NotImplementedError("Multi-GPU quantization is not supported yet.")
    if quantize.startswith("bnb."):
        if "mixed" in precision:
            raise ValueError("Quantization and mixed precision is not supported.")
        dtype = {"16-true": torch.float16, "bf16-true": torch.bfloat16, "32-true": torch.float32}[
            precision
        ]
        # `quantize[4:]` drops the "bnb." prefix to get the plugin mode name.
        plugins = BitsandbytesPrecision(quantize[4:], dtype)
        # The dtype now travels with the plugin, so Fabric's own precision is cleared.
        precision = None

if strategy == "fsdp":
    strategy = FSDPStrategy(auto_wrap_policy={Block}, cpu_offload=False)

fabric = L.Fabric(devices=devices, strategy=strategy, precision=precision, plugins=plugins)
fabric.launch()

# Use the configured path rather than repeating the literal.
tokenizer = Tokenizer(Path(tokenizer_path))
config = Config.from_name("pythia-160m")

fabric.print(f"Loading model from {chptk_path}", file=sys.stderr)
t0 = time.perf_counter()
with fabric.init_module(empty_init=True), gptq_quantization(quantize == "gptq.int4"):
    model = GPT(config)
fabric.print(
    f"Time to instantiate model: {time.perf_counter() - t0:.02f} seconds.", file=sys.stderr
)
with fabric.init_tensor():
    # Allocate the key/value cache for single-sequence generation.
    model.set_kv_cache(batch_size=1)

model.eval()
model = fabric.setup_module(model)

t0 = time.perf_counter()
load_checkpoint(fabric, model, chptk_path)
fabric.print(
    f"Time to load the model weights: {time.perf_counter() - t0:.02f} seconds.", file=sys.stderr
)
|
|
|
|
|
def generate_from_prompt(
    prompt: str = "",
    max_new_tokens: int = 500,
    top_k: int = 200,
    temperature: float = 0.8,
):
    """Generate text from a prompt using the module-level pre-trained model.

    The decoded text is printed through ``fabric.print``; timing and (on CUDA)
    peak-memory statistics are printed to stderr.

    Args:
        prompt (str, optional): Prompt string to be used for generating samples. Defaults to "".
        max_new_tokens (int, optional): Number of generation steps to take. Defaults to 500.
        top_k (int, optional): Number of most likely tokens to consider when sampling. Defaults to 200.
        temperature (float, optional): Controls randomness of the sampling process. Defaults to 0.8.

    Returns:
        The decoded text (prompt plus continuation) of the generated sample.
    """
    prompt_ids = tokenizer.encode(prompt, device=fabric.device)
    prompt_length = prompt_ids.size(0)
    max_returned_tokens = prompt_length + max_new_tokens
    sampling_options = {"temperature": temperature, "top_k": top_k}

    with fabric.init_tensor():
        # Size the model's sequence length to exactly what this request needs.
        model.max_seq_length = max_returned_tokens

    # A single sample is produced per call.
    num_samples: int = 1
    for i in range(num_samples):
        started_at = time.perf_counter()
        y = generate(model, prompt_ids, max_returned_tokens, **sampling_options)
        t = time.perf_counter() - started_at

        pred = tokenizer.decode(y)
        fabric.print(pred)

        tokens_generated = y.size(0) - prompt_length
        timing_report = (
            f"Time for inference {i + 1}: {t:.02f} sec total, {tokens_generated / t:.02f} tokens/sec"
        )
        fabric.print(timing_report, file=sys.stderr)

    if fabric.device.type == "cuda":
        memory_report = f"Memory used: {torch.cuda.max_memory_allocated() / 1e9:.02f} GB"
        fabric.print(memory_report, file=sys.stderr)

    return pred
|
|