uie-nano / modeling_uie.py
Casually's picture
Upload modeling_uie.py
6566a8b
# -*- coding: utf-8 -*-
import math
import re
from dataclasses import dataclass
from typing import Optional, Tuple, List, Union, Dict
import numpy as np
import torch
import torch.nn as nn
from transformers import ErnieModel, ErniePreTrainedModel, PretrainedConfig, PreTrainedTokenizerFast
from transformers.utils import ModelOutput
@dataclass
class UIEModelOutput(ModelOutput):
"""
Output class for outputs of UIE.
Args:
loss (`torch.FloatTensor` of shape `(1),`, *optional*, returned when `labels` is provided):
Total span extraction loss is the sum of a Cross-Entropy for the start and end positions.
start_prob (`torch.FloatTensor` of shape `(batch_size, sequence_length)`):
Span-start scores (after Sigmoid).
end_prob (`torch.FloatTensor` of shape `(batch_size, sequence_length)`):
Span-end scores (after Sigmoid).
hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding
layer, + one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.
Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
sequence_length)`.
Attentions weights after the attention softmax, used to compute the weighted average in the
self-attention heads.
"""
loss: Optional[torch.FloatTensor] = None
start_prob: torch.FloatTensor = None
end_prob: torch.FloatTensor = None
hidden_states: Optional[Tuple[torch.FloatTensor]] = None
attentions: Optional[Tuple[torch.FloatTensor]] = None
class UIE(ErniePreTrainedModel):
"""
UIE model based on Bert model.
This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
library implements for all its model (such as downloading or saving, resizing the input embeddings, pruning heads
etc.)
This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage
and behavior.
Parameters:
config ([`PretrainedConfig`]): Model configuration class with all the parameters of the model.
Initializing with a config file does not load the weights associated with the model, only the
configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
def __init__(self, config: PretrainedConfig):
super(UIE, self).__init__(config)
self.encoder = ErnieModel(config)
self.config = config
hidden_size = self.config.hidden_size
self.linear_start = nn.Linear(hidden_size, 1)
self.linear_end = nn.Linear(hidden_size, 1)
self.sigmoid = nn.Sigmoid()
self.post_init()
def forward(self, input_ids: Optional[torch.Tensor] = None,
token_type_ids: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
head_mask: Optional[torch.Tensor] = None,
inputs_embeds: Optional[torch.Tensor] = None,
start_positions: Optional[torch.Tensor] = None,
end_positions: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None
):
"""
Args:
input_ids (`torch.LongTensor` of shape `({0})`):
Indices of input sequence tokens in the vocabulary.
Indices can be obtained using [`BertTokenizer`]. See [`PreTrainedTokenizer.encode`] and
[`PreTrainedTokenizer.__call__`] for details.
[What are input IDs?](../glossary#input-ids)
attention_mask (`torch.FloatTensor` of shape `({0})`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
token_type_ids (`torch.LongTensor` of shape `({0})`, *optional*):
Segment token indices to indicate first and second portions of the inputs. Indices are selected in `[0,
1]`:
- 0 corresponds to a *sentence A* token,
- 1 corresponds to a *sentence B* token.
[What are token type IDs?](../glossary#token-type-ids)
position_ids (`torch.LongTensor` of shape `({0})`, *optional*):
Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0,
config.max_position_embeddings - 1]`.
[What are position IDs?](../glossary#position-ids)
head_mask (`torch.FloatTensor` of shape `(num_heads,)` or `(num_layers, num_heads)`, *optional*):
Mask to nullify selected heads of the self-attention modules. Mask values selected in `[0, 1]`:
- 1 indicates the head is **not masked**,
- 0 indicates the head is **masked**.
inputs_embeds (`torch.FloatTensor` of shape `({0}, hidden_size)`, *optional*):
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
model's internal embedding lookup matrix.
start_positions (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
Labels for position (index) of the start of the labelled span for computing the token classification loss.
Positions are clamped to the length of the sequence (`sequence_length`). Position outside of the sequence
are not taken into account for computing the loss.
end_positions (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
Labels for position (index) of the end of the labelled span for computing the token classification loss.
Positions are clamped to the length of the sequence (`sequence_length`). Position outside of the sequence
are not taken into account for computing the loss.
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
outputs = self.encoder(
input_ids=input_ids,
token_type_ids=token_type_ids,
position_ids=position_ids,
attention_mask=attention_mask,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict
)
sequence_output = outputs[0]
start_logits = self.linear_start(sequence_output)
start_logits = torch.squeeze(start_logits, -1)
start_prob = self.sigmoid(start_logits)
end_logits = self.linear_end(sequence_output)
end_logits = torch.squeeze(end_logits, -1)
end_prob = self.sigmoid(end_logits)
total_loss = None
if start_positions is not None and end_positions is not None:
loss_fct = nn.BCELoss()
start_loss = loss_fct(start_prob, start_positions)
end_loss = loss_fct(end_prob, end_positions)
total_loss = (start_loss + end_loss) / 2.0
if not return_dict:
output = (start_prob, end_prob) + outputs[2:]
return ((total_loss,) + output) if total_loss is not None else output
return UIEModelOutput(
loss=total_loss,
start_prob=start_prob,
end_prob=end_prob,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
def predict(self, schema: Union[Dict, List[str], str], input_texts: Union[List[str], str],
tokenizer: PreTrainedTokenizerFast, max_length: int = 512, batch_size: int = 32,
position_prob: int = 0.5, progress_hook=None) -> List[Dict]:
"""
Args:
schema (Union[Dict, List[str], str]): 抽取目标
input_texts (input_texts: Union[List[str], str]): 待抽取文本
tokenizer (PreTrainedTokenizerFast):
max_length (int):
batch_size (int):
position_prob (float):
progress_hook:
Returns:
result (List[Dict]):
"""
predictor = UIEPredictor(self, tokenizer=tokenizer, schema=schema, max_length=max_length,
position_prob=position_prob, batch_size=batch_size, hook=progress_hook)
input_texts = [input_texts] if isinstance(input_texts, str) else input_texts
return predictor.predict(input_texts)
class UIEPredictor(object):
def __init__(self, model, tokenizer, schema, max_length=512, position_prob=0.5, batch_size=32, hook=None):
self.model = model
self._tokenizer = tokenizer
self._position_prob = position_prob
self.max_length = max_length
self._batch_size = batch_size
self._multilingual = getattr(self.model.config, 'multilingual', False)
self._schema_tree = self.set_schema(schema)
self._hook = hook
def set_schema(self, schema):
if isinstance(schema, dict) or isinstance(schema, str):
schema = [schema]
return self._build_tree(schema)
@classmethod
def _build_tree(cls, schema, name="root"):
"""
Build the schema tree.
"""
schema_tree = SchemaTree(name)
for s in schema:
if isinstance(s, str):
schema_tree.add_child(SchemaTree(s))
elif isinstance(s, dict):
for k, v in s.items():
if isinstance(v, str):
child = [v]
elif isinstance(v, list):
child = v
else:
raise TypeError(
"Invalid schema, value for each key:value pairs should be list or string"
"but {} received".format(type(v))
)
schema_tree.add_child(cls._build_tree(child, name=k))
else:
raise TypeError("Invalid schema, element should be string or dict, " "but {} received".format(type(s)))
return schema_tree
def _single_stage_predict(self, inputs):
input_texts = []
prompts = []
for i in range(len(inputs)):
input_texts.append(inputs[i]["text"])
prompts.append(inputs[i]["prompt"])
# max predict length should exclude the length of prompt and summary tokens
max_predict_len = self.max_length - len(max(prompts)) - 3
short_input_texts, self.input_mapping = Utils.auto_splitter(input_texts, max_predict_len, split_sentence=False)
short_texts_prompts = []
for k, v in self.input_mapping.items():
short_texts_prompts.extend([prompts[k] for _ in range(len(v))])
short_inputs = [
{"text": short_input_texts[i], "prompt": short_texts_prompts[i]} for i in range(len(short_input_texts))
]
prompts = []
texts = []
for s in short_inputs:
prompts.append(s["prompt"])
texts.append(s["text"])
if self._multilingual:
padding_type = "max_length"
else:
padding_type = "longest"
encoded_inputs = self._tokenizer(
text=prompts,
text_pair=texts,
stride=2,
truncation=True,
max_length=self.max_length,
padding=padding_type,
add_special_tokens=True,
return_offsets_mapping=True,
return_tensors="np")
offset_maps = encoded_inputs["offset_mapping"]
start_probs = []
end_probs = []
for idx in range(0, len(texts), self._batch_size):
l, r = idx, idx + self._batch_size
input_ids = encoded_inputs["input_ids"][l:r]
token_type_ids = encoded_inputs["token_type_ids"][l:r]
attention_mask = encoded_inputs["attention_mask"][l:r]
if self._multilingual:
input_ids = np.array(
input_ids, dtype="int64")
attention_mask = np.array(
attention_mask, dtype="int64")
position_ids = (np.cumsum(np.ones_like(input_ids), axis=1)
- np.ones_like(input_ids)) * attention_mask
input_dict = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"position_ids": position_ids
}
else:
input_dict = {
"input_ids": np.array(
input_ids, dtype="int64"),
"token_type_ids": np.array(
token_type_ids, dtype="int64"),
"attention_mask": np.array(
attention_mask, dtype="int64")
}
start_prob, end_prob = self._infer(input_dict)
start_prob = start_prob.tolist()
end_prob = end_prob.tolist()
start_probs.extend(start_prob)
end_probs.extend(end_prob)
if self._hook is not None:
self._hook.update(1)
start_ids_list = Utils.get_bool_ids_greater_than(start_probs, limit=self._position_prob, return_prob=True)
end_ids_list = Utils.get_bool_ids_greater_than(end_probs, limit=self._position_prob, return_prob=True)
sentence_ids = []
probs = []
for start_ids, end_ids, offset_map in zip(start_ids_list, end_ids_list, offset_maps.tolist()):
span_list = Utils.get_span(start_ids, end_ids, with_prob=True)
sentence_id, prob = Utils.get_id_and_prob(span_list, offset_map)
sentence_ids.append(sentence_id)
probs.append(prob)
results = Utils.convert_ids_to_results(short_inputs, sentence_ids, probs)
results = Utils.auto_joiner(results, short_input_texts, self.input_mapping)
return results
def _multi_stage_predict(self, data):
"""
Traversal the schema tree and do multi-stage prediction.
Args:
data (list): a list of strings
Returns:
list: a list of predictions, where the list's length
equals to the length of `data`
"""
results = [{} for _ in range(len(data))]
# input check to early return
if len(data) < 1 or self._schema_tree is None:
return results
_pre_node_total = len(data) // self._batch_size + (1 if len(data) % self._batch_size else 0)
_finish_node = 0
if self._hook is not None:
self._hook.reset(total=self._schema_tree.shape * _pre_node_total)
# copy to stay `self._schema_tree` unchanged
schema_list = self._schema_tree.children[:]
while len(schema_list) > 0:
node = schema_list.pop(0)
examples = []
input_map = {}
cnt = 0
idx = 0
if not node.prefix:
for one_data in data:
examples.append({"text": one_data, "prompt": Utils.dbc2sbc(node.name)})
input_map[cnt] = [idx]
idx += 1
cnt += 1
else:
for pre, one_data in zip(node.prefix, data):
if len(pre) == 0:
input_map[cnt] = []
else:
for p in pre:
examples.append({"text": one_data, "prompt": Utils.dbc2sbc(p + node.name)})
input_map[cnt] = [i + idx for i in range(len(pre))]
idx += len(pre)
cnt += 1
if len(examples) == 0:
result_list = []
else:
result_list = self._single_stage_predict(examples)
if not node.parent_relations:
relations = [[] for _ in range(len(data))]
for k, v in input_map.items():
for idx in v:
if len(result_list[idx]) == 0:
continue
if node.name not in results[k].keys():
results[k][node.name] = result_list[idx]
else:
results[k][node.name].extend(result_list[idx])
if node.name in results[k].keys():
relations[k].extend(results[k][node.name])
else:
relations = node.parent_relations
for k, v in input_map.items():
for i in range(len(v)):
if len(result_list[v[i]]) == 0:
continue
if "relations" not in relations[k][i].keys():
relations[k][i]["relations"] = {node.name: result_list[v[i]]}
elif node.name not in relations[k][i]["relations"].keys():
relations[k][i]["relations"][node.name] = result_list[v[i]]
else:
relations[k][i]["relations"][node.name].extend(result_list[v[i]])
new_relations = [[] for _ in range(len(data))]
for i in range(len(relations)):
for j in range(len(relations[i])):
if "relations" in relations[i][j].keys() and node.name in relations[i][j]["relations"].keys():
for k in range(len(relations[i][j]["relations"][node.name])):
new_relations[i].append(relations[i][j]["relations"][node.name][k])
relations = new_relations
prefix = [[] for _ in range(len(data))]
for k, v in input_map.items():
for idx in v:
for i in range(len(result_list[idx])):
prefix[k].append(result_list[idx][i]["text"] + "的")
for child in node.children:
child.prefix = prefix
child.parent_relations = relations
schema_list.append(child)
_finish_node += 1
if self._hook is not None:
self._hook.n = _finish_node * _pre_node_total
if self._hook is not None:
self._hook.close()
return results
def _infer(self, input_dict):
for input_name, input_value in input_dict.items():
input_dict[input_name] = torch.LongTensor(input_value).to(self.model.device)
outputs = self.model(**input_dict)
return outputs.start_prob.detach().cpu().numpy(), outputs.end_prob.detach().cpu().numpy()
def predict(self, input_data):
results = self._multi_stage_predict(data=input_data)
return results
class SchemaTree(object):
"""
Implementataion of SchemaTree
"""
def __init__(self, name="root", children=None):
self.name = name
self.children = []
self.prefix = None
self.parent_relations = None
if children is not None:
for child in children:
self.add_child(child)
self._total_nodes = 0
@property
def shape(self):
return len(self.children) + sum([child.shape for child in self.children])
def __repr__(self):
return self.name
def add_child(self, node):
assert isinstance(node, SchemaTree), "The children of a node should be an instacne of SchemaTree."
self._total_nodes += 1
self.children.append(node)
class Utils:
@classmethod
def dbc2sbc(cls, s):
rs = ""
for char in s:
code = ord(char)
if code == 0x3000:
code = 0x0020
else:
code -= 0xFEE0
if not (0x0021 <= code <= 0x7E):
rs += char
continue
rs += chr(code)
return rs
@classmethod
def cut_chinese_sent(cls, para):
"""
Cut the Chinese sentences more precisely, reference to
"https://blog.csdn.net/blmoistawinde/article/details/82379256".
"""
para = re.sub(r'([。!??])([^”’])', r"\1\n\2", para) # 单字符断句符
para = re.sub(r'(\.{6})([^”’])', r"\1\n\2", para) # 英文省略号
para = re.sub(r'(…{2})([^”’])', r"\1\n\2", para) # 中文省略号
para = re.sub(r'([。!??][”’])([^,。!??])', r'\1\n\2', para)
para = para.rstrip()
return para.split("\n")
@classmethod
def get_bool_ids_greater_than(cls, probs, limit=0.5, return_prob=False):
"""
Get idx of the last dimension in probability arrays, which is greater than a limitation.
Args:
probs (List[List[float]]): The input probability arrays.
limit (float): The limitation for probability.
return_prob (bool): Whether to return the probability
Returns:
List[List[int]]: The index of the last dimension meet the conditions.
"""
probs = np.array(probs)
dim_len = len(probs.shape)
if dim_len > 1:
result = []
for p in probs:
result.append(cls.get_bool_ids_greater_than(p, limit, return_prob))
return result
else:
result = []
for i, p in enumerate(probs):
if p > limit:
if return_prob:
result.append((i, p))
else:
result.append(i)
return result
@classmethod
def get_span(cls, start_ids, end_ids, with_prob=False):
"""
Get span set from position start and end list.
Args:
start_ids (List[int]/List[tuple]): The start index list.
end_ids (List[int]/List[tuple]): The end index list.
with_prob (bool): If True, each element for start_ids and end_ids is a tuple as like: (index, probability).
Returns:
set: The span set without overlapping, every id can only be used once .
"""
if with_prob:
start_ids = sorted(start_ids, key=lambda x: x[0])
end_ids = sorted(end_ids, key=lambda x: x[0])
else:
start_ids = sorted(start_ids)
end_ids = sorted(end_ids)
start_pointer = 0
end_pointer = 0
len_start = len(start_ids)
len_end = len(end_ids)
couple_dict = {}
while start_pointer < len_start and end_pointer < len_end:
if with_prob:
start_id = start_ids[start_pointer][0]
end_id = end_ids[end_pointer][0]
else:
start_id = start_ids[start_pointer]
end_id = end_ids[end_pointer]
if start_id == end_id:
couple_dict[end_ids[end_pointer]] = start_ids[start_pointer]
start_pointer += 1
end_pointer += 1
continue
if start_id < end_id:
couple_dict[end_ids[end_pointer]] = start_ids[start_pointer]
start_pointer += 1
continue
if start_id > end_id:
end_pointer += 1
continue
result = [(couple_dict[end], end) for end in couple_dict]
result = set(result)
return result
@classmethod
def get_id_and_prob(cls, span_set, offset_mapping: np.array):
"""
Return text id and probability of predicted spans
Args:
span_set (set): set of predicted spans.
offset_mapping (numpy.array): list of pair preserving the
index of start and end char in original text pair (prompt + text) for each token.
Returns:
sentence_id (list[tuple]): index of start and end char in original text.
prob (list[float]): probabilities of predicted spans.
"""
prompt_end_token_id = offset_mapping[1:].index([0, 0])
bias = offset_mapping[prompt_end_token_id][1] + 1
for index in range(1, prompt_end_token_id + 1):
offset_mapping[index][0] -= bias
offset_mapping[index][1] -= bias
sentence_id = []
prob = []
for start, end in span_set:
prob.append(start[1] * end[1])
start_id = offset_mapping[start[0]][0]
end_id = offset_mapping[end[0]][1]
sentence_id.append((start_id, end_id))
return sentence_id, prob
@classmethod
def auto_splitter(cls, input_texts, max_text_len, split_sentence=False):
"""
Split the raw texts automatically for model inference.
Args:
input_texts (List[str]): input raw texts.
max_text_len (int): cutting length.
split_sentence (bool): If True, sentence-level split will be performed.
return:
short_input_texts (List[str]): the short input texts for model inference.
input_mapping (dict): mapping between raw text and short input texts.
"""
input_mapping = {}
short_input_texts = []
cnt_org = 0
cnt_short = 0
for text in input_texts:
if not split_sentence:
sens = [text]
else:
sens = Utils.cut_chinese_sent(text)
for sen in sens:
lens = len(sen)
if lens <= max_text_len:
short_input_texts.append(sen)
if cnt_org not in input_mapping.keys():
input_mapping[cnt_org] = [cnt_short]
else:
input_mapping[cnt_org].append(cnt_short)
cnt_short += 1
else:
temp_text_list = [sen[i: i + max_text_len] for i in range(0, lens, max_text_len)]
short_input_texts.extend(temp_text_list)
short_idx = cnt_short
cnt_short += math.ceil(lens / max_text_len)
temp_text_id = [short_idx + i for i in range(cnt_short - short_idx)]
if cnt_org not in input_mapping.keys():
input_mapping[cnt_org] = temp_text_id
else:
input_mapping[cnt_org].extend(temp_text_id)
cnt_org += 1
return short_input_texts, input_mapping
@classmethod
def convert_ids_to_results(cls, examples, sentence_ids, probs):
"""
Convert ids to raw text in a single stage.
"""
results = []
for example, sentence_id, prob in zip(examples, sentence_ids, probs):
if len(sentence_id) == 0:
results.append([])
continue
result_list = []
text = example["text"]
prompt = example["prompt"]
for i in range(len(sentence_id)):
start, end = sentence_id[i]
if start < 0 and end >= 0:
continue
if end < 0:
start += len(prompt) + 1
end += len(prompt) + 1
result = {"text": prompt[start:end], "probability": prob[i]}
result_list.append(result)
else:
result = {"text": text[start:end], "start": start, "end": end, "probability": prob[i]}
result_list.append(result)
results.append(result_list)
return results
@classmethod
def auto_joiner(cls, short_results, short_inputs, input_mapping):
concat_results = []
is_cls_task = False
for short_result in short_results:
if not short_result:
continue
elif "start" not in short_result[0].keys() and "end" not in short_result[0].keys():
is_cls_task = True
break
else:
break
for k, vs in input_mapping.items():
if is_cls_task:
cls_options = {}
for v in vs:
if len(short_results[v]) == 0:
continue
if short_results[v][0]["text"] not in cls_options.keys():
cls_options[short_results[v][0]["text"]] = [1, short_results[v][0]["probability"]]
else:
cls_options[short_results[v][0]["text"]][0] += 1
cls_options[short_results[v][0]["text"]][1] += short_results[v][0]["probability"]
if len(cls_options) != 0:
cls_res, cls_info = max(cls_options.items(), key=lambda x: x[1])
concat_results.append([{"text": cls_res, "probability": cls_info[1] / cls_info[0]}])
else:
concat_results.append([])
else:
offset = 0
single_results = []
for v in vs:
if v == 0:
single_results = short_results[v]
offset += len(short_inputs[v])
else:
for i in range(len(short_results[v])):
if "start" not in short_results[v][i] or "end" not in short_results[v][i]:
continue
short_results[v][i]["start"] += offset
short_results[v][i]["end"] += offset
offset += len(short_inputs[v])
single_results.extend(short_results[v])
concat_results.append(single_results)
return concat_results