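# App code for this Space: streams a synthetic JSONL dataset. A small chat
# model, constrained by an outlines FSM, first proposes column names for the
# requested file, then generates samples that are parsed incrementally with
# ijson and yielded line by line.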
import json
import logging
import time
from itertools import chain, islice
from pathlib import Path
from typing import Annotated, Iterator

import ijson
import outlines
import regex
import torch
from pydantic import BaseModel, StringConstraints, conlist, conset
from outlines import generate, models
from outlines.generate.api import SequenceGenerator
from transformers import AutoTokenizer

from fsm import replace_fields
from samplers import PenalizedMultinomialSampler
from utils import StringIteratorIO
logger = logging.getLogger(__name__)

logger.warning("Loading model...")
if torch.backends.mps.is_available():
    device = "mps"
    model_id = "Qwen/Qwen1.5-0.5B-Chat"
    batch_size = 4
else:
    device = "cuda"
    model_id = "google/gemma-2b-it"
    batch_size = 4
model = models.transformers(model_id, device=device)
tokenizer = AutoTokenizer.from_pretrained(model_id)
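# PenalizedMultinomialSampler comes from the local samplers module; judging by
# its use below, set_max_repeats(token_ids, n) caps how many times the given
# token ids may be sampled in one sequence.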
sampler = PenalizedMultinomialSampler()
low_temperature_sampler = PenalizedMultinomialSampler(temperature=0.3)
# Tokens that decode to pure whitespace: allow at most one so the model
# can't pad its output indefinitely.
empty_tokens = [token_id for token_id in range(tokenizer.vocab_size) if not tokenizer.decode([token_id], skip_special_tokens=True).strip()]
sampler.set_max_repeats(empty_tokens, 1)
disallowed_patterns = [regex.compile(r"\p{Han}")]  # focus on English for now
disallowed_tokens = [token_id for token_id in range(tokenizer.vocab_size) if any(pattern.match(tokenizer.decode([token_id], skip_special_tokens=True)) for pattern in disallowed_patterns)]
sampler.set_max_repeats(disallowed_tokens, 0)  # max 0 repeats = never sample them
# The Sample & Dataset models are just templated with placeholder fields
class Sample(BaseModel):
    # We use get_samples_generator() to replace the placeholders with the requested fields
    ABCDabcd12: str
    EFGHefgh34: str
    IJKLijkl56: str
    MNOPmnop78: str
    QRSTqrst90: str
    # PS: don't use StringConstraints with max_length here since it creates an FSM that is too big

class Dataset(BaseModel):
    # We use get_samples_generator() to set the length to infinity
    data: conlist(Sample, min_length=2, max_length=3)  # type: ignore
samples_generator_template = generate.json(model, Dataset, sampler=sampler)
class Columns(BaseModel):
    columns: conset(Annotated[str, StringConstraints(pattern=r'[a-z0-9_]+')], min_length=2, max_length=len(Sample.model_fields) - 1)  # type: ignore

columns_generator = generate.json(model, Columns, sampler=low_temperature_sampler)
def get_samples_generator(new_fields: list[str]) -> SequenceGenerator:
    fsm = replace_fields(  # replace the placeholder fields with the real fields
        fsm=samples_generator_template.fsm,
        model=Sample,
        new_fields=new_fields,
        tokenizer=tokenizer,
        make_infinite_loop=True,  # to generate as many samples as we want
    )
    return SequenceGenerator(
        fsm=fsm,
        model=samples_generator_template.model,
        sampler=samples_generator_template.sampler,
        device=device,
    )
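# Hypothetical example: get_samples_generator(["name", "age"]) returns a
# generator whose FSM matches {"data": [{"name": "...", "age": "..."}, ...]}
# and loops over the array items indefinitely.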
# The {{ }} placeholders below are Jinja-style templates, rendered from the
# docstrings by outlines' prompt decorator (assumed lost in extraction; without
# it these functions would return None).
@outlines.prompt
def columns_prompt(filename: str):
    """I would like to create a JSON file named {{ filename }}.json for a dataset of realistic data.
    Give an example of column names / columns for this dataset to populate a SQL schema.
    Please reply in JSON format and place the columns in a field named "columns".
    """
@outlines.prompt
def samples_prompt(filename: str, prompt: str, columns: str):
    """I would like to create a JSON file named {{ filename }}.json for a dataset of realistic data.
    Give an example of content using a JSON field named "data" with samples with columns {{ columns }}.
    {{ prompt }}
    """
def stream_json_objects_from_batched_tokens_generator(batched_tokens_generator: Iterator[list[str]], json_field: str) -> Iterator[dict]:
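    """Yield complete JSON objects parsed out of batched token streams.

    Each stream accumulates its raw text; whenever a quote or closing brace
    arrives, ijson re-parses the buffer from the start and `skips` drops the
    objects that were already yielded, so each completed sample is emitted
    exactly once.
    """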
    first_batch = next(batched_tokens_generator)
    batch_size = len(first_batch)
    streams = [""] * batch_size
    skips = [0] * batch_size
    for tokens_batch in chain([first_batch], batched_tokens_generator):
        for stream_idx, token in enumerate(tokens_batch):
            streams[stream_idx] += token
            # A sample can only have just completed after a quote or closing brace
            if '"' in token or "}" in token:
                try:
                    for stream_sample in islice(ijson.items(StringIteratorIO(iter(streams[stream_idx])), json_field + ".item", buf_size=1), skips[stream_idx], None):
                        yield stream_sample
                        skips[stream_idx] += 1
                except ijson.IncompleteJSONError:
                    pass
def stream_jsonl_file(filename: str, prompt: str, columns: list[str], seed: int, size: int) -> Iterator[str]:
    filename = Path(filename).stem
    logger.warning(f"stream_jsonl_file({filename=}, {prompt=}, {columns=})")
    _start = time.time()
    rng = torch.Generator(device=model.device)
    rng.manual_seed(seed)
    if not columns:
        messages = [
            {"role": "user", "content": columns_prompt(filename=filename)}
        ]
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        logger.warning(f"stream_jsonl_file({filename=}, {prompt=}, {columns=}) Generating columns...")
        columns_generator_tokens = columns_generator.stream(text, rng=rng)
        for column in ijson.items(StringIteratorIO(columns_generator_tokens), "columns.item", buf_size=16):
            columns.append(column)
logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) Generating columns... DONE (total={time.time() - _start:.02f}s)") | |
    columns = [
        tokenizer.decode(tokenizer.encode(column, add_special_tokens=False)[:len(orig_field)], skip_special_tokens=True)
        for column, orig_field in zip(columns, Sample.model_fields)
    ]
logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) - Generating JSON regex guide...") | |
samples_generator = get_samples_generator(new_fields=columns) | |
logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) - Generating JSON regex guide... DONE (total={time.time() - _start:.02f}s)") | |
logger.warning(f"stream_response({filename=}, {prompt=}, {columns=}) - Generating samples...") | |
messages = [ | |
{"role": "user", "content": samples_prommpt(filename=filename, prompt=prompt, columns="'" + "', '".join(columns) + "'")} | |
] | |
text = tokenizer.apply_chat_template( | |
messages, | |
tokenize=False, | |
add_generation_prompt=True | |
) | |
batched_samples_generator_tokens = samples_generator.stream([text] * batch_size, rng=rng) | |
    json_field = list(Dataset.model_fields)[0]  # "data"
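    # batch_size constrained sequences advance in lockstep; samples are yielded
    # as soon as any of the parallel streams completes one, until `size` lines
    # have been emitted.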
    for _, sample in zip(range(size), stream_json_objects_from_batched_tokens_generator(batched_samples_generator_tokens, json_field=json_field)):
        yield json.dumps(sample, ensure_ascii=False) + "\n"
    logger.warning(f"stream_jsonl_file({filename=}, {prompt=}, {columns=}) - Generating samples... DONE (total={time.time() - _start:.02f}s)")