# Copyright 2021 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import statistics
import torch
from os import makedirs, mkdir
from os.path import exists, isdir
from os.path import join as pjoin

import nltk
import numpy as np
import pandas as pd
import plotly
import plotly.express as px
import plotly.figure_factory as ff
import plotly.graph_objects as go
import pyarrow.feather as feather
from datasets import load_from_disk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import CountVectorizer

from .dataset_utils import (
    CNT,
    DEDUP_TOT,
    EMBEDDING_FIELD,
    LENGTH_FIELD,
    OUR_LABEL_FIELD,
    OUR_TEXT_FIELD,
    PROP,
    TEXT_NAN_CNT,
    TOKENIZED_FIELD,
    TXT_LEN,
    VOCAB,
    WORD,
    extract_field,
    load_truncated_dataset,
)
from .embeddings import Embeddings
from .npmi import nPMI
from .zipf import Zipf

pd.options.display.float_format = "{:,.3f}".format

logs = logging.getLogger(__name__)
logs.setLevel(logging.WARNING)
logs.propagate = False

if not logs.handlers:
    # logging.FileHandler opens the file immediately and fails if the parent
    # directory is missing, so make sure it exists before attaching the handler.
    makedirs("./log_files", exist_ok=True)

    # Logging info to log file
    file = logging.FileHandler("./log_files/dataset_statistics.log")
    fileformat = logging.Formatter("%(asctime)s:%(message)s")
    file.setLevel(logging.INFO)
    file.setFormatter(fileformat)

    # Logging debug messages to stream
    stream = logging.StreamHandler()
    streamformat = logging.Formatter("[data_measurements_tool] %(message)s")
    stream.setLevel(logging.WARNING)
    stream.setFormatter(streamformat)

    logs.addHandler(file)
    logs.addHandler(stream)

# TODO: Read this in depending on chosen language / expand beyond english
nltk.download("stopwords")
_CLOSED_CLASS = (
    stopwords.words("english")
    + [
        "t",
        "n",
        "ll",
        "d",
        "wasn",
        "weren",
        "won",
        "aren",
        "wouldn",
        "shouldn",
        "didn",
        "don",
        "hasn",
        "ain",
        "couldn",
        "doesn",
        "hadn",
        "haven",
        "isn",
        "mightn",
        "mustn",
        "needn",
        "shan",
        "would",
        "could",
        "dont",
        "u",
    ]
    + [str(i) for i in range(0, 21)]
)
_IDENTITY_TERMS = [
    "man",
    "woman",
    "non-binary",
    "gay",
    "lesbian",
    "queer",
    "trans",
    "straight",
    "cis",
    "she",
    "her",
    "hers",
    "he",
    "him",
    "his",
    "they",
    "them",
    "their",
    "theirs",
    "himself",
    "herself",
]
# treating inf values as NaN as well
pd.set_option("use_inf_as_na", True)

_MIN_VOCAB_COUNT = 10
_TREE_DEPTH = 12
_TREE_MIN_NODES = 250
# as long as we're using sklearn - already pushing the resources
_MAX_CLUSTER_EXAMPLES = 5000
_NUM_VOCAB_BATCHES = 2000

_CVEC = CountVectorizer(token_pattern="(?u)\\b\\w+\\b", lowercase=True)

num_rows = 200000


