limafang
commit
a81203e
raw
history blame
8.82 kB
import paddle
import numpy as np
import random
from paddlenlp.transformers import SkepTokenizer, SkepModel
import gradio as gr
from seqeval.metrics.sequence_labeling import get_entities
# Label vocabulary files (one entry per line), read by ext_load_dict / cls_load_dict.
label_ext_path = "./data/data121190/label_ext.dict"
label_cls_path = "./data/data121242/label_cls.dict"
# Checkpoints passed to paddle.load for the extraction and classification models.
ext_model_path = "./best_ext.pdparams"
cls_model_path = "./best_cls.pdparams"
def set_seed(seed):
    """Seed paddle, the stdlib ``random`` module and NumPy with the same value."""
    # Same order as before: paddle first, then random, then numpy.
    for seeder in (paddle.seed, random.seed, np.random.seed):
        seeder(seed)
def format_print(results):
    """Print each extraction result as an aspect plus its set of opinion words.

    Every item of ``results`` is indexed as a sequence: element 0 is the
    aspect and the remaining elements are opinion words, shown as a set so
    duplicates collapse.

    NOTE: a second ``format_print`` defined later in this module rebinds the
    name, so this version is not the one in effect once the module is loaded.
    """
    for item in results:
        aspect = item[0]
        opinion = set(item[1:])
        print(f"aspect: {aspect}, opinion: {opinion}\n")
def decoding(text, tag_seq):
    """Group extracted aspect and opinion spans into aspect-centred lists.

    The text is cut into clauses at punctuation marks, entities are extracted
    from each clause's tags with ``get_entities``, and every opinion span is
    attached to an aspect using these rules:

    * inside a clause, an opinion goes to the most recent aspect of that
      clause; opinions that appear before the clause's first aspect are
      handed to that first aspect;
    * a clause with opinions but no aspect gives them to the last aspect
      found in earlier clauses;
    * opinions seen before any aspect exists are held back and given to the
      last aspect of the next clause that has one;
    * opinions that never find an aspect are returned under the placeholder
      aspect ``"None"``.

    Args:
        text: the input characters (anything sliceable, aligned with
            ``tag_seq``).
        tag_seq: one tag per element of ``text``.

    Returns:
        A list of lists of the form ``[aspect, opinion, opinion, ...]``.

    Raises:
        AssertionError: if ``text`` and ``tag_seq`` differ in length.
    """
    assert len(text) == len(tag_seq), f"text len: {len(text)}, tag_seq len: {len(tag_seq)}"

    # Clause boundaries: every position that holds a punctuation mark.
    puncs = set(",.?;!,。?;!")
    splits = [idx for idx, char in enumerate(text) if char in puncs]

    # Cut text and tags into aligned clauses. Because ``prev`` is set to the
    # split position itself, each clause after the first starts with the
    # punctuation mark that closed the previous one.
    prev = 0
    sub_texts, sub_tag_seqs = [], []
    for split in splits:
        sub_tag_seqs.append(tag_seq[prev:split])
        sub_texts.append(text[prev:split])
        prev = split
    sub_tag_seqs.append(tag_seq[prev:])
    sub_texts.append(text[prev:])

    # Entity offsets are relative to the clause, so keep the clause text
    # next to its entities for slicing below.
    ents_list = [
        (sub_text, get_entities(sub_tag_seq, suffix=False))
        for sub_text, sub_tag_seq in zip(sub_texts, sub_tag_seqs)
    ]

    aps = []
    no_a_words = []  # opinions waiting for the first aspect of the whole text
    for sub_text, ent_list in ents_list:
        sub_aps = []
        sub_no_a_words = []  # opinions waiting for this clause's first aspect
        for ent_name, start, end in ent_list:
            # ``end`` is used as an inclusive index.
            span = sub_text[start:end + 1]
            if ent_name == "Aspect":
                # A new aspect adopts the opinions that preceded it.
                sub_aps.append([span, *sub_no_a_words])
                sub_no_a_words.clear()
            elif sub_aps:
                # Every non-aspect entity is handled as an opinion.
                sub_aps[-1].append(span)
            else:
                sub_no_a_words.append(span)

        if sub_aps:
            aps.extend(sub_aps)
            if no_a_words:
                aps[-1].extend(no_a_words)
                no_a_words.clear()
        elif sub_no_a_words:
            if aps:
                aps[-1].extend(sub_no_a_words)
            else:
                no_a_words.extend(sub_no_a_words)

    if no_a_words:
        # Opinions that never met an aspect get a placeholder aspect.
        no_a_words.insert(0, "None")
        aps.append(no_a_words)

    return aps
def is_aspect_first(text, aspect, opinion_word):
    """Return True when ``aspect`` first occurs no later than ``opinion_word``.

    Positions come from ``str.find``, so a missing substring counts as
    position -1.
    """
    aspect_pos = text.find(aspect)
    opinion_pos = text.find(opinion_word)
    return not aspect_pos > opinion_pos
def concate_aspect_and_opinion(text, aspect, opinion_words):
    """Build the comma-separated aspect/opinion phrase for one aspect.

    For each opinion word the aspect and the opinion are concatenated in the
    order reported by ``is_aspect_first`` for ``text``.

    Args:
        text: the full input text.
        aspect: the aspect string.
        opinion_words: iterable of opinion strings belonging to ``aspect``.

    Returns:
        The phrases joined with ``","``; an empty string when there are no
        opinion words.
    """
    phrases = [
        aspect + opinion_word
        if is_aspect_first(text, aspect, opinion_word)
        else opinion_word + aspect
        for opinion_word in opinion_words
    ]
    # join yields the same text as appending "," after every phrase and
    # dropping the final character, without repeated string copies.
    return ",".join(phrases)
def format_print(results):
    """Print every sentiment result and return all of them as display text.

    Args:
        results: iterable of dicts with the keys ``"aspect"``,
            ``"opinions"`` and ``"sentiment"``.

    Returns:
        One formatted line per result, joined with newlines. An empty string
        is returned when ``results`` is empty, so the caller always receives
        a ``str`` covering every result instead of a single one.
    """
    lines = []
    for result in results:
        aspect, opinions, sentiment = result["aspect"], result["opinions"], result["sentiment"]
        lines.append(f"aspect: {aspect}, opinions: {opinions}, sentiment: {sentiment}")

    for line in lines:
        print(line)
    print()
    return "\n".join(lines)
def is_target_first(text, target, word):
    """Return True when ``target`` first occurs at or before ``word`` in ``text``.

    Both positions are taken from ``str.find``; a substring that is absent
    is therefore treated as position -1.
    """
    target_at, word_at = text.find(target), text.find(word)
    return target_at <= word_at
def ext_load_dict(dict_path):
    """Load a vocabulary file with one entry per line.

    Each line is stripped of surrounding whitespace and numbered by its
    zero-based line position.

    Returns:
        ``(word2id, id2word)``: the entry-to-index mapping and its inverse.
    """
    with open(dict_path, "r", encoding="utf-8") as handle:
        entries = [line.strip() for line in handle]

    entry_to_id = {entry: index for index, entry in enumerate(entries)}
    id_to_entry = {index: entry for entry, index in entry_to_id.items()}
    return entry_to_id, id_to_entry
def cls_load_dict(dict_path):
    """Read a one-entry-per-line vocabulary file into two lookup tables.

    Lines are whitespace-stripped and indexed from zero in file order.

    Returns:
        A tuple ``(word2id, id2word)`` of the forward and reverse mappings.
    """
    forward, backward = {}, {}
    with open(dict_path, "r", encoding="utf-8") as source:
        for position, raw in enumerate(source.readlines()):
            forward[raw.strip()] = position

    for word, position in forward.items():
        backward[position] = word
    return forward, backward
def read(data_path):
    """Yield classification examples from a tab-separated file.

    Every line must hold exactly three tab-separated fields after stripping:
    an integer label, the target text, and the full text.

    Yields:
        dict with keys ``"label"`` (int), ``"target_text"`` and ``"text"``.

    Raises:
        AssertionError: if a line does not split into three fields.
    """
    with open(data_path, "r", encoding="utf-8") as source:
        for raw_line in source.readlines():
            fields = raw_line.strip().split("\t")
            assert len(fields) == 3
            label, target_text, text = fields
            yield {"label": int(label), "target_text": target_text, "text": text}
def convert_example_to_feature(example, tokenizer, label2id, max_seq_len=512, is_test=False):
    """Tokenize one example as a (target_text, text) pair.

    Args:
        example: dict with ``"target_text"`` and ``"text"``, plus ``"label"``
            unless ``is_test`` is true.
        tokenizer: callable invoked with the target text, ``text_pair``,
            ``max_seq_len`` and ``return_length=True``; its result is indexed
            by ``"input_ids"``, ``"token_type_ids"`` and ``"seq_len"``.
        label2id: accepted for interface compatibility; not used here.
        max_seq_len: forwarded to the tokenizer.
        is_test: when true the label is left out of the result.

    Returns:
        ``(input_ids, token_type_ids, seq_len)`` and, unless ``is_test``,
        the example's label as a fourth element.
    """
    encoded = tokenizer(
        example["target_text"],
        text_pair=example["text"],
        max_seq_len=max_seq_len,
        return_length=True,
    )
    if is_test:
        return encoded["input_ids"], encoded["token_type_ids"], encoded["seq_len"]

    label = example["label"]
    features = (encoded["input_ids"], encoded["token_type_ids"], encoded["seq_len"])
    return (*features, label)
class SkepForTokenClassification(paddle.nn.Layer):
    """SKEP encoder followed by dropout and a linear layer over token states."""

    def __init__(self, skep, num_classes=2, dropout=None):
        """Wrap ``skep`` with a dropout layer and a ``num_classes``-way head.

        Args:
            skep: encoder exposing ``config["hidden_size"]`` and
                ``config["hidden_dropout_prob"]``.
            num_classes: output size of the linear classifier.
            dropout: dropout probability; when None the encoder's
                ``hidden_dropout_prob`` is used.
        """
        super().__init__()
        self.num_classes = num_classes
        self.skep = skep

        config = self.skep.config
        drop_prob = config["hidden_dropout_prob"] if dropout is None else dropout
        self.dropout = paddle.nn.Dropout(drop_prob)
        self.classifier = paddle.nn.Linear(config["hidden_size"], num_classes)

    def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None):
        """Return classifier logits computed from the encoder's first output."""
        # The encoder returns a pair; only its first element is used here.
        token_states, _ = self.skep(
            input_ids,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            attention_mask=attention_mask,
        )
        return self.classifier(self.dropout(token_states))
class SkepForSequenceClassification(paddle.nn.Layer):
    """SKEP encoder followed by dropout and a linear layer over the pooled output."""

    def __init__(self, skep, num_classes=2, dropout=None):
        """Attach a dropout layer and a ``num_classes``-way linear head to ``skep``.

        Args:
            skep: encoder exposing ``config["hidden_size"]`` and
                ``config["hidden_dropout_prob"]``.
            num_classes: output size of the linear classifier.
            dropout: dropout probability; falls back to the encoder's
                ``hidden_dropout_prob`` when None.
        """
        super().__init__()
        self.num_classes = num_classes
        self.skep = skep

        encoder_config = self.skep.config
        if dropout is None:
            dropout = encoder_config["hidden_dropout_prob"]
        self.dropout = paddle.nn.Dropout(dropout)
        self.classifier = paddle.nn.Linear(encoder_config["hidden_size"], num_classes)

    def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None):
        """Return classifier logits computed from the encoder's second output."""
        # The encoder returns a pair; only its second element is used here.
        _, pooled = self.skep(
            input_ids,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            attention_mask=attention_mask,
        )
        pooled = self.dropout(pooled)
        return self.classifier(pooled)
