lhoestq's picture
lhoestq HF staff
default temperature
3642076
raw
history blame
6.91 kB
import json
import logging
import regex
import time
from itertools import chain, islice
from pathlib import Path
from typing import Annotated, Iterator
import ijson
import outlines
import torch
from pydantic import BaseModel, StringConstraints, conlist, conset
from outlines import generate, models
from outlines.generate.api import SequenceGenerator
from transformers import AutoTokenizer
from fsm import replace_fields
from samplers import PenalizedMultinomialSampler
from utils import StringIteratorIO
logger = logging.getLogger(__name__)


logger.warning("Loading model...")
# Device / model selection: a small Qwen chat model when Apple MPS is
# available, otherwise Gemma 2B instruct on CUDA (there is no CPU fallback).
if torch.backends.mps.is_available():
    device = "mps"
    model_id = "Qwen/Qwen1.5-0.5B-Chat"
    batch_size = 4
else:
    device = "cuda"
    model_id = "google/gemma-2b-it"
    batch_size = 4

model = models.transformers(model_id, device=device)
tokenizer = AutoTokenizer.from_pretrained(model_id)

# Two samplers: the default one is used for samples, the low-temperature one
# for column names. Only `sampler` receives the token constraints below.
sampler = PenalizedMultinomialSampler()
low_temperature_sampler = PenalizedMultinomialSampler(temperature=0.3)

disallowed_patterns = [regex.compile(r"\p{Han}")]  # focus on english for now


def _scan_vocabulary(vocab_tokenizer, patterns):
    """Return ``(empty_ids, disallowed_ids)`` from one pass over the vocabulary.

    Each token id in ``range(vocab_tokenizer.vocab_size)`` is decoded exactly
    once (special tokens skipped):

    * ``empty_ids``: ids whose decoded text is empty or whitespace-only;
    * ``disallowed_ids``: ids whose decoded text matches any of ``patterns``.
    """
    empty_ids = []
    disallowed_ids = []
    for token_id in range(vocab_tokenizer.vocab_size):
        text = vocab_tokenizer.decode([token_id], skip_special_tokens=True)
        if not text.strip():
            empty_ids.append(token_id)
        # NOTE(review): `.match` only tests the START of the decoded token, so
        # a token with a Han character after some other character is not
        # caught. Confirm whether `.search` was intended.
        if any(pattern.match(text) for pattern in patterns):
            disallowed_ids.append(token_id)
    return empty_ids, disallowed_ids


empty_tokens, disallowed_tokens = _scan_vocabulary(tokenizer, disallowed_patterns)
# Presumably: whitespace-only tokens may repeat at most once and disallowed
# tokens never; see samplers.PenalizedMultinomialSampler.set_max_repeats.
sampler.set_max_repeats(empty_tokens, 1)
sampler.set_max_repeats(disallowed_tokens, 0)
# The Sample & Dataset models are just templated with placeholder fields
class Sample(BaseModel):
    # Schema of one generated row. The five field names below are placeholders,
    # not real column names: stream_jsonl_file() uses at most one requested
    # column per placeholder and truncates each column to at most
    # len(placeholder name) tokens before substitution.
    # (Kept as `#` comments: a class docstring would be added to the JSON schema.)
    # We use get_samples_generator() to replace the placeholder with the requested fields
    ABCDabcd12: str
    EFGHefgh34: str
    IJKLijkl56: str
    MNOPmnop78: str
    QRSTqrst90: str
    # PS: don't use StringConstraints with max_length here since it creates a fsm that is too big
class Dataset(BaseModel):
    # Top-level JSON object of the generation template: a single list field of
    # Sample rows. Its field name is the JSON key that stream_jsonl_file()
    # streams items from.
    # We use get_samples_generator() to set the length to infinity
    data: conlist(Sample, min_length=2, max_length=3)  # type: ignore
