from typing import Dict

import numpy as np

from ..utils import add_end_docstrings, is_tf_available, is_torch_available, logging
from .base import PIPELINE_INIT_ARGS, GenericTensor, Pipeline, PipelineException


if is_tf_available():
    import tensorflow as tf

    from ..tf_utils import stable_softmax


if is_torch_available():
    import torch


logger = logging.get_logger(__name__)

class FillMaskPipeline(Pipeline):
    """
    Masked language modeling prediction pipeline using any `ModelWithLMHead`. See the [masked language modeling
    examples](../task_summary#masked-language-modeling) for more information.

    Example:

    ```python
    >>> from transformers import pipeline

    >>> fill_masker = pipeline(model="bert-base-uncased")
    >>> fill_masker("This is a simple [MASK].")
    [{'score': 0.042, 'token': 3291, 'token_str': 'problem', 'sequence': 'this is a simple problem.'}, {'score': 0.031, 'token': 3160, 'token_str': 'question', 'sequence': 'this is a simple question.'}, {'score': 0.03, 'token': 8522, 'token_str': 'equation', 'sequence': 'this is a simple equation.'}, {'score': 0.027, 'token': 2028, 'token_str': 'one', 'sequence': 'this is a simple one.'}, {'score': 0.024, 'token': 3627, 'token_str': 'rule', 'sequence': 'this is a simple rule.'}]
    ```

    Learn more about the basics of using a pipeline in the [pipeline tutorial](../pipeline_tutorial).

    This mask filling pipeline can currently be loaded from [`pipeline`] using the following task identifier:
    `"fill-mask"`.

    The models that this pipeline can use are models that have been trained with a masked language modeling objective,
    which includes the bi-directional models in the library. See the up-to-date list of available models on
    [huggingface.co/models](https://huggingface.co/models?filter=fill-mask).

    <Tip>

    This pipeline only works for inputs with exactly one token masked. Experimental: We added support for multiple
    masks. The returned values are raw model output, and correspond to disjoint probabilities where one might expect
    joint probabilities (See [discussion](https://github.com/huggingface/transformers/pull/10222)).

    </Tip>

    <Tip>

    This pipeline now supports tokenizer_kwargs. For example, try:

    ```python
    >>> from transformers import pipeline

    >>> fill_masker = pipeline(model="bert-base-uncased")
    >>> tokenizer_kwargs = {"truncation": True}
    >>> fill_masker(
    ...     "This is a simple [MASK]. " + "...with a large amount of repeated text appended. " * 100,
    ...     tokenizer_kwargs=tokenizer_kwargs,
    ... )
    ```

    </Tip>
    """

    def get_masked_index(self, input_ids: GenericTensor) -> np.ndarray:
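        """Return the positions of every `mask_token` occurrence in `input_ids` for the current framework (TF or PyTorch)."""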
        if self.framework == "tf":
            masked_index = tf.where(input_ids == self.tokenizer.mask_token_id).numpy()
        elif self.framework == "pt":
            masked_index = torch.nonzero(input_ids == self.tokenizer.mask_token_id, as_tuple=False)
        else:
            raise ValueError("Unsupported framework")
        return masked_index

    def _ensure_exactly_one_mask_token(self, input_ids: GenericTensor) -> None:
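        """Raise a `PipelineException` if no `mask_token` is found in `input_ids`."""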
        masked_index = self.get_masked_index(input_ids)
        numel = np.prod(masked_index.shape)
        if numel < 1:
            raise PipelineException(
                "fill-mask",
                self.model.base_model_prefix,
                f"No mask_token ({self.tokenizer.mask_token}) found on the input",
            )

    def ensure_exactly_one_mask_token(self, model_inputs: GenericTensor):
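        """
        Check every sample in `model_inputs` for the presence of a mask token, whether the inputs come as a list of
        encodings or as a single batched encoding.
        """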
        if isinstance(model_inputs, list):
            for model_input in model_inputs:
                self._ensure_exactly_one_mask_token(model_input["input_ids"][0])
        else:
            for input_ids in model_inputs["input_ids"]:
                self._ensure_exactly_one_mask_token(input_ids)

    def preprocess(
        self, inputs, return_tensors=None, tokenizer_kwargs=None, **preprocess_parameters
    ) -> Dict[str, GenericTensor]:
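        """Tokenize `inputs`, forwarding any `tokenizer_kwargs`, and verify that each sample contains a mask token."""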
        if return_tensors is None:
            return_tensors = self.framework
        if tokenizer_kwargs is None:
            tokenizer_kwargs = {}

        model_inputs = self.tokenizer(inputs, return_tensors=return_tensors, **tokenizer_kwargs)
        self.ensure_exactly_one_mask_token(model_inputs)
        return model_inputs

    def _forward(self, model_inputs):
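        """Run the model and carry `input_ids` along in the output so `postprocess` can locate the masked positions."""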
        model_outputs = self.model(**model_inputs)
        model_outputs["input_ids"] = model_inputs["input_ids"]
        return model_outputs

    def postprocess(self, model_outputs, top_k=5, target_ids=None):
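        """
        Turn the logits at each masked position into the `top_k` most probable candidates (optionally restricted to
        `target_ids`) and decode the corresponding filled-in sequences.
        """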
        # Cap top_k if there are targets
        if target_ids is not None and target_ids.shape[0] < top_k:
            top_k = target_ids.shape[0]
        input_ids = model_outputs["input_ids"][0]
        outputs = model_outputs["logits"]

        if self.framework == "tf":
            masked_index = tf.where(input_ids == self.tokenizer.mask_token_id).numpy()[:, 0]

            outputs = outputs.numpy()

            logits = outputs[0, masked_index, :]
            probs = stable_softmax(logits, axis=-1)
            if target_ids is not None:
                probs = tf.gather_nd(tf.squeeze(probs, 0), target_ids.reshape(-1, 1))
                probs = tf.expand_dims(probs, 0)

            topk = tf.math.top_k(probs, k=top_k)
            values, predictions = topk.values.numpy(), topk.indices.numpy()
        else:
            masked_index = torch.nonzero(input_ids == self.tokenizer.mask_token_id, as_tuple=False).squeeze(-1)
            # Fill mask pipeline supports only one ${mask_token} per sample
            logits = outputs[0, masked_index, :]
            probs = logits.softmax(dim=-1)
            if target_ids is not None:
                probs = probs[..., target_ids]

            values, predictions = probs.topk(top_k)
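
        # Each row of `values`/`predictions` corresponds to one masked position: build a list of candidate
        # substitutions per position and decode the full sequence with the candidate token filled in.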
        result = []
        single_mask = values.shape[0] == 1
        for i, (_values, _predictions) in enumerate(zip(values.tolist(), predictions.tolist())):
            row = []
            for v, p in zip(_values, _predictions):
                # Copy is important since we're going to modify this array in place
                tokens = input_ids.numpy().copy()
                if target_ids is not None:
                    p = target_ids[p].tolist()

                tokens[masked_index[i]] = p
                # Filter padding out:
                tokens = tokens[np.where(tokens != self.tokenizer.pad_token_id)]
                # Originally we skip special tokens to give readable output.
                # For multi masks though, the other [MASK] would be removed otherwise
                # making the output look odd, so we add them back
                sequence = self.tokenizer.decode(tokens, skip_special_tokens=single_mask)
                proposition = {"score": v, "token": p, "token_str": self.tokenizer.decode([p]), "sequence": sequence}
                row.append(proposition)
            result.append(row)

        if single_mask:
            return result[0]
        return result

    def get_target_ids(self, targets, top_k=None):
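        """
        Map `targets` (one or several strings) to vocabulary ids. A target that is not in the vocab is tokenized and
        its first token is used instead, with a warning; duplicates are removed.
        """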
        if isinstance(targets, str):
            targets = [targets]
        try:
            vocab = self.tokenizer.get_vocab()
        except Exception:
            vocab = {}
        target_ids = []
        for target in targets:
            id_ = vocab.get(target, None)
            if id_ is None:
                input_ids = self.tokenizer(
                    target,
                    add_special_tokens=False,
                    return_attention_mask=False,
                    return_token_type_ids=False,
                    max_length=1,
                    truncation=True,
                )["input_ids"]
                if len(input_ids) == 0:
                    logger.warning(
                        f"The specified target token `{target}` does not exist in the model vocabulary. "
                        "We cannot replace it with anything meaningful, ignoring it"
                    )
                    continue
                id_ = input_ids[0]
                # XXX: If users hit this path, tokenizing each missing target is pretty slow, so the warning
                # below lets them fix their input to get faster performance.
                logger.warning(
                    f"The specified target token `{target}` does not exist in the model vocabulary. "
                    f"Replacing with `{self.tokenizer.convert_ids_to_tokens(id_)}`."
                )
            target_ids.append(id_)
        target_ids = list(set(target_ids))
        if len(target_ids) == 0:
            raise ValueError("At least one target must be provided when passed.")
        target_ids = np.array(target_ids)
        return target_ids

    def _sanitize_parameters(self, top_k=None, targets=None, tokenizer_kwargs=None):
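        """Split the pipeline keyword arguments into preprocess, forward, and postprocess parameter dicts."""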
        preprocess_params = {}

        if tokenizer_kwargs is not None:
            preprocess_params["tokenizer_kwargs"] = tokenizer_kwargs

        postprocess_params = {}

        if targets is not None:
            target_ids = self.get_target_ids(targets, top_k)
            postprocess_params["target_ids"] = target_ids

        if top_k is not None:
            postprocess_params["top_k"] = top_k

        if self.tokenizer.mask_token_id is None:
            raise PipelineException(
                "fill-mask", self.model.base_model_prefix, "The tokenizer does not define a `mask_token`."
            )
        return preprocess_params, {}, postprocess_params

    def __call__(self, inputs, *args, **kwargs):
""" | |
Fill the masked token in the text(s) given as inputs. | |
Args: | |
args (`str` or `List[str]`): | |
One or several texts (or one list of prompts) with masked tokens. | |
targets (`str` or `List[str]`, *optional*): | |
When passed, the model will limit the scores to the passed targets instead of looking up in the whole | |
vocab. If the provided targets are not in the model vocab, they will be tokenized and the first | |
resulting token will be used (with a warning, and that might be slower). | |
top_k (`int`, *optional*): | |
When passed, overrides the number of predictions to return. | |
Return: | |
A list or a list of list of `dict`: Each result comes as list of dictionaries with the following keys: | |
- **sequence** (`str`) -- The corresponding input with the mask token prediction. | |
- **score** (`float`) -- The corresponding probability. | |
- **token** (`int`) -- The predicted token id (to replace the masked one). | |
- **token_str** (`str`) -- The predicted token (to replace the masked one). | |
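
        Example (a minimal sketch; scores and ordering depend on the model checkpoint, see the class docstring for
        real output from `bert-base-uncased`):

        ```python
        >>> from transformers import pipeline

        >>> fill_masker = pipeline(model="bert-base-uncased")
        >>> fill_masker("This is a simple [MASK].", top_k=2, targets=["problem", "question"])  # doctest: +SKIP
        ```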
""" | |
        outputs = super().__call__(inputs, **kwargs)
        if isinstance(inputs, list) and len(inputs) == 1:
            return outputs[0]
        return outputs