class DatasetStatisticsCacheClass:
    """Computes, caches and reloads the statistics shown for one dataset split.

    Every ``load_or_prepare_*`` method fills attributes on the instance,
    reading them from files under ``self.cache_path`` when asked to and when
    the files exist, and computing (and optionally saving) them otherwise.
    """

    def __init__(
        self,
        cache_dir,
        dset_name,
        dset_config,
        split_name,
        text_field,
        label_field,
        label_names,
        calculation=None,
    ):
        """Record what is being analysed and derive all cache file paths.

        :param cache_dir: Directory under which the per-dataset cache
            directory is created.
        :param dset_name: Name of the Hugging Face dataset.
        :param dset_config: Name of the dataset config.
        :param split_name: Name of the split to analyze.
        :param text_field: Text field name, or a sequence of names that is
            joined with "-" when building the cache directory name.
        :param label_field: Label field name, or a sequence of names that is
            joined with "-" when building the cache directory name.
        :param label_names: Names of the classes.
        :param calculation: Only used for standalone runs for each kind of
            measurement.
        """
        # This is only used for standalone runs for each kind of measurement.
        self.calculation = calculation
        self.our_text_field = OUR_TEXT_FIELD
        self.our_length_field = LENGTH_FIELD
        self.our_label_field = OUR_LABEL_FIELD
        self.our_tokenized_field = TOKENIZED_FIELD
        self.our_embedding_field = EMBEDDING_FIELD
        self.cache_dir = cache_dir
        ### What are we analyzing?
        # name of the Hugging Face dataset
        self.dset_name = dset_name
        # name of the dataset config
        self.dset_config = dset_config
        # name of the split to analyze
        self.split_name = split_name
        # which text fields are we analysing?
        self.text_field = text_field
        # which label fields are we analysing?
        self.label_field = label_field
        # what are the names of the classes?
        self.label_names = label_names
        ## Hugging Face dataset objects
        self.dset = None  # original dataset
        # HF dataset with all of the self.text_field instances in self.dset
        self.text_dset = None
        # HF dataset with text embeddings in the same order as self.text_dset
        self.embeddings_dset = None
        # HF dataset with all of the self.label_field instances in self.dset
        self.label_dset = None
        ## Data frames
        # Tokenized text
        self.tokenized_df = []
        # save sentence length histogram in the class so it doesn't get re-computed
        self.fig_tok_length = None
        # Data Frame version of self.label_dset
        self.label_df = None
        # save label pie chart in the class so it doesn't get re-computed
        self.fig_labels = None
        # Vocabulary with word counts in the dataset
        self.vocab_counts_df = None
        # Vocabulary filtered to remove stopwords
        self.vocab_counts_filtered_df = None
        ## General statistics and duplicates
        # Number of NaN values (NOT empty strings)
        self.text_nan_count = 0
        # Number of text items that appear more than once in the dataset
        self.dedup_total = 0
        # Duplicated text items along with their number of occurrences ("count")
        self.text_dup_counts_df = None
        self.avg_length = None
        self.std_length = None
        self.general_stats_dict = None
        # clustering text by embeddings
        # the hierarchical clustering tree is represented as a list of nodes,
        # the first is the root
        self.node_list = []
        # save tree figure in the class so it doesn't get re-computed
        self.fig_tree = None
        # keep Embeddings object around to explore clusters
        self.embeddings = None
        # nPMI
        # Holds a nPMIStatisticsCacheClass object
        self.npmi_stats = None
        # TODO: Users ideally can type in whatever words they want.
        self.termlist = _IDENTITY_TERMS
        # termlist terms that are available more than _MIN_VOCAB_COUNT times
        self.available_terms = _IDENTITY_TERMS
        # TODO: Have lowercase be an option for a user to set.
        self.to_lowercase = True
        # The minimum amount of times a word should occur to be included in
        # word-count-based calculations (currently just relevant to nPMI)
        self.min_vocab_count = _MIN_VOCAB_COUNT
        # zipf
        self.z = None
        self.zipf_fig = None
        self.cvec = _CVEC
        # File definitions
        # path to the directory used for caching
        text_field_str = (
            text_field if isinstance(text_field, str) else "-".join(text_field)
        )
        label_field_str = (
            label_field if isinstance(label_field, str) else "-".join(label_field)
        )
        self.cache_path = pjoin(
            self.cache_dir,
            f"{dset_name}_{dset_config}_{split_name}_{text_field_str}_{label_field_str}",
        )
        if not isdir(self.cache_path):
            logs.warning("Creating cache directory %s.", self.cache_path)
            # makedirs also creates a missing cache_dir, where mkdir would fail.
            makedirs(self.cache_path, exist_ok=True)
        self.dset_fid = pjoin(self.cache_path, "base_dset")
        self.text_dset_fid = pjoin(self.cache_path, "text_dset")
        self.tokenized_df_fid = pjoin(self.cache_path, "tokenized_df.feather")
        self.label_dset_fid = pjoin(self.cache_path, "label_dset")
        self.vocab_counts_df_fid = pjoin(self.cache_path, "vocab_counts.feather")
        self.general_stats_fid = pjoin(self.cache_path, "general_stats.json")
        self.text_duplicate_counts_df_fid = pjoin(
            self.cache_path, "text_dup_counts_df.feather"
        )
        self.fig_tok_length_fid = pjoin(self.cache_path, "fig_tok_length.json")
        self.fig_labels_fid = pjoin(self.cache_path, "fig_labels.json")
        self.node_list_fid = pjoin(self.cache_path, "node_list.th")
        self.fig_tree_fid = pjoin(self.cache_path, "fig_tree.json")
        self.zipf_fid = pjoin(self.cache_path, "zipf_basic_stats.json")
        self.zipf_fig_fid = pjoin(self.cache_path, "zipf_fig.json")

    def get_base_dataset(self):
        """Gets a pointer to the truncated base dataset object."""
        # Compare against None: truthiness of a loaded dataset may depend on
        # its length, which would trigger a reload of an empty dataset.
        if self.dset is None:
            self.dset = load_truncated_dataset(
                self.dset_name,
                self.dset_config,
                self.split_name,
                cache_name=self.dset_fid,
                use_cache=True,
                use_streaming=True,
            )

    def get_dataset_peek(self):
        """Return the first 100 rows of the base dataset, loading it if needed."""
        self.get_base_dataset()
        return self.dset[:100]

    def load_or_prepare_general_stats(self, use_cache=False):
        """Data structures used in calculating general statistics and duplicates.

        Reads the NaN count, duplicate total and duplicate-count frame from
        the cache when ``use_cache`` is set and both cache files exist;
        otherwise computes them and writes both files.
        """
        # TODO: These probably don't need to be feather files, could be csv.
        # General statistics
        cache_available = (
            use_cache
            and exists(self.general_stats_fid)
            and exists(self.text_duplicate_counts_df_fid)
        )
        if cache_available:
            self.load_general_stats(
                self.general_stats_fid, self.text_duplicate_counts_df_fid
            )
        else:
            (
                self.text_nan_count,
                self.dedup_total,
                self.text_dup_counts_df,
            ) = self.prepare_general_text_stats()
            self.general_stats_dict = {
                TEXT_NAN_CNT: self.text_nan_count,
                DEDUP_TOT: self.dedup_total,
            }
            write_df(self.text_dup_counts_df, self.text_duplicate_counts_df_fid)
            write_json(self.general_stats_dict, self.general_stats_fid)

    def load_or_prepare_text_lengths(self, use_cache=False, save=True):
        """Compute token-length statistics and load or build the length histogram.

        The average and standard deviation of the token lengths are always
        recomputed from ``self.tokenized_df`` (tokenizing first when it is
        empty). The histogram is read from cache when ``use_cache`` is set and
        the cached figure exists; only otherwise is it rebuilt and, when
        ``save`` is set, written out.
        """
        # TODO: Everything here can be read from cache; it's in a transitory
        # state atm where just the fig is cached. Clean up.
        fig_from_cache = use_cache and exists(self.fig_tok_length_fid)
        if fig_from_cache:
            self.fig_tok_length = read_plotly(self.fig_tok_length_fid)
        if len(self.tokenized_df) == 0:
            self.tokenized_df = self.do_tokenization()
        self.tokenized_df[LENGTH_FIELD] = self.tokenized_df[TOKENIZED_FIELD].apply(len)
        lengths = self.tokenized_df[self.our_length_field]
        # Series.std() is the sample standard deviation (ddof=1), like
        # statistics.stdev, but yields NaN instead of raising when there are
        # fewer than two rows.
        self.avg_length = round(float(lengths.mean()), 1)
        self.std_length = round(float(lengths.std()), 1)
        if not fig_from_cache:
            # Previously the cached figure was always rebuilt and overwritten,
            # which made reading it from the cache pointless.
            self.fig_tok_length = make_fig_lengths(
                self.tokenized_df, self.our_length_field
            )
            if save:
                write_plotly(self.fig_tok_length, self.fig_tok_length_fid)

    def load_or_prepare_embeddings(self, use_cache=False, save=True):
        """Load or compute the clustering node list and its tree figure.

        Three cases: both node list and figure cached (load both); only the
        node list cached (load it, rebuild the figure); otherwise run the
        hierarchical clustering through ``Embeddings``.
        """
        # NOTE(review): torch.load unpickles the file; only load node lists
        # from a cache directory you trust.
        if use_cache and exists(self.node_list_fid) and exists(self.fig_tree_fid):
            self.node_list = torch.load(self.node_list_fid)
            self.fig_tree = read_plotly(self.fig_tree_fid)
        elif use_cache and exists(self.node_list_fid):
            self.node_list = torch.load(self.node_list_fid)
            self.fig_tree = make_tree_plot(self.node_list, self.text_dset)
            if save:
                write_plotly(self.fig_tree, self.fig_tree_fid)
        else:
            self.embeddings = Embeddings(self, use_cache=use_cache)
            self.embeddings.make_hierarchical_clustering()
            self.node_list = self.embeddings.node_list
            self.fig_tree = make_tree_plot(self.node_list, self.text_dset)
            if save:
                torch.save(self.node_list, self.node_list_fid)
                write_plotly(self.fig_tree, self.fig_tree_fid)

    # get vocab with word counts
    def load_or_prepare_vocab(self, use_cache=True, save=True):
        """
        Calculates the vocabulary count from the tokenized text.
        The resulting dataframes may be used in nPMI calculations, zipf, etc.
        :param use_cache: Read the vocabulary from its cache file if it exists.
        :param save: Write freshly computed data frames to the cache.
        :return:
        """
        if use_cache and exists(self.vocab_counts_df_fid):
            logs.info("Reading vocab from cache")
            self.load_vocab()
            self.vocab_counts_filtered_df = filter_vocab(self.vocab_counts_df)
        else:
            logs.info("Calculating vocab afresh")
            if len(self.tokenized_df) == 0:
                self.tokenized_df = self.do_tokenization()
                if save:
                    logs.info("Writing out.")
                    write_df(self.tokenized_df, self.tokenized_df_fid)
            word_count_df = count_vocab_frequencies(self.tokenized_df)
            logs.info("Making dfs with proportion.")
            self.vocab_counts_df = calc_p_word(word_count_df)
            self.vocab_counts_filtered_df = filter_vocab(self.vocab_counts_df)
            if save:
                logs.info("Writing out.")
                write_df(self.vocab_counts_df, self.vocab_counts_df_fid)
        logs.info("unfiltered vocab")
        logs.info(self.vocab_counts_df)
        logs.info("filtered vocab")
        logs.info(self.vocab_counts_filtered_df)

    def load_or_prepare_npmi_terms(self, use_cache=False):
        """Create the nPMI helper and load or compute the selectable terms."""
        self.npmi_stats = nPMIStatisticsCacheClass(self, use_cache=use_cache)
        # Forward use_cache: the callee defaults to False, so omitting it
        # meant the cached term list was never read.
        self.npmi_stats.load_or_prepare_npmi_terms(use_cache=use_cache)

    def load_or_prepare_zipf(self, use_cache=False, save=True):
        """Load or compute the Zipf fit and its figure.

        Three cases: both fit and figure cached (load both); only the fit
        cached (load it, rebuild the figure from ``self.vocab_counts_df``);
        otherwise fit from ``self.vocab_counts_df``.
        """
        # TODO: Current UI only uses the fig, meaning the self.z here is irrelevant
        # when only reading from cache. Either the UI should use it, or it should
        # be removed when reading in cache
        if use_cache and exists(self.zipf_fig_fid) and exists(self.zipf_fid):
            with open(self.zipf_fid, "r") as f:
                zipf_dict = json.load(f)
            self.z = Zipf()
            self.z.load(zipf_dict)
            self.zipf_fig = read_plotly(self.zipf_fig_fid)
        elif use_cache and exists(self.zipf_fid):
            # TODO: Read zipf data so that the vocab is there.
            with open(self.zipf_fid, "r") as f:
                zipf_dict = json.load(f)
            self.z = Zipf()
            self.z.load(zipf_dict)
            self.zipf_fig = make_zipf_fig(self.vocab_counts_df, self.z)
            if save:
                write_plotly(self.zipf_fig, self.zipf_fig_fid)
        else:
            self.z = Zipf(self.vocab_counts_df)
            self.zipf_fig = make_zipf_fig(self.vocab_counts_df, self.z)
            if save:
                write_zipf_data(self.z, self.zipf_fid)
                write_plotly(self.zipf_fig, self.zipf_fig_fid)

    def prepare_general_text_stats(self):
        """Count null cells and duplicated texts in the tokenized data frame.

        :return: ``(text_nan_count, dedup_total, dedup_df)`` where ``dedup_df``
            is indexed by the text (as ``str``) with a ``CNT`` column and a
            copy of the text in ``OUR_TEXT_FIELD``, and ``dedup_total`` is the
            sum of ``CNT``.
        """
        # tokenized_df starts out as an empty list; tokenize on demand like
        # the other load_or_prepare_* methods do.
        if len(self.tokenized_df) == 0:
            self.tokenized_df = self.do_tokenization()
        text_nan_count = int(self.tokenized_df.isnull().sum().sum())
        # NOTE(review): duplicated() keeps the first occurrence out of the
        # selection, so CNT counts the *extra* copies of each text, not its
        # total number of occurrences — confirm this is the intended figure.
        dup_df = self.tokenized_df[self.tokenized_df.duplicated([self.our_text_field])]
        dedup_df = pd.DataFrame(
            dup_df.pivot_table(
                columns=[self.our_text_field], aggfunc="size"
            ).sort_values(ascending=False),
            columns=[CNT],
        )
        dedup_df.index = dedup_df.index.map(str)
        dedup_df[OUR_TEXT_FIELD] = dedup_df.index
        dedup_total = sum(dedup_df[CNT])
        return text_nan_count, dedup_total, dedup_df

    def load_general_stats(self, general_stats_fid, text_duplicate_counts_df_fid):
        """Read the cached general statistics and duplicate counts.

        :param general_stats_fid: JSON file holding the NaN count and the
            duplicate total.
        :param text_duplicate_counts_df_fid: Feather file holding the
            duplicate-count data frame.
        """
        with open(general_stats_fid, encoding="utf-8") as f:
            general_stats = json.load(f)
        self.general_stats_dict = general_stats
        self.text_nan_count = general_stats[TEXT_NAN_CNT]
        self.dedup_total = general_stats[DEDUP_TOT]
        with open(text_duplicate_counts_df_fid, "rb") as f:
            self.text_dup_counts_df = feather.read_feather(f)

    def load_or_prepare_dataset(self, use_cache=True, save=True):
        """
        Prepares the HF datasets and data frames containing the untokenized and
        tokenized text as well as the label values.
        self.tokenized_df is used further for calculating text lengths,
        word counts, etc.
        Args:
            use_cache: Use stored data if there; otherwise calculate afresh
            save: Store the calculated data to disk.

        Returns:

        """
        self.load_or_prepare_text_dset(save, use_cache)
        self.load_or_prepare_tokenized_df(save, use_cache)

    def load_or_prepare_tokenized_df(self, save, use_cache):
        """Read the tokenized data frame from cache, or tokenize and save it."""
        if use_cache and exists(self.tokenized_df_fid):
            self.tokenized_df = feather.read_feather(self.tokenized_df_fid)
        else:
            # tokenize all text instances
            self.tokenized_df = self.do_tokenization()
            if save:
                # save tokenized text
                write_df(self.tokenized_df, self.tokenized_df_fid)

    def load_or_prepare_text_dset(self, save, use_cache):
        """Read the extracted-text dataset from disk, or extract and save it."""
        if use_cache and exists(self.text_dset_fid):
            # load extracted text
            self.text_dset = load_from_disk(self.text_dset_fid)
            logs.warning("Loaded dataset from disk")
            logs.info(self.text_dset)
        # ...Or load it from the server and store it anew
        else:
            self.get_base_dataset()
            # extract all text instances
            self.text_dset = self.dset.map(
                lambda examples: extract_field(
                    examples, self.text_field, OUR_TEXT_FIELD
                ),
                batched=True,
                remove_columns=list(self.dset.features),
            )
            if save:
                # save extracted text instances
                logs.warning("Saving dataset to disk")
                self.text_dset.save_to_disk(self.text_dset_fid)

    def do_tokenization(self):
        """
        Tokenizes the dataset
        :return: Data frame built from ``self.text_dset`` with the token tuple
            and token count of every text added.
        """
        sent_tokenizer = self.cvec.build_tokenizer()
        # Honour the instance setting; it defaults to True, which matches the
        # previous unconditional lowercasing.
        lowercase = getattr(self, "to_lowercase", True)

        def tokenize_batch(examples):
            texts = examples[OUR_TEXT_FIELD]
            if lowercase:
                texts = [text.lower() for text in texts]
            tokens = [tuple(sent_tokenizer(text)) for text in texts]
            return {
                TOKENIZED_FIELD: tokens,
                LENGTH_FIELD: [len(tok_text) for tok_text in tokens],
            }

        tokenized_dset = self.text_dset.map(
            tokenize_batch,
            batched=True,
            # remove_columns=[OUR_TEXT_FIELD], keep around to print
        )
        tokenized_df = pd.DataFrame(tokenized_dset)
        return tokenized_df

    def set_label_field(self, label_field="label"):
        """
        Setter for label_field. Used in the CLI when a user asks for information
         about labels, but does not specify the field;
         'label' is assumed as a default.
        """
        self.label_field = label_field

    def load_or_prepare_labels(self, use_cache=False, save=True):
        """
        Extracts labels from the Dataset
        :param use_cache: Read the cached figure, or failing that the cached
            label dataset, when present.
        :param save: Write freshly computed data and figures to the cache.
        :return:
        """
        # TODO: This is in a transitory state for creating fig cache.
        # Clean up to be caching and reading everything correctly.
        # Nothing to do without a label field (empty string/list, or None).
        if not self.label_field:
            return
        if use_cache and exists(self.fig_labels_fid):
            self.fig_labels = read_plotly(self.fig_labels_fid)
        elif use_cache and exists(self.label_dset_fid):
            # load extracted labels
            self.label_dset = load_from_disk(self.label_dset_fid)
            self.label_df = self.label_dset.to_pandas()
            self.fig_labels = make_fig_labels(
                self.label_df, self.label_names, OUR_LABEL_FIELD
            )
            if save:
                write_plotly(self.fig_labels, self.fig_labels_fid)
        else:
            self.get_base_dataset()
            self.label_dset = self.dset.map(
                lambda examples: extract_field(
                    examples, self.label_field, OUR_LABEL_FIELD
                ),
                batched=True,
                remove_columns=list(self.dset.features),
            )
            self.label_df = self.label_dset.to_pandas()
            self.fig_labels = make_fig_labels(
                self.label_df, self.label_names, OUR_LABEL_FIELD
            )
            if save:
                # save extracted label instances
                self.label_dset.save_to_disk(self.label_dset_fid)
                write_plotly(self.fig_labels, self.fig_labels_fid)

    def load_vocab(self):
        """Read the vocabulary counts from their feather cache file."""
        with open(self.vocab_counts_df_fid, "rb") as f:
            self.vocab_counts_df = feather.read_feather(f)
        # Handling for changes in how the index is saved.
        self.vocab_counts_df = self._set_idx_col_names(self.vocab_counts_df)

    def _set_idx_col_names(self, input_vocab_df):
        """Index the frame by its VOCAB column (keeping the column) if needed."""
        if input_vocab_df.index.name != VOCAB and VOCAB in input_vocab_df.columns:
            input_vocab_df = input_vocab_df.set_index([VOCAB])
            input_vocab_df[VOCAB] = input_vocab_df.index
        return input_vocab_df


class nPMIStatisticsCacheClass:
    """Class to interface between the app and the nPMI class
    by calling the nPMI class with the user's selections."""

    def __init__(self, dataset_stats, use_cache=False):
        """
        :param dataset_stats: The DatasetStatisticsCacheClass instance whose
            vocabulary, tokenized text, term list and cache path are used.
        :param use_cache: Stored on the instance as ``self.use_cache``.
        """
        self.dstats = dataset_stats
        self.pmi_cache_path = pjoin(self.dstats.cache_path, "pmi_files")
        if not isdir(self.pmi_cache_path):
            logs.warning("Creating pmi cache directory %s.", self.pmi_cache_path)
            # We need to preprocess everything.
            makedirs(self.pmi_cache_path, exist_ok=True)
        self.joint_npmi_df_dict = {}
        self.termlist = self.dstats.termlist
        logs.info(self.termlist)
        self.use_cache = use_cache
        # TODO: Let users specify
        self.open_class_only = True
        self.min_vocab_count = self.dstats.min_vocab_count
        self.subgroup_files = {}
        self.npmi_terms_fid = pjoin(self.dstats.cache_path, "npmi_terms.json")
        self.available_terms = self.dstats.available_terms
        logs.info(self.available_terms)

    def load_or_prepare_npmi_terms(self, use_cache=False):
        """
        Figures out what identity terms the user can select, based on whether
        they occur more than self.min_vocab_count times
        :param use_cache: Use the cached term list when it exists and is
            non-empty.
        :return: Identity terms occurring at least self.min_vocab_count times.
        """
        # TODO: Add the user's ability to select subgroups.
        # TODO: Make min_vocab_count here value selectable by the user.
        cached_terms = []
        if use_cache and exists(self.npmi_terms_fid):
            # Read the file once and close it.
            with open(self.npmi_terms_fid, encoding="utf-8") as f:
                cached_terms = json.load(f)["available terms"]
        if cached_terms != []:
            available_terms = cached_terms
        else:
            vocab_counts = self.dstats.vocab_counts_df
            available_terms = [
                term
                for term in self.termlist
                if term in vocab_counts.index
                and vocab_counts.loc[term, CNT] >= self.min_vocab_count
            ]
            logs.info(available_terms)
            with open(self.npmi_terms_fid, "w+", encoding="utf-8") as f:
                json.dump({"available terms": available_terms}, f)
        self.available_terms = available_terms
        return available_terms

    def load_or_prepare_joint_npmi(self, subgroup_pair, use_cache=True):
        """
        Run on-the fly, while the app is already open,
        as it depends on the subgroup terms that the user chooses
        :param subgroup_pair: The two identity terms to compare.
        :param use_cache: Read the joint result from its csv file if present.
        :return: Data frame with the joint nPMI results for the pair.
        """
        # Canonical ordering for subgroup_list
        subgroup_pair = sorted(subgroup_pair)
        subgroups_str = "-".join(subgroup_pair)
        if not isdir(self.pmi_cache_path):
            logs.warning("Creating cache")
            # We need to preprocess everything.
            # This should eventually all go into a prepare_dataset CLI
            makedirs(self.pmi_cache_path, exist_ok=True)
        joint_npmi_fid = pjoin(self.pmi_cache_path, subgroups_str + "_npmi.csv")
        # Defines the filenames for the cache files from the selected subgroups.
        subgroup_files = define_subgroup_files(subgroup_pair, self.pmi_cache_path)
        # Get as much precomputed data as we can.
        if use_cache and exists(joint_npmi_fid):
            # When everything is already computed for the selected subgroups.
            logs.info("Loading cached joint npmi")
            joint_npmi_df = self.load_joint_npmi_df(joint_npmi_fid)
        # When maybe some things have been computed for the selected subgroups.
        else:
            logs.info("Preparing new joint npmi")
            joint_npmi_df, subgroup_dict = self.prepare_joint_npmi_df(
                subgroup_pair, subgroup_files
            )
            # Cache new results
            logs.info("Writing out.")
            for subgroup in subgroup_pair:
                write_subgroup_npmi_data(subgroup, subgroup_dict, subgroup_files)
            with open(joint_npmi_fid, "w+") as f:
                joint_npmi_df.to_csv(f)
        logs.info("The joint npmi df is")
        logs.info(joint_npmi_df)
        return joint_npmi_df

    def load_joint_npmi_df(self, joint_npmi_fid):
        """
        Reads in a saved dataframe with all of the paired results.
        :param joint_npmi_fid: Path of the csv file to read.
        :return: paired results, with rows containing NaN dropped
        """
        with open(joint_npmi_fid, "rb") as f:
            joint_npmi_df = pd.read_csv(f)
        joint_npmi_df = self._set_idx_cols_from_cache(joint_npmi_df)
        return joint_npmi_df.dropna()

    def prepare_joint_npmi_df(self, subgroup_pair, subgroup_files):
        """
        Computes the npmi bias based on the given subgroups.
        Handles cases where some of the selected subgroups have cached nPMI
        computations, but others don't, computing everything afresh if there
        are not cached files.
        :param subgroup_pair: The two identity terms to compare.
        :param subgroup_files: Mapping from each subgroup to its cache files,
            as built by define_subgroup_files.
        :return: Dataframe with nPMI for the words, nPMI bias between the words
            (rows containing NaN dropped), and the per-subgroup results dict.
        """
        subgroup_dict = {}
        # When npmi is computed for some (but not all) of subgroup_list
        for subgroup in subgroup_pair:
            logs.info("Load or failing...")
            # When subgroup npmi has been computed in a prior session.
            cached_results = self.load_or_fail_cached_npmi_scores(
                subgroup, subgroup_files[subgroup]
            )
            # If the function did not return False and we did find it, use.
            if cached_results:
                # FYI: subgroup_cooc_df, subgroup_pmi_df, subgroup_npmi_df = cached_results
                # Holds the previous sessions' data for use in this session.
                subgroup_dict[subgroup] = cached_results
        logs.info("Calculating for subgroup list")
        joint_npmi_df, subgroup_dict = self.do_npmi(subgroup_pair, subgroup_dict)
        return joint_npmi_df.dropna(), subgroup_dict

    # TODO: Update pairwise assumption
    def do_npmi(self, subgroup_pair, subgroup_dict):
        """
        Calculates nPMI for given identity terms and the nPMI bias between.
        :param subgroup_pair: List of identity terms to calculate the bias for
        :param subgroup_dict: Already-available per-subgroup results; missing
            subgroups are computed and added to it.
        :return: Subset of data for the UI
        :return: Selected identity term's co-occurrence counts with
                 other words, pmi per word, and nPMI per word.
        """
        logs.info("Initializing npmi class")
        npmi_obj = self.set_npmi_obj()
        # Canonical ordering used
        subgroup_pair = tuple(sorted(subgroup_pair))
        # Calculating nPMI statistics
        for subgroup in subgroup_pair:
            # If the subgroup data is already computed, grab it.
            # TODO: Should we set idx and column names similarly to how we set them for cached files?
            if subgroup not in subgroup_dict:
                logs.info("Calculating statistics for %s", subgroup)
                vocab_cooc_df, pmi_df, npmi_df = npmi_obj.calc_metrics(subgroup)
                # Store the nPMI information for the current subgroups
                subgroup_dict[subgroup] = (vocab_cooc_df, pmi_df, npmi_df)
        # Pair the subgroups together, indexed by all words that
        # co-occur between them.
        logs.info("Computing pairwise npmi bias")
        paired_results = npmi_obj.calc_paired_metrics(subgroup_pair, subgroup_dict)
        UI_results = make_npmi_fig(paired_results, subgroup_pair)
        return UI_results, subgroup_dict

    def set_npmi_obj(self):
        """
        Initializes the nPMI class with the given words and tokenized sentences.
        :return: The nPMI object.
        """
        npmi_obj = nPMI(self.dstats.vocab_counts_df, self.dstats.tokenized_df)
        return npmi_obj

    def load_or_fail_cached_npmi_scores(self, subgroup, subgroup_fids):
        """
        Reads cached scores from the specified subgroup files
        :param subgroup: string of the selected identity term
        :param subgroup_fids: ``(npmi, pmi, co-occurrence)`` csv paths.
        :return: ``(cooc_df, pmi_df, npmi_df)`` when all three files exist,
            otherwise ``False``.
        """
        # TODO: Ordering of npmi, pmi, vocab triple should be consistent
        subgroup_npmi_fid, subgroup_pmi_fid, subgroup_cooc_fid = subgroup_fids
        all_cached = (
            exists(subgroup_npmi_fid)
            and exists(subgroup_pmi_fid)
            and exists(subgroup_cooc_fid)
        )
        if not all_cached:
            return False
        logs.info("Reading in pmi data....")
        with open(subgroup_cooc_fid, "rb") as f:
            subgroup_cooc_df = pd.read_csv(f)
        logs.info("pmi")
        with open(subgroup_pmi_fid, "rb") as f:
            subgroup_pmi_df = pd.read_csv(f)
        logs.info("npmi")
        with open(subgroup_npmi_fid, "rb") as f:
            subgroup_npmi_df = pd.read_csv(f)
        subgroup_cooc_df = self._set_idx_cols_from_cache(
            subgroup_cooc_df, subgroup, "count"
        )
        subgroup_pmi_df = self._set_idx_cols_from_cache(
            subgroup_pmi_df, subgroup, "pmi"
        )
        subgroup_npmi_df = self._set_idx_cols_from_cache(
            subgroup_npmi_df, subgroup, "npmi"
        )
        return subgroup_cooc_df, subgroup_pmi_df, subgroup_npmi_df

    def _set_idx_cols_from_cache(self, csv_df, subgroup=None, calc_str=None):
        """
        Helps make sure all of the read-in files can be accessed within code
        via standardized indices and column names.
        :param csv_df: Data frame as read from a cached csv file.
        :param subgroup: Identity term used to name the single data column.
        :param calc_str: Measure name used to name the single data column.
        :return: The frame indexed by word, with its column renamed when
            ``subgroup`` and/or ``calc_str`` is given.
        """
        # The csv saves with this column instead of the index, so that's weird.
        for index_col in ("Unnamed: 0", WORD, VOCAB):
            if index_col in csv_df.columns:
                csv_df = csv_df.set_index(index_col)
                csv_df.index.name = WORD
                break
        if subgroup and calc_str:
            csv_df.columns = [subgroup + "-" + calc_str]
        elif subgroup:
            csv_df.columns = [subgroup]
        elif calc_str:
            csv_df.columns = [calc_str]
        return csv_df

    def get_available_terms(self, use_cache=False):
        """Return the selectable identity terms (see load_or_prepare_npmi_terms)."""
        return self.load_or_prepare_npmi_terms(use_cache=use_cache)


def dummy(doc):
    """Identity function, used to bypass CountVectorizer's own tokenization."""
    return doc


def count_vocab_frequencies(tokenized_df):
    """
    Counts the occurrences of every token in the TOKENIZED_FIELD column of the
    input pandas DataFrame.
    :return: DataFrame indexed by word (index named WORD) with a single
             CNT column holding that word's total count over all rows.
    """
    cvec = CountVectorizer(
        tokenizer=dummy,
        preprocessor=dummy,
    )
    # We do this to calculate per-word statistics
    # Fast calculation of single word counts
    logs.info("Fitting dummy tokenization to make matrix using the previous tokenization")
    cvec.fit(tokenized_df[TOKENIZED_FIELD])
    document_matrix = cvec.transform(tokenized_df[TOKENIZED_FIELD])
    batches = np.linspace(0, tokenized_df.shape[0], _NUM_VOCAB_BATCHES).astype(int)
    tf = []
    for i in range(len(batches) - 1):
        logs.info("%s of %s vocab batches", i, len(batches))
        batch = document_matrix[batches[i] : batches[i + 1]]
        # Sum the sparse rows directly instead of densifying the batch first.
        tf.append(np.asarray(batch.sum(axis=0)).ravel())
    # get_feature_names() was removed from newer scikit-learn releases in
    # favour of get_feature_names_out(); support both.
    get_names = getattr(cvec, "get_feature_names_out", None) or cvec.get_feature_names
    word_count_df = pd.DataFrame(
        [np.sum(tf, axis=0)], columns=get_names()
    ).transpose()
    # Now organize everything into the dataframes
    word_count_df.columns = [CNT]
    word_count_df.index.name = WORD
    return word_count_df


def calc_p_word(word_count_df):
    """Add the proportion of each word and return the frame sorted by count.

    Adds a PROP column to ``word_count_df`` in place, then returns a new frame
    sorted by CNT (descending) with the index copied into a VOCAB column.
    """
    # p(word)
    word_count_df[PROP] = word_count_df[CNT] / float(sum(word_count_df[CNT]))
    vocab_counts_df = pd.DataFrame(word_count_df.sort_values(by=CNT, ascending=False))
    vocab_counts_df[VOCAB] = vocab_counts_df.index
    return vocab_counts_df


def filter_vocab(vocab_counts_df):
    """Drop the closed-class words and recompute PROP over what remains."""
    # TODO: Add warnings (which words are missing) to log file?
    filtered_vocab_counts_df = vocab_counts_df.drop(_CLOSED_CLASS, errors="ignore")
    filtered_count = filtered_vocab_counts_df[CNT]
    filtered_count_denom = float(sum(filtered_vocab_counts_df[CNT]))
    filtered_vocab_counts_df[PROP] = filtered_count / filtered_count_denom
    return filtered_vocab_counts_df


## Figures ##


def write_plotly(fig, fid):
    """Serialize a plotly figure to its JSON string and store that as JSON."""
    write_json(plotly.io.to_json(fig), fid)


def read_plotly(fid):
    """Read back a figure stored by write_plotly."""
    with open(fid, encoding="utf-8") as f:
        fig_json = json.load(f)
    return plotly.io.from_json(fig_json)


def make_fig_lengths(tokenized_df, length_field):
    """Build a histogram (with rug marginal) of the values in ``length_field``."""
    fig_tok_length = px.histogram(
        tokenized_df, x=length_field, marginal="rug", hover_data=[length_field]
    )
    return fig_tok_length


def make_fig_labels(label_df, label_names, label_field):
    """Build a pie chart of how many rows carry each label value."""
    labels = label_df[label_field].unique()
    label_sums = [len(label_df[label_df[label_field] == label]) for label in labels]
    # NOTE(review): the sums follow the order in which labels first appear in
    # label_df, while the slices are named from label_names as given — this
    # presumably relies on both orders agreeing; confirm against callers.
    fig_labels = px.pie(label_df, values=label_sums, names=label_names)
    return fig_labels


def make_zipf_fig_ranked_word_list(vocab_df, unique_counts, unique_ranks):
    """Return, ordered by rank, the comma-joined words sharing each count.

    :param vocab_df: Vocabulary frame with a CNT column, indexed by word.
    :param unique_counts: Counts, aligned element-wise with ``unique_ranks``.
    :param unique_ranks: Rank assigned to each count.
    """
    ranked_words = {}
    for count, rank in zip(unique_counts, unique_ranks):
        # The previous `vocab_df[mask]["rank"] = rank` assigned to a temporary
        # copy and therefore had no effect; it has been dropped.
        ranked_words[rank] = ",".join(
            vocab_df[vocab_df[CNT] == count].index.astype(str)
        )
    # Use the hovertext kw argument for hover text
    ranked_words_list = [wrds for rank, wrds in sorted(ranked_words.items())]
    return ranked_words_list


def make_npmi_fig(paired_results, subgroup_pair):
    """Collect the per-subgroup nPMI and count columns shown in the UI.

    :param paired_results: Mapping with "npmi" and "count" frames and,
        optionally, an "npmi-bias" series.
    :param subgroup_pair: The two identity terms.
    :return: Data frame of the selected columns, sorted by "npmi-bias"
        ascending when that column is available.
    """
    subgroup1, subgroup2 = subgroup_pair
    UI_results = pd.DataFrame()
    has_bias = "npmi-bias" in paired_results
    if has_bias:
        UI_results["npmi-bias"] = paired_results["npmi-bias"].astype(float)
    UI_results[subgroup1 + "-npmi"] = paired_results["npmi"][
        subgroup1 + "-npmi"
    ].astype(float)
    UI_results[subgroup1 + "-count"] = paired_results["count"][
        subgroup1 + "-count"
    ].astype(int)
    if subgroup1 != subgroup2:
        UI_results[subgroup2 + "-npmi"] = paired_results["npmi"][
            subgroup2 + "-npmi"
        ].astype(float)
        UI_results[subgroup2 + "-count"] = paired_results["count"][
            subgroup2 + "-count"
        ].astype(int)
    if not has_bias:
        # Sorting by a missing column would raise KeyError.
        return UI_results
    return UI_results.sort_values(by="npmi-bias", ascending=True)


def make_zipf_fig(vocab_counts_df, z):
    """Plot observed word counts per rank against the Zipf-predicted counts.

    :param vocab_counts_df: Vocabulary frame with a CNT column.
    :param z: Zipf object providing ``calc_zipf_counts``, ``uniq_counts``,
        ``uniq_ranks`` and ``get_xmin``.
    """
    zipf_counts = z.calc_zipf_counts(vocab_counts_df)
    unique_counts = z.uniq_counts
    unique_ranks = z.uniq_ranks
    ranked_words_list = make_zipf_fig_ranked_word_list(
        vocab_counts_df, unique_counts, unique_ranks
    )
    zmin = z.get_xmin()
    logs.info("zipf counts is")
    logs.info(zipf_counts)
    layout = go.Layout(xaxis=dict(range=[0, 100]))
    fig = go.Figure(
        data=[
            go.Bar(
                x=z.uniq_ranks,
                y=z.uniq_counts,
                hovertext=ranked_words_list,
                name="Word Rank Frequency",
            )
        ],
        layout=layout,
    )
    fig.add_trace(
        go.Scatter(
            x=z.uniq_ranks[zmin : len(z.uniq_ranks)],
            y=zipf_counts[zmin : len(z.uniq_ranks)],
            hovertext=ranked_words_list[zmin : len(z.uniq_ranks)],
            line=go.scatter.Line(color="crimson", width=3),
            name="Zipf Predicted Frequency",
        )
    )
    # Customize aspect
    # fig.update_traces(marker_color='limegreen',
    #                  marker_line_width=1.5, opacity=0.6)
    fig.update_layout(title_text="Word Counts, Observed and Predicted by Zipf")
    fig.update_layout(xaxis_title="Word Rank")
    fig.update_layout(yaxis_title="Frequency")
    fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.10))
    return fig


def make_tree_plot(node_list, text_dset):
    """Draw the clustering tree, one marker per node and one line per edge.

    Adds "label" (when missing), "X" and "Y" keys to the node dicts in
    ``node_list``; the first node is the root.

    :param node_list: Node dicts with "weight", "example_ids" and "children"
        (a list of child node dicts).
    :param text_dset: Dataset used to fetch example texts for nodes that do
        not have a "label" yet.
    """
    for nid, node in enumerate(node_list):
        # Build the label only when it is missing, so text_dset is not touched
        # (and need not be loaded) for nodes that already carry one.
        if "label" in node:
            continue
        texts = text_dset.select(node["example_ids"])[OUR_TEXT_FIELD]
        # Order-preserving de-duplication keeps the chosen examples stable.
        examples = list(dict.fromkeys(texts))[:5]
        # Plotly hover text uses <br> for line breaks.
        node["label"] = f"{nid:2d} - {node['weight']:5d} items <br>" + "<br>".join(
            # Only mark texts that were actually cut off at 64 characters.
            ["> " + txt[:64] + ("..." if len(txt) > 64 else "") for txt in examples]
        )

    # make plot nodes
    labels = [node["label"] for node in node_list]

    root = node_list[0]
    root["X"] = 0
    root["Y"] = 0

    def rec_make_coordinates(node):
        # Children sit one level below the parent, laid out left to right;
        # examples not covered by any child are spread evenly between them.
        total_weight = 0
        add_weight = len(node["example_ids"]) - sum(
            [child["weight"] for child in node["children"]]
        )
        for child in node["children"]:
            child["X"] = node["X"] + total_weight
            child["Y"] = node["Y"] - 1
            total_weight += child["weight"] + add_weight / len(node["children"])
            rec_make_coordinates(child)

    rec_make_coordinates(root)

    Xn = []
    Yn = []
    Xe = []
    Ye = []
    for node in node_list:
        Xn.append(node["X"])
        Yn.append(node["Y"])
        for child in node["children"]:
            # None separates the individual edge segments of the line trace.
            Xe += [node["X"], child["X"], None]
            Ye += [node["Y"], child["Y"], None]

    # make figure
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=Xe,
            y=Ye,
            mode="lines",
            line=dict(color="rgb(210,210,210)", width=1),
            hoverinfo="none",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=Xn,
            y=Yn,
            mode="markers",
            name="nodes",
            marker=dict(
                symbol="circle-dot",
                size=18,
                color="#6175c1",
                line=dict(color="rgb(50,50,50)", width=1)
                # '#DB4551',
            ),
            text=labels,
            hoverinfo="text",
            opacity=0.8,
        )
    )
    return fig


## Input/Output ###


def define_subgroup_files(subgroup_list, pmi_cache_path):
    """
    Sets the file ids for the input identity terms
    :param subgroup_list: List of identity terms
    :param pmi_cache_path: Directory holding the per-subgroup csv files.
    :return: Mapping from each term to its ``(npmi, pmi, co-occurrence)``
        csv paths.
    """
    subgroup_files = {}
    for subgroup in subgroup_list:
        # TODO: Should the pmi, npmi, and count just be one file?
        subgroup_npmi_fid = pjoin(pmi_cache_path, subgroup + "_npmi.csv")
        subgroup_pmi_fid = pjoin(pmi_cache_path, subgroup + "_pmi.csv")
        subgroup_cooc_fid = pjoin(pmi_cache_path, subgroup + "_vocab_cooc.csv")
        subgroup_files[subgroup] = (
            subgroup_npmi_fid,
            subgroup_pmi_fid,
            subgroup_cooc_fid,
        )
    return subgroup_files


## Input/Output ##


def intersect_dfs(df_dict):
    """Inner-join the non-None data frames in ``df_dict`` and return a copy.

    NOTE(review): every ordered pair of distinct keys triggers a join, so
    frames are joined more than once, and with fewer than two non-None frames
    nothing is joined and the final ``.copy()`` fails on ``None`` — confirm
    the intended behaviour before relying on this helper.
    """
    started = 0
    new_df = None
    for key, df in df_dict.items():
        if df is None:
            continue
        for key2, df2 in df_dict.items():
            if df2 is None:
                continue
            if key == key2:
                continue
            if started:
                new_df = new_df.join(df2, how="inner", lsuffix="1", rsuffix="2")
            else:
                new_df = df.join(df2, how="inner", lsuffix="1", rsuffix="2")
                started = 1
    return new_df.copy()


def write_df(df, df_fid):
    """Write a data frame to a feather file."""
    feather.write_feather(df, df_fid)


def write_json(json_dict, json_fid):
    """Write a JSON-serializable object to a UTF-8 encoded file."""
    with open(json_fid, "w", encoding="utf-8") as f:
        json.dump(json_dict, f)


def write_subgroup_npmi_data(subgroup, subgroup_dict, subgroup_files):
    """
    Saves the calculated nPMI statistics to their output files.
    Includes the npmi scores for each identity term, the pmi scores, and the
    co-occurrence counts of the identity term with all the other words
    :param subgroup: Identity term
    :param subgroup_dict: Mapping from term to its
        ``(cooc_df, pmi_df, npmi_df)`` results.
    :param subgroup_files: Mapping from term to its
        ``(npmi, pmi, co-occurrence)`` csv paths.
    :return:
    """
    subgroup_fids = subgroup_files[subgroup]
    subgroup_npmi_fid, subgroup_pmi_fid, subgroup_cooc_fid = subgroup_fids
    subgroup_dfs = subgroup_dict[subgroup]
    subgroup_cooc_df, subgroup_pmi_df, subgroup_npmi_df = subgroup_dfs
    with open(subgroup_npmi_fid, "w+") as f:
        subgroup_npmi_df.to_csv(f)
    with open(subgroup_pmi_fid, "w+") as f:
        subgroup_pmi_df.to_csv(f)
    with open(subgroup_cooc_fid, "w+") as f:
        subgroup_cooc_df.to_csv(f)


def write_zipf_data(z, zipf_fid):
    """Write the basic statistics of a fitted Zipf object to a JSON file."""
    zipf_dict = {}
    zipf_dict["xmin"] = int(z.xmin)
    zipf_dict["xmax"] = int(z.xmax)
    zipf_dict["alpha"] = float(z.alpha)
    zipf_dict["ks_distance"] = float(z.distance)
    zipf_dict["p-value"] = float(z.ks_test.pvalue)
    zipf_dict["uniq_counts"] = [int(count) for count in z.uniq_counts]
    zipf_dict["uniq_ranks"] = [int(rank) for rank in z.uniq_ranks]
    with open(zipf_fid, "w+", encoding="utf-8") as f:
        json.dump(zipf_dict, f)