#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import io
from collections import defaultdict, Counter
from itertools import combinations
from zipfile import ZipFile

import requests
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
import streamlit as st
from pycaprio import Pycaprio, mappings

from n4a_analytics_lib.analytics import GlobalStatistics, IaaStatistics
from n4a_analytics_lib.constants import DESCRIPTION
from n4a_analytics_lib.metrics_utils import interpret_kappa, fleiss_kappa_function, cohen_kappa_function

# Set application
st.set_page_config(layout="wide")

# Sidebar: meta, inputs, etc.
sidebar = st.sidebar

# Columns: display results
col1, col2 = st.columns(2)

# Description
sidebar.markdown(DESCRIPTION)


def check_login(username: str, password: str) -> bool:
    """Return True only if both credentials are non-empty."""
    return len(username) > 0 and len(password) > 0


def logout() -> None:
    pass


# Level to analyze
option = sidebar.selectbox(
    'Which statistics level?',
    ('Inter-Annotator Agreement results', 'Global project statistics')
)

# IAA results view
if option == "Inter-Annotator Agreement results":
    annotations = sidebar.file_uploader("Upload IAA annotations (.zip format only): ")
    baseline_text = sidebar.file_uploader("Upload baseline text (.txt format only): ")
    if baseline_text is not None and annotations is not None:
        project_analyzed = IaaStatistics(zip_project=annotations, baseline_text=baseline_text.getvalue())
        baseline_analyzer = project_analyzed.analyze_text()
        col2.markdown(f"""
        ### BASELINE TEXT: {baseline_text.name}

        - sentences: {baseline_analyzer[0]}
        - words: {baseline_analyzer[1]}
        - characters: {baseline_analyzer[2]}
        """)

        # Deduplicated list of every mention seen by any coder (insertion order kept)
        commune_mentions = [m for mentions in project_analyzed.mentions_per_coder.values() for m in mentions]
        commune_mentions = list(dict.fromkeys(commune_mentions))

        # Per-coder counts of annotations per label
        dicts_coders = []
        for coder_annotations in project_analyzed.annotations_per_coders.values():
            dicts_coders.append(dict(Counter(coder_annotations.values())))

        # Label inventory (as seen by the first coder)
        labels = list(dicts_coders[0])

        # One row per mention, one column per annotator; cells hold the assigned label
        df = pd.DataFrame(project_analyzed.annotations_per_coders, index=commune_mentions)
        for ann in project_analyzed.annotators:
            df[ann] = 'None'
            for mention, value in project_analyzed.annotations_per_coders[ann].items():
                df.loc[mention, ann] = value

        total_annotations = len(df)
        # Items x categories count matrix, as expected by Fleiss' kappa
        df_n = df.apply(pd.Series.value_counts, axis=1).fillna(0).astype(int)
        matrix = df_n.values
        pairs = list(combinations(project_analyzed.annotations_per_coders, 2))

        # Display in app
        st.title("Inter-Annotator Agreement (IAA) results")
        st.markdown("## 📈 IAA metrics")
        col1_kappa, col2_kappa = st.columns(2)
        col1_kappa.subheader("Fleiss Kappa (global score for group):")
        col1_kappa.markdown(interpret_kappa(round(fleiss_kappa_function(matrix), 2)), unsafe_allow_html=True)
        col1_kappa.subheader("Cohen Kappa Annotators Matrix (score between annotators):")
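        # Assumptions about the n4a_analytics_lib.metrics_utils helpers, inferred
        # from how they are called in this file (not from their source):
        #   - fleiss_kappa_function(matrix) takes the items x categories count
        #     matrix built above and returns a single group-level score;
        #   - cohen_kappa_function(labels_a, labels_b) takes two aligned label
        #     sequences, one per annotator, and returns a pairwise score (a
        #     minimal stand-in would be sklearn.metrics.cohen_kappa_score);
        #   - interpret_kappa(score) returns an HTML snippet, hence the
        #     unsafe_allow_html=True on the markdown calls.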
        data = []
        for coder_1, coder_2 in pairs:
            cohen_score = cohen_kappa_function(
                project_analyzed.labels_per_coder[coder_1],
                project_analyzed.labels_per_coder[coder_2]
            )
            data.append(((coder_1, coder_2), cohen_score))
            col1_kappa.markdown(f"* {coder_1} <> {coder_2} : {interpret_kappa(cohen_score)}", unsafe_allow_html=True)

        # Pivot the pairwise scores into a coders x coders matrix
        intermediary = defaultdict(Counter)
        for (src, tgt), score in data:
            intermediary[src][tgt] = score

        coders = sorted({key for inner in intermediary.values() for key in inner} | set(intermediary.keys()))
        pairwise_matrix = [[intermediary[src][tgt] for tgt in coders] for src in coders]

        df_cm = pd.DataFrame(pairwise_matrix, index=coders, columns=coders)
        # combinations() fills only one triangle of the matrix; mask the empty cells
        mask = df_cm.values == 0
        sn.set(font_scale=0.7)  # label size
        colors = ["#e74c3c", "#f39c12", "#f4d03f", "#5dade2", "#58d68d", "#28b463"]
        # NB: a slider's default value must lie within its min/max bounds
        width = st.slider("matrix width", 1, 20, 14)
        height = st.slider("matrix height", 1, 10, 4)
        fig, ax = plt.subplots(figsize=(width, height))
        sn.heatmap(df_cm, cmap=colors, annot=True, mask=mask, annot_kws={"size": 7}, vmin=0, vmax=1, ax=ax)
        st.pyplot(ax.figure)
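        # Design note: vmin=0 / vmax=1 pins the colour scale to the usual kappa
        # range, so a negative kappa (systematic disagreement) falls below the
        # scale; the six colours loosely mirror the legend bands shown below.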

        col2_kappa.markdown("""
        ### 🗃 IAA Metrics Legend

        Kappa interpretation legend:

        | Kappa score (k) | Agreement |
        |---|---|
        | k < 0 | Less than chance agreement |
        | 0.01 < k < 0.20 | Slight agreement |
        | 0.21 < k < 0.40 | Fair agreement |
        | 0.41 < k < 0.60 | Moderate agreement |
        | 0.61 < k < 0.80 | Substantial agreement |
        | 0.81 < k < 0.99 | Almost perfect agreement |
        """, unsafe_allow_html=True)

        @st.cache
        def convert_df(df_ex):
            return df_ex.to_csv(encoding="utf-8").encode("utf-8")

        # Agree part
        columns_to_compare = project_analyzed.annotators

        def check_all_equal(iterator):
            return len(set(iterator)) <= 1

        df_agree = df[df[columns_to_compare].apply(check_all_equal, axis=1)]
        total_unanime = len(df_agree)
        csv_agree = convert_df(df_agree)
        st.subheader("✔️ Agree annotations")
        st.markdown(f"{total_unanime} / {len(df)} annotations ({round((total_unanime / len(df)) * 100, 2)} %)")
        st.download_button(
            "Press to Download CSV",
            csv_agree,
            "csv_annotators_agree.csv",
            "text/csv",
            key='download-csv-1'
        )
        st.dataframe(df_agree)

        # Disagree part
        def check_all_not_equal(iterator):
            return len(set(iterator)) > 1

        df_disagree = df[df[columns_to_compare].apply(check_all_not_equal, axis=1)]
        total_desaccord = len(df_disagree)
        csv_disagree = convert_df(df_disagree)
        st.subheader("❌ Disagree annotations")
        st.markdown(f"{total_desaccord} / {len(df)} annotations ({round((total_desaccord / len(df)) * 100, 2)} %)")
        st.download_button(
            "Press to Download CSV",
            csv_disagree,
            "csv_annotators_disagree.csv",
            "text/csv",
            key='download-csv-2'
        )
        st.dataframe(df_disagree)

        # Label alignment chart
        def count_total_annotations_label(dataframe, labels):
            """For each label, count the rows in which at least one coder used it."""
            pairs = []
            for label in labels:
                total = dataframe.astype(object).eq(label).any(axis=1).sum()
                pairs.append((label, total))
            return pairs

        totals_annotations_per_labels = count_total_annotations_label(df, labels)

        # Count the mentions that carry the same class across all annotators
        def total_agree_disagree_per_label(dataframe, pairs_totals_labels):
            new_pairs = []
            for label, total_rows_with_label in pairs_totals_labels:
                if total_rows_with_label == 0:
                    continue  # skip unused labels to avoid a division by zero
                agree_res = dataframe[dataframe.nunique(axis=1).eq(1)].eq(label).any(axis=1).sum()
                disagree_res = total_rows_with_label - agree_res
                agree_percent = (agree_res / total_rows_with_label) * 100
                disagree_percent = (disagree_res / total_rows_with_label) * 100
                new_pairs.append((label, total_rows_with_label, agree_percent, disagree_percent))
            return new_pairs

        to_pie = total_agree_disagree_per_label(df, totals_annotations_per_labels)

        def plot_pies(tasks_to_pie):
            my_labels = 'agree', 'disagree'
            my_colors = ['#47DBCD', '#F5B14C']
            my_explode = (0, 0.1)
            # squeeze=False keeps a 2D axes array even when there is a single label
            fig, axes = plt.subplots(1, len(tasks_to_pie), figsize=(20, 3), squeeze=False)
            for counter, t in enumerate(tasks_to_pie):
                axes[0, counter].pie([t[2], t[3]], autopct='%1.1f%%', startangle=15, shadow=True,
                                     colors=my_colors, explode=my_explode)
                axes[0, counter].set_title(t[0])
                axes[0, counter].axis('equal')
            fig.set_facecolor("white")
            fig.legend(labels=my_labels, loc="center right", borderaxespad=0.1, title="Labels alignment")
            return fig

        f = plot_pies(to_pie)
        st.subheader("🏷️ Global Labels Statistics")
        st.pyplot(f)


# Global project results view
# Session keys: "gs_local" (bool), "gs_remote" (bool), "gs_obj" (GlobalStatistics)
# NOTE: display_data() relies on the following GlobalStatistics members,
# inferred from the calls below (not from the library's source):
# .total_annotations_project, .df_i (a DataFrame, displayed as-is),
# .documents (an iterable of selectable document names) and
# .create_plot(document), which must return a matplotlib figure.
def display_data() -> None:
    col1.metric("Total curated annotations", f"{st.session_state['gs_obj'].total_annotations_project} Named entities")
    col1.dataframe(st.session_state['gs_obj'].df_i)
    selected_data = col1.selectbox(
        'Select specific data to display bar plot:',
        st.session_state['gs_obj'].documents,
        key="selector_data"
    )
    col2.pyplot(st.session_state['gs_obj'].create_plot(selected_data))


def init_session_statistics(remote: bool, local: bool, data) -> None:
    # Reset the session in place (reassigning st.session_state would break
    # Streamlit's session proxy)
    st.session_state.clear()
    st.session_state["gs_local"] = local
    st.session_state["gs_remote"] = remote
    # Create a new GlobalStatistics object; if remote, fetch the curated
    # data from the API host first
    if remote and not local:
        st.success('Fetch curated documents from host INCEpTION API in progress...')
        fetch_curated_data_from_remote(username=data[0], password=data[1])
    if local and not remote:
        st.session_state["gs_obj"] = GlobalStatistics(zip_project=data, remote=False)


def fetch_curated_data_from_remote(username: str,
                                   password: str,
                                   endpoint: str = "https://inception.dhlab.epfl.ch/prod",
                                   project_title: str = "ner4archives-template") -> None:
    # Open a client
    try:
        client = Pycaprio(inception_host=endpoint, authentication=(str(username), str(password)))
    except requests.exceptions.JSONDecodeError:
        # Username / password incorrect
        st.error('Username or Password is incorrect, please retry.')
        return
    # Get the project object
    projects = [p for p in client.api.projects() if p.project_name == project_title]
    # Get all documents of the project
    documents = client.api.documents(projects[0].project_id)
    # Retrieve the curated documents only; INCEpTION returns one ZIP per document
    curations = []
    for document in documents:
        if document.document_state == mappings.DocumentState.CURATION_COMPLETE:
            curations.append(client.api.curation(projects[0].project_id, document,
                                                 curation_format=mappings.InceptionFormat.UIMA_CAS_XMI_XML_1_1))
    if not curations:
        st.error('No curated document found in this project.')
        return
    # The first archive is opened in append mode so the others can be merged into it
    zipfiles = [ZipFile(io.BytesIO(curation), mode="a" if i == 0 else "r")
                for i, curation in enumerate(curations)]
    # Merge all the archives into the first one and build a new object from it
    with zipfiles[0] as z1:
        for zf in zipfiles[1:]:
            for n in zf.namelist():
                if n not in z1.namelist():
                    z1.writestr(n, zf.open(n).read())
        st.session_state["gs_obj"] = GlobalStatistics(zip_project=z1, remote=True)


if option == "Global project statistics":
    # User input controllers
    mode = sidebar.radio("Choose mode to retrieve curated data: ", (
        "Local directory",
        "INCEpTION API Host remote"
    ))
    data = None
    if mode == "Local directory":
        project = sidebar.file_uploader("Folder that contains curated annotations in XMI 1.1 (.zip format only): ", type="zip")
        data = project
    if mode == "INCEpTION API Host remote":
        username = sidebar.text_input("Username: ")
        password = sidebar.text_input("Password: ", type='password')
        data = (username, password)

    # Validate inputs
    btn_process = sidebar.button('Process', key='process')

    # Access data with local resources
    if btn_process and mode == "Local directory":
        if data is not None:
            # Create a new session
            init_session_statistics(remote=False, local=True, data=data)

    # Access data with remote resources
    if btn_process and mode == "INCEpTION API Host remote":
        if data is not None:
            if check_login(username=data[0], password=data[1]):
                # Create a new session
                init_session_statistics(remote=True, local=False, data=data)
            else:
                st.error("Sorry! Username or Password is empty.")

# Change data values and visualize a new plot
if "gs_obj" in st.session_state:
    if st.session_state["gs_local"] or st.session_state["gs_remote"]:
        display_data()
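# Usage sketch: launch this module with the Streamlit CLI (the filename app.py
# is an assumption -- use whatever name this file is saved under):
#   $ streamlit run app.py
# Streamlit serves the app on http://localhost:8501 by default.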