Biskwit's picture
upload model
16d3628 verified
raw
history blame contribute delete
No virus
5.79 kB
import torch, transformers
from typing import Any, Dict
from transformers import AutoTokenizer, AutoModelForCausalLM
import re
import unicodedata
class EndpointHandler:
def __init__(self, path):
tokenizer = AutoTokenizer.from_pretrained(path)
model = AutoModelForCausalLM.from_pretrained(
path, device_map="auto", torch_dtype=torch.bfloat16, load_in_4bit=True
)
self.pipeline = transformers.pipeline(
"text-generation", model=model, tokenizer=tokenizer
)
def __call__(self, data: Dict[str, Any]) -> Dict[str, str]:
# process input
inputs = data.pop("inputs", data)
# default parameters
parameters = {
"max_new_tokens": 128,
"do_sample": True,
"top_k": 10,
"temperature": 1.0,
"return_full_text": False,
}
# user parameters
parameters.update(data.pop("parameters", {}))
unique = isinstance(inputs, str)
inputs, denormalize_funcs = claire_text_preproc_conversation(inputs)
sequences = self.pipeline(inputs, **parameters)
if unique:
return [{"generated_text": denormalize_funcs(sequences[0]["generated_text"])}]
else:
assert len(denormalize_funcs) == len(sequences)
return [{"generated_text": denormalize_func(seq[0]["generated_text"])} for denormalize_func, seq in zip(denormalize_funcs, sequences)]
def claire_text_preproc_conversation(text):
if isinstance(text, (list, tuple)):
assert len(text)
# Apply and transpose
texts, denormalize_funcs = zip(*[claire_text_preproc_conversation(t) for t in text])
return list(texts), list(denormalize_funcs)
if not isinstance(text, str):
return text
text = format_special_characters(text)
text = re.sub(" - | -$|^- ", " ", text.strip(" "))
global _reverse_tag_transfo
_reverse_tag_transfo = {}
text = format_special_tags(text)
text = collapse_whitespaces_conversations(text)
if _reverse_tag_transfo:
reverse_tag_transfo = _reverse_tag_transfo.copy()
def denormalize_func(t):
for k, v in reverse_tag_transfo.items():
if k in t:
t = t.replace(k, v)
return t
return text, lambda x: denormalize_func(x)
else:
return text, lambda x: x
_brackets = re.compile(r"\[([^\]]*)\]")
_pattern_speaker = re.compile(r"[^\]]+:")
# Global variable to remember some normalizations that were done and apply it back
_reverse_tag_transfo = {}
_anonymized_prefix = None
def format_special_tags(text):
global _reverse_tag_transfo, _anonymized_prefix
_anonymized_prefix = None
text = re.sub(_brackets, _format_special_tags, text)
# At last the generic anonymization
if _anonymized_prefix:
_reverse_tag_transfo["[Intervenant "] = _anonymized_prefix
return text
def _format_special_tags(match):
content_within_brackets = match.group(1)
if re.match(_pattern_speaker, content_within_brackets):
return _format_tag(match.group())
else:
return ""
def _format_tag(text):
global _reverse_tag_transfo, _anonymized_prefix
if text.endswith(":]"):
anonymized_spk_prefixes = ["speaker", "spk", "locuteur"]
# Conversion "[speaker001:]" -> "[Intervenant 1:]"
for prefix in anonymized_spk_prefixes:
if text.lower().startswith("["+prefix):
try:
index = int(text[len(prefix)+1:-2])
except ValueError:
return text
new_spk_tag = f"[Intervenant {index}:]"
_reverse_tag_transfo[new_spk_tag] = text
if _anonymized_prefix is None:
prefix = "["+prefix
while len(prefix) < len(text) and text[len(prefix)] in " 0":
prefix += text[len(prefix)]
_anonymized_prefix = prefix
return "\n" + new_spk_tag
# Capitalize speaker name
speaker = text[1:-2]
speaker = capitalize(speaker)
new_spk_tag = f"[{speaker}:]"
if text != new_spk_tag:
_reverse_tag_transfo[new_spk_tag] = text
return "\n" + new_spk_tag
# if text == "[PII]":
# return "[Nom]"
# if text == "[NOISE]":
# return "[bruit]"
# if text == "[LAUGHTER]":
# return "[rire]"
return ""
def capitalize(text):
# Custom capitalization for first and last names
words = text.split(" ")
words = [w.capitalize() if (not w.isupper() or len(w) > 2) else w for w in words]
for i, w in enumerate(words):
for sep in "-", "'":
if sep in w:
words[i] = sep.join(
[x.capitalize() if not x.isupper() else x for x in w.split(sep)]
)
return " ".join(words)
def collapse_whitespaces_conversations(text):
text = re.sub(r"\n+", "\n", text)
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n ", "\n", text)
text = re.sub(r" ([\.,])", r"\1", text)
return text.lstrip().rstrip(" ")
def format_special_characters(text):
text = unicodedata.normalize("NFC", text)
for before, after in [
("…", "..."),
(r"[«“][^\S\r\n]*", '"'),
(r"[^\S\r\n]*[»”″„]", '"'),
(r"(``|'')", '"'),
(r"[’‘‛ʿ]", "'"),
("‚", ","),
(r"–", "-"),
("[  ]", " "), # unbreakable spaces
(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", ""), # non-printable characters
# ("·", "."),
(r"ᵉʳ", "er"),
(r"ᵉ", "e"),
]:
text = re.sub(before, after, text)
return text