# Template JSON generator for the placeholder Dataset schema, built at import
# time. get_samples_generator() reuses its fsm, model and sampler.
samples_generator_template = generate.json(model, Dataset, sampler=sampler)
class Columns(BaseModel):
    # Schema for the model-suggested column names: a set of lowercase
    # alphanumeric/underscore strings, at least 2 and at most one fewer than
    # the number of placeholder fields in Sample.
    columns: conset(Annotated[str, StringConstraints(pattern=r'[a-z0-9_]+')], min_length=2, max_length=len(Sample.model_fields) - 1)  # type: ignore
# JSON generator for the Columns schema; uses the low-temperature sampler.
# stream_jsonl_file() calls it when the caller supplies no columns.
columns_generator = generate.json(model, Columns, sampler=low_temperature_sampler)
def get_samples_generator(new_fields: list[str]) -> SequenceGenerator:
    """Build a samples generator whose JSON fields are ``new_fields``.

    The FSM of the module-level ``samples_generator_template`` is handed to
    ``replace_fields`` together with the placeholder ``Sample`` model, the
    requested field names and the tokenizer. The returned generator reuses the
    template's model and sampler and runs on the module-level ``device``.

    Args:
        new_fields: Field names to substitute for the ``Sample`` placeholders.

    Returns:
        A ``SequenceGenerator`` driven by the rewritten FSM.
    """
    fsm = replace_fields(  # replace the placeholder fields by the real fields
        fsm=samples_generator_template.fsm,
        model=Sample,
        new_fields=new_fields,
        tokenizer=tokenizer,
        make_infinite_loop=True,  # to generate as many samples as we want
    )
    return SequenceGenerator(
        fsm=fsm,
        model=samples_generator_template.model,
        sampler=samples_generator_template.sampler,
        device=device,
    )
# Prompt asking the model to propose column names for a dataset file.
# The function has no body besides its docstring: with @outlines.prompt the
# docstring itself is the prompt template and `{{ filename }}` is filled from
# the argument, so the text below is runtime data and must not be reworded
# casually.
@outlines.prompt
def columns_prompt(filename: str):
    """I would like to create a JSON file named {{ filename }}.json for a dataset of realistic data.
Give an example of column names / columns for this dataset to populate a SQL schema.
Please reply in JSON format and place the columns in a field named "columns".
"""
# Prompt asking the model to generate sample rows for the given columns.
# As with any @outlines.prompt function, the docstring is the prompt template;
# `{{ filename }}`, `{{ columns }}` and `{{ prompt }}` are filled from the
# arguments (`prompt` is extra free-form user text appended to the request).
# NOTE: the name is misspelled ("prommpt") but is kept as-is because
# stream_jsonl_file() calls it under this name.
@outlines.prompt
def samples_prommpt(filename: str, prompt: str, columns: str):
    """I would like to create a JSON file named {{ filename }}.json for a dataset of realistic data.
Give an example of content using a JSON field named "data" with samples with columns {{ columns }}.
{{ prompt }}
"""
def stream_json_objects_from_batched_tokens_generator(batched_tokens_generator: Iterator[list[str]], json_field: str) -> Iterator[dict]:
    """Yield complete JSON objects from several token streams generated in parallel.

    Every item of ``batched_tokens_generator`` is a list holding one new token
    per stream; the first batch fixes the number of streams. Tokens are
    appended to a per-stream text buffer, and whenever a token contains ``"``
    or ``}`` the whole buffer is re-parsed with ``ijson`` to look for newly
    completed elements of the ``json_field`` array. Objects already yielded
    for a stream are skipped on later re-parses, so each object is emitted
    exactly once.

    Args:
        batched_tokens_generator: Iterator of per-step token batches.
        json_field: Name of the top-level JSON array whose items are streamed.

    Yields:
        Each completed item of ``json_field``, in the order the items complete
        across streams.
    """
    first_batch = next(batched_tokens_generator, None)
    if first_batch is None:
        # Nothing was generated. Return cleanly: a StopIteration escaping a
        # generator body would be turned into RuntimeError.
        return
    batch_size = len(first_batch)
    streams = [""] * batch_size
    skips = [0] * batch_size  # per stream: number of objects already yielded
    item_prefix = json_field + ".item"
    for tokens_batch in chain([first_batch], batched_tokens_generator):
        for stream_idx, token in enumerate(tokens_batch):
            streams[stream_idx] += token
            # Only these characters can complete a string value or an object,
            # so other tokens cannot produce a new item.
            if '"' not in token and "}" not in token:
                continue
            try:
                parsed_items = ijson.items(StringIteratorIO(iter(streams[stream_idx])), item_prefix, buf_size=1)
                for stream_sample in islice(parsed_items, skips[stream_idx], None):
                    yield stream_sample
                    # Count the object as emitted so the next re-parse skips it.
                    skips[stream_idx] += 1
            except ijson.IncompleteJSONError:
                # The buffer ends mid-document; wait for more tokens.
                pass
def _render_user_prompt(content: str) -> str:
    """Render ``content`` as a single user turn with the tokenizer's chat template."""
    return tokenizer.apply_chat_template(
        [{"role": "user", "content": content}],
        tokenize=False,
        add_generation_prompt=True,
    )


def stream_jsonl_file(filename: str, prompt: str, columns: list[str], seed: int, size: int) -> Iterator[str]:
    """Generate a synthetic dataset and stream it as JSON Lines.

    Steps:
      1. If no columns are given, ask the model for column names with
         ``columns_generator``.
      2. Fit the columns to the ``Sample`` placeholders: at most one column per
         placeholder field, each truncated to at most ``len(placeholder name)``
         tokens.
      3. Build a samples generator for those columns and stream ``batch_size``
         identical prompts in parallel, yielding each completed sample.

    Args:
        filename: Target file name; only its stem is used in the prompts.
        prompt: Extra free-form instructions appended to the samples prompt.
        columns: Requested column names; empty (or ``None``) lets the model
            choose them. The caller's list is never modified.
        seed: Seed for the torch random generator used by both generations.
        size: Maximum number of samples to yield.

    Yields:
        One JSON-encoded sample per item, terminated by a newline.
    """
    filename = Path(filename).stem
    logger.warning(f"stream_response({filename=}, {prompt=}, {columns=})")
    _start = time.time()
    rng = torch.Generator(device=model.device)
    rng.manual_seed(seed)
    # Work on a private copy so generated columns are not appended to the
    # caller's list, and so a missing value (None) behaves like "no columns".
    columns = list(columns) if columns else []
    if not columns:
        text = _render_user_prompt(columns_prompt(filename=filename))
        logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) Generating columns...")
        columns_generator_tokens = columns_generator.stream(text, rng=rng)
        for column in ijson.items(StringIteratorIO(columns_generator_tokens), "columns.item", buf_size=16):
            columns.append(column)
        logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) Generating columns... DONE (total={time.time() - _start:.02f}s)")
    # zip() caps the number of columns at the number of placeholder fields;
    # the slice caps each column's token count at the placeholder name length.
    columns = [
        tokenizer.decode(tokenizer.encode(column, add_special_tokens=False)[:len(orig_field)], skip_special_tokens=True)
        for column, orig_field in zip(columns, Sample.model_fields)
    ]
    logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) - Generating JSON regex guide...")
    samples_generator = get_samples_generator(new_fields=columns)
    logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) - Generating JSON regex guide... DONE (total={time.time() - _start:.02f}s)")
    logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) - Generating samples...")
    text = _render_user_prompt(
        samples_prommpt(filename=filename, prompt=prompt, columns="'" + "', '".join(columns) + "'")
    )
    # The same prompt is repeated batch_size times; the streams diverge through sampling.
    batched_samples_generator_tokens = samples_generator.stream([text] * batch_size, rng=rng)
    json_field = list(Dataset.model_fields)[0]
    samples = stream_json_objects_from_batched_tokens_generator(batched_samples_generator_tokens, json_field=json_field)
    # range() comes first in zip() so that no extra sample is pulled from the
    # (infinite) generator once `size` samples have been produced.
    for _, sample in zip(range(size), samples):
        yield json.dumps(sample, ensure_ascii=False) + "\n"
    logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) - Generating samples... DONE (total={time.time() - _start:.02f}s)")