# load dict
model_name = "skep_ernie_1.0_large_ch"
target1_dir = "./skepTokenizer"
target2_dir = "./skepModel"
ext_label2id, ext_id2label = ext_load_dict(label_ext_path)
cls_label2id, cls_id2label = cls_load_dict(label_cls_path)
tokenizer = SkepTokenizer.from_pretrained(target1_dir)
print("label dict loaded.")

# load ext model
ext_state_dict = paddle.load(ext_model_path)
ext_skep = SkepModel.from_pretrained(target2_dir)
ext_model = SkepForTokenClassification(ext_skep, num_classes=len(ext_label2id))
ext_model.load_dict(ext_state_dict)
print("extraction model loaded.")

# load cls model
cls_state_dict = paddle.load(cls_model_path)
# The classifier gets its own backbone instance. If it reused ext_skep, the
# load_dict call below would write the classification checkpoint's encoder
# weights into the very same parameters the extraction model uses, leaving
# ext_model with a backbone that does not match its head.
# NOTE(review): this assumes both checkpoints carry fine-tuned "skep" weights.
cls_skep = SkepModel.from_pretrained(target2_dir)
cls_model = SkepForSequenceClassification(
    cls_skep, num_classes=len(cls_label2id))
cls_model.load_dict(cls_state_dict)
print("classification model loaded.")
def predict(input_text):
    """Extract aspects and opinions from ``input_text`` and classify each aspect.

    The text is tokenized character by character and tagged by the
    extraction model; ``decoding`` groups the tags into aspect/opinion lists;
    each aspect phrase is then paired with the full text and scored by the
    classification model.

    Args:
        input_text: the raw text to analyse.

    Returns:
        Whatever ``format_print`` returns for the collected results, where
        each result is a dict with the keys ``"aspect"``, ``"opinions"`` and
        ``"sentiment"``.
    """
    ext_model.eval()
    cls_model.eval()

    # Inference only: no gradients are needed, so skip autograd bookkeeping.
    with paddle.no_grad():
        # processing input text
        encoded_inputs = tokenizer(list(input_text), is_split_into_words=True, max_seq_len=max_seq_len)
        input_ids = paddle.to_tensor([encoded_inputs["input_ids"]])
        token_type_ids = paddle.to_tensor([encoded_inputs["token_type_ids"]])

        # extract aspect and opinion words
        logits = ext_model(input_ids, token_type_ids=token_type_ids)
        predictions = logits.argmax(axis=2).numpy()[0]
        # The first and last predictions are dropped before decoding
        # (presumably the tokenizer's special tokens -- confirm).
        tag_seq = [ext_id2label[idx] for idx in predictions][1:-1]
        aps = decoding(input_text, tag_seq)

        # predict sentiment for aspect with cls_model
        results = []
        for ap in aps:
            aspect = ap[0]
            # De-duplicate while keeping first-seen order: a plain set gives
            # an order that can vary between runs, which would change the
            # phrase fed to the classifier.
            opinion_words = list(dict.fromkeys(ap[1:]))
            aspect_text = concate_aspect_and_opinion(input_text, aspect, opinion_words)

            encoded_inputs = tokenizer(aspect_text, text_pair=input_text, max_seq_len=max_seq_len, return_length=True)
            input_ids = paddle.to_tensor([encoded_inputs["input_ids"]])
            token_type_ids = paddle.to_tensor([encoded_inputs["token_type_ids"]])

            logits = cls_model(input_ids, token_type_ids=token_type_ids)
            prediction = logits.argmax(axis=1).numpy()[0]

            result = {"aspect": aspect, "opinions": opinion_words, "sentiment": cls_id2label[prediction]}
            results.append(result)

    # print results
    return format_print(results)
# Sequence-length limit passed to the tokenizer inside predict().
max_seq_len = 512

# Text-in / text-out web UI around predict().
demo = gr.Interface(fn=predict, inputs=["text"], outputs=["text"])
demo.launch()