|
|
|
"""GradioAppTest.ipynb |
|
|
|
Automatically generated by Colaboratory. |
|
|
|
Original file is located at |
|
https://colab.research.google.com/drive/1QhxoNhhM_kcaoQOyz5hsNWLcf2m2L225 |
|
""" |
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
from transformers import pipeline |
|
|
|
"""## JSON""" |
|
|
|
|
|
trainedProcess = "praksa" |
|
trainedProcessJSON = "Praksa" |
|
|
|
processes = [
|
{ |
|
"name": "Praksa", |
|
"phases": [ |
|
{ |
|
"name": "Odabir preferencija", |
|
"alias": ["Prijava prakse", "Odabir zadatka", "Prvi korak"], |
|
"description": "Odabir preferencija je prvi korak u procesu polaganja prakse. Zahtjeva da student odabere zadatak sa popisa...", |
|
"duration": "1 mjesec", |
|
}, |
|
{ |
|
"name": "Ispunjavanje prijavnice", |
|
"description": "Ispunjavanje prijavnice je drugi korak u procesu polaganja prakse. Student mora ispuniti prijavnicu koja se nalazi na stranici kolegija...", |
|
"duration": "1 tjedan", |
|
}, |
|
{ |
|
"name": "Predaja dnevnika prakse", |
|
"alias": ["Završetak prakse", "Dnevnik"], |
|
"description": "Predaja dnevnika prakse zadnji je korak u procesu polaganja prakse. S završetkom rada, student predaje dnevnik prakse na stranicu kolegija...", |
|
"duration": "3 dana", |
|
}, |
|
], |
|
"duration": "2 mjeseca", |
|
}, |
|
{ |
|
"name": "Izrada završnog rada", |
|
"phases": [ |
|
{ |
|
"name": "Prijava teme", |
|
"alias": ["Prvi korak"], |
|
"description": "Prvi korak u procesu izrade završnog rada je prijava teme. Zahtjeva da student odabere mentora te prijavi temu sa popisa...", |
|
"duration": "5 dana", |
|
}, |
|
{ |
|
"name": "Ispuna obrasca", |
|
"description": "Student ispunjava obrazac sa prijavljenom temom...", |
|
"duration": "4 dana", |
|
}, |
|
{ |
|
"name": "Obrana rada", |
|
"description": "Student brani svoj rad pred komosijom...", |
|
"duration": "1 sat", |
|
}, |
|
], |
|
"duration": "3 mjeseca", |
|
}, |
|
] |
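# Ensure every phase has an "alias" list so the code below can rely on the key being present.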
|
|
|
|
|
for process in processes:
|
for task in process["phases"]: |
|
if "alias" not in task: |
|
task["alias"] = [] |
|
|
|
"""## User intent recognition model |
|
|
|
CPU ~6m |
|
|
|
GPU ~3m |
|
""" |
|
|
|
|
|
training_epochs = 10 |
|
label_size = 6 |
|
|
|
|
|
UIDatasetURL = 'https://docs.google.com/spreadsheets/d/e/2PACX-1vSPR-FPTMBcYRynP4JdwYQQ8dAhSx1x8i1LPckUcuIUUlrWT82b5Thqb1bBNnPeGJPxxX1CJAlFSd6F/pub?output=xlsx' |
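# The spreadsheet is expected to hold three columns, renamed below to: text, intent, process.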
|
|
|
|
|
|
|
|
|
|
|
|
|
"""### Data loading""" |
|
|
|
import tensorflow as tf |
|
import tensorflow_text as tft |
|
import tensorflow_hub as tfh |
|
import pandas as pd |
|
import numpy as np |
|
import seaborn as sns |
|
import matplotlib.pyplot as plt |
|
|
|
|
|
preprocessor = tfh.KerasLayer('https://tfhub.dev/google/universal-sentence-encoder-cmlm/multilingual-preprocess/2') |
|
|
|
|
|
# LaBSE multilingual sentence encoder from TF Hub; `preprocessor` above prepares its inputs
# (this is why tensorflow_text is imported even though it is never referenced directly).
encoder = tfh.KerasLayer('https://tfhub.dev/google/LaBSE/2')
|
|
|
|
|
data = pd.read_excel(UIDatasetURL)
|
|
|
columns = ['text', 'intent', 'process'] |
|
data.columns = columns |
|
|
|
data = data[data["process"] == trainedProcess].drop(columns="process") |
|
|
|
"""#### Category merging""" |
|
|
|
|
|
data['intent'] = data['intent'].astype('category') |
|
data['intent_codes'] = data['intent'].cat.codes |
|
|
|
|
|
values = data['intent'].value_counts()

plt.stem(values)  # visualise how many examples each intent class has
|
|
|
"""#### Normalize data |
|
|
|
### Text preprocessing |
|
|
|
1. Remove punctuation |
|
2. Lowercase the text |
|
3. Apply tokenization |
|
4. Remove stopwords |
|
5. Apply lemmatizer |
|
""" |
|
|
|
import string |
|
import re |
|
import nltk |
|
import text_hr |
|
|
|
nltk.download('stopwords') |
|
nltk.download('wordnet') |
|
nltk.download('omw-1.4') |
|
from nltk.stem.porter import PorterStemmer |
|
from nltk.stem import WordNetLemmatizer |
|
|
|
def remove_punctuation(text): |
|
return "".join([i for i in text if i not in string.punctuation]) |
|
|
|
def tokenization(text): |
|
return re.split(r"\s+",text) |
|
|
|
stopwords = nltk.corpus.stopwords.words('english') |
|
def remove_stopwords(text): |
|
return [i for i in text if i not in stopwords] |
|
|
|
porter_stemmer = PorterStemmer() |
|
def stemming(text): |
|
return [porter_stemmer.stem(word) for word in text] |
|
|
|
wordnet_lemmatizer = WordNetLemmatizer() |
|
def lemmatizer(text): |
|
return [wordnet_lemmatizer.lemmatize(word) for word in text] |
|
|
|
data['text'] = data['text']\ |
|
.apply(lambda x: remove_punctuation(x))\ |
|
.apply(lambda x: x.lower())\ |
|
.apply(lambda x: tokenization(x))\ |
|
.apply(lambda x: lemmatizer(x)) |
|
|
|
stop_words_list_hr = [] |
|
for word_base, l_key, cnt, _suff_id, wform_key, wform in text_hr.get_all_std_words(): |
|
if word_base is not None: stop_words_list_hr.append(word_base) |
|
if wform is not None: stop_words_list_hr.append(wform) |
|
|
|
stop_words_list_hr = list(dict.fromkeys(stop_words_list_hr)) |
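# stop_words_list_hr now holds every base form and word form produced by text_hr, deduplicated while preserving order.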
|
|
|
def remove_stopwords_hr(text): |
|
output = [i for i in text if i not in stop_words_list_hr] |
|
return output |
|
|
|
data['text'] = data['text'].apply(lambda x: remove_stopwords_hr(x)) |
|
|
|
data['text'] = data['text'].str.join(" ") |
|
|
|
"""### Split validation and training data |
|
|
|
Train 75%, validation 25% |
|
""" |
|
|
|
codes = data['intent_codes'].unique() |
|
|
|
|
|
CODES_REPR = data[["intent_codes", "intent"]].drop_duplicates().sort_values("intent_codes") |
|
|
|
|
|
def codeToIntent(prediction) -> str: |
|
""" Returns the intent of the prediction, not the code """ |
|
return CODES_REPR[CODES_REPR["intent_codes"] == prediction.argmax()].iloc[0]["intent"] |
|
|
|
preprocessed_validation_data = pd.DataFrame(columns=data.columns) |
|
preprocessed_train_data = pd.DataFrame(columns=data.columns) |
|
|
|
for c in codes:

    sample = data[data['intent_codes'] == c]

    sample = sample.sample(frac=1)

    # 25% of each intent class goes to validation, the remaining 75% to training
    val = sample.sample(frac=0.25)

    train = pd.concat([sample, val]).drop_duplicates(keep=False)

    preprocessed_validation_data = pd.concat([preprocessed_validation_data, val], ignore_index=True)

    preprocessed_train_data = pd.concat([preprocessed_train_data, train], ignore_index=True)
|
|
|
|
|
train_data_eng = preprocessed_train_data[['text', 'intent_codes']] |
|
train_data_eng.columns = ['text', 'intent_codes'] |
|
|
|
validation_data_eng = preprocessed_validation_data[['text', 'intent_codes']] |
|
validation_data_eng.columns = ['text', 'intent_codes'] |
|
|
|
def df_to_dataset(df, shuffle=True, batch_size=16):

    """ Convert a DataFrame into a shuffled, batched tf.data.Dataset with one-hot intent labels """

    df = df.copy()

    labels = df.pop('intent_codes')

    labels_cat = tf.keras.utils.to_categorical(labels, label_size)

    dataset = tf.data.Dataset.from_tensor_slices((dict(df), labels_cat))

    if shuffle:

        dataset = dataset.shuffle(buffer_size=len(df))

    dataset = dataset.batch(batch_size).prefetch(batch_size)

    return dataset
|
|
|
train_data_eng = df_to_dataset(train_data_eng)



validation_data_eng = df_to_dataset(validation_data_eng)
|
|
|
"""### Model definition and training |
|
|
|
2 epochs training (testing purposes) |
|
""" |
|
|
|
|
|
def model_build():

    inputs = tf.keras.layers.Input(shape=(), dtype=tf.string, name='text')

    encoded_input = preprocessor(inputs)

    encoder_outputs = encoder(encoded_input)



    x = encoder_outputs['pooled_output']

    x = tf.keras.layers.Dropout(0.1)(x)

    x = tf.keras.layers.Dense(128, activation='relu')(x)

    x = tf.keras.layers.Dropout(0.7)(x)

    outputs = tf.keras.layers.Dense(label_size, activation='softmax', name='classifier')(x)



    return tf.keras.Model(inputs, outputs)
|
|
|
|
|
model_eng = model_build() |
|
model_eng.compile(

    optimizer = tf.keras.optimizers.Adam(0.001),

    # the classifier already applies softmax, so the loss receives probabilities, not logits
    loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False),

    metrics = [tf.keras.metrics.CategoricalAccuracy()]

)
|
|
|
eng_history = model_eng.fit(

    train_data_eng,

    epochs = training_epochs,

    # batch_size is not passed here because the tf.data datasets above are already batched

    validation_data = validation_data_eng,

)
|
|
|
"""## Data extraction pipeline""" |
|
|
|
|
|
|
|
from transformers import pipeline |
|
|
|
pipe = pipeline("token-classification", model="rkrstacic/bpmn-task-extractor") |
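# The token-classification pipeline returns one dict per sub-word token (including "word" and
# "entity" keys); predictNER below keeps only the tokens that were not tagged as LABEL_0.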
|
|
|
"""## Sentence similarity""" |
|
|
|
|
|
|
|
import numpy as np |
|
from typing import List, Dict |
|
|
|
|
|
def predictNER(text: str) -> Dict:

    # Keep only the sub-word tokens that the model tagged as part of a task name
    currentString = "".join([x["word"] for x in pipe(text) if x["entity"] != "LABEL_0"])

    # "▁" is the SentencePiece word-boundary marker; replacing it restores the spaces,
    # and the leading space is stripped
    return { "Task": currentString.replace("▁", " ")[1:] }
|
|
|
from sentence_transformers import SentenceTransformer, util |
|
|
|
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') |
|
|
|
from typing import List |
|
import torch |
|
|
|
def getTaskSimilarityIndex(flatIndex: int, tasks) -> int:

    """ Get the task index based on the flattened task list """

    for index, task in enumerate(tasks):

        if flatIndex <= len(task["alias"]):

            return index



        flatIndex -= len(task["alias"]) + 1



    return -1
|
|
|
def getFlattenTasks(tasks) -> List[str]:

    """ Returns the flattened list of task names and their aliases """

    resTasks = []



    for task in tasks:

        resTasks.append(task["name"])

        resTasks = resTasks + task["alias"]



    return resTasks
|
|
|
def taskSimilarity(text: str, tasks) -> int: |
|
""" Returns the task index which is the most similar to the text """ |
|
return getTaskSimilarityIndex(torch.argmax(util.pytorch_cos_sim( |
|
model.encode(text, convert_to_tensor=True), |
|
model.encode(getFlattenTasks(tasks), convert_to_tensor=True) |
|
)).item(), tasks) |
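# Example (assuming the embeddings behave as expected): for the "Praksa" process the flattened
# list is ['Odabir preferencija', 'Prijava prakse', 'Odabir zadatka', 'Prvi korak',
# 'Ispunjavanje prijavnice', 'Predaja dnevnika prakse', 'Završetak prakse', 'Dnevnik'], so
# taskSimilarity("Koliko traje predaja dnevnika prakse", processes[0]["phases"]) should return 2.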
|
|
|
"""## Using the user intent model""" |
|
|
|
def preprocessText(text: str) -> str:

    """ Apply the same preprocessing as was used on the UI model's training data """

    text = remove_punctuation(text)

    text = text.lower()

    text = tokenization(text)

    text = lemmatizer(text)

    text = remove_stopwords_hr(text)



    return " ".join(text)
|
|
|
def predict_intent(text: str) -> str:

    """ Predict the text intent using the model trained above """

    return codeToIntent(model_eng.predict([preprocessText(text)], verbose=False))
|
|
|
def getPhases(phases) -> str: |
|
""" P1: Returns the formatted phases """ |
|
phases = [phase["name"].lower() for phase in phases] |
|
return ', '.join(phases[:-1]) + ' i ' + phases[-1] |
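# e.g. for the "Praksa" process: "odabir preferencija, ispunjavanje prijavnice i predaja dnevnika prakse"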
|
|
|
|
|
|
|
def getP1String(process) -> str: |
|
return f"Faze procesa za proces '{process['name']}' su: {getPhases(process['phases'])}" |
|
|
|
def getP2String(process) -> str: |
|
return f"Proces '{process['name']}' traje {process['duration']}" |
|
|
|
def getP3String(taskName: str, task) -> str: |
|
return f"Kratki opis '{taskName}': {task['description']}" |
|
|
|
def getP4String(taskName: str, task) -> str: |
|
return f"Proces '{taskName}' traje {task['duration']}" |
|
|
|
def getP5String(taskIndex: int, taskName: str, process) -> str: |
|
if len(process["phases"]) <= taskIndex + 1: |
|
return f"'{taskName}' je zadnji korak u procesu '{process['name']}'" |
|
|
|
return f"Nakon '{taskName}' je '{process['phases'][taskIndex + 1]['name'].lower()}'" |
|
|
|
def getP6String() -> str: |
|
return "Nažalost, ne razumijem Vaše pitanje" |
|
|
|
def print_result(text: str, process) -> str:

    """ Chatbot output message based on the recognised intent """

    intent = predict_intent(text)

    taskIndex = taskSimilarity(text, process["phases"])

    task = process["phases"][taskIndex]

    taskName = task["name"].lower()



    if intent == 'P1':

        return getP1String(process)

    elif intent == 'P2':

        return getP2String(process)

    elif intent == 'P3':

        return getP3String(taskName, task)

    elif intent == 'P4':

        return getP4String(taskName, task)

    elif intent == 'P5':

        return getP5String(taskIndex, taskName, process)

    else:

        return getP6String()
|
|
|
def chatbot(input_text) -> str:

    """ By: Rafael Krstačić """

    processName = trainedProcessJSON

    currentProcess = None



    for process in processes:

        if process["name"] == processName:

            currentProcess = process

            break

    else:

        raise KeyError("Process does not exist in the process definitions")



    return print_result(input_text, currentProcess)
|
|
|
"""## Gradio app""" |
|
|
|
chatbot("Koliko traje predaja dnevnika prakse") |
|
|
|
iface = gr.Interface( |
|
fn=chatbot, |
|
inputs="text", |
|
outputs=["text"], |
|
title="Sentiment Analysis" |
|
) |
|
|
|
iface.launch() |
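# In Colab, iface.launch(share=True) can be used instead to get a temporary public URL.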