Spaces:
Build error
Build error
# Copyright 2021 The HuggingFace Team. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import logging | |
from pathlib import Path | |
import numpy as np | |
import pandas as pd | |
import powerlaw | |
import streamlit as st | |
from scipy.stats import ks_2samp | |
from scipy.stats import zipf as zipf_lib | |
from .dataset_utils import CNT, PROP | |
# treating inf values as NaN as well | |
pd.set_option("use_inf_as_na", True) | |
logs = logging.getLogger(__name__) | |
logs.setLevel(logging.INFO) | |
logs.propagate = False | |
if not logs.handlers: | |
Path("./log_files").mkdir(exist_ok=True) | |
# Logging info to log file | |
file = logging.FileHandler("./log_files/zipf.log") | |
fileformat = logging.Formatter("%(asctime)s:%(message)s") | |
file.setLevel(logging.INFO) | |
file.setFormatter(fileformat) | |
# Logging debug messages to stream | |
stream = logging.StreamHandler() | |
streamformat = logging.Formatter("[data_measurements_tool] %(message)s") | |
stream.setLevel(logging.WARNING) | |
stream.setFormatter(streamformat) | |
logs.addHandler(file) | |
logs.addHandler(stream) | |
class Zipf: | |
def __init__(self, vocab_counts_df=pd.DataFrame()): | |
self.vocab_counts_df = vocab_counts_df | |
self.alpha = None | |
self.xmin = None | |
self.xmax = None | |
self.fit = None | |
self.ranked_words = {} | |
self.uniq_counts = [] | |
self.uniq_ranks = [] | |
self.uniq_fit_counts = None | |
self.term_df = None | |
self.pvalue = None | |
self.ks_test = None | |
self.distance = None | |
self.fit = None | |
self.predicted_zipf_counts = None | |
if not self.vocab_counts_df.empty: | |
logs.info("Fitting based on input vocab counts.") | |
self.calc_fit(vocab_counts_df) | |
logs.info("Getting predicted counts.") | |
self.predicted_zipf_counts = self.calc_zipf_counts(vocab_counts_df) | |
def load(self, zipf_dict): | |
self.set_xmin(zipf_dict["xmin"]) | |
self.set_xmax(zipf_dict["xmax"]) | |
self.set_alpha(zipf_dict["alpha"]) | |
self.set_ks_distance(zipf_dict["ks_distance"]) | |
self.set_p(zipf_dict["p-value"]) | |
self.set_unique_ranks(zipf_dict["uniq_ranks"]) | |
self.set_unique_counts(zipf_dict["uniq_counts"]) | |
def calc_fit(self, vocab_counts_df): | |
""" | |
Uses the powerlaw package to fit the observed frequencies to a zipfian distribution. | |
We use the KS-distance to fit, as that seems more appropriate that MLE. | |
:param vocab_counts_df: | |
:return: | |
""" | |
self.vocab_counts_df = vocab_counts_df | |
# TODO: These proportions may have already been calculated. | |
vocab_counts_df[PROP] = vocab_counts_df[CNT] / float(sum(vocab_counts_df[CNT])) | |
rank_column = vocab_counts_df[CNT].rank( | |
method="dense", numeric_only=True, ascending=False | |
) | |
vocab_counts_df["rank"] = rank_column.astype("int64") | |
observed_counts = vocab_counts_df[CNT].values | |
# Note another method for determining alpha might be defined by | |
# (Newman, 2005): alpha = 1 + n * sum(ln( xi / xmin )) ^ -1 | |
self.fit = powerlaw.Fit(observed_counts, fit_method="KS", discrete=True) | |
# This should probably be a pmf (not pdf); using discrete=True above. | |
# original_data=False uses only the fitted data (within xmin and xmax). | |
# pdf_bin_edges: The portion of the data within the bin. | |
# observed_pdf: The probability density function (normalized histogram) | |
# of the data. | |
pdf_bin_edges, observed_pdf = self.fit.pdf(original_data=False) | |
# See the 'Distribution' class described here for info: | |
# https://pythonhosted.org/powerlaw/#powerlaw.Fit.pdf | |
theoretical_distro = self.fit.power_law | |
# The probability density function (normalized histogram) of the | |
# theoretical distribution. | |
predicted_pdf = theoretical_distro.pdf() | |
# !!!! CRITICAL VALUE FOR ZIPF !!!! | |
self.alpha = theoretical_distro.alpha | |
# Exclusive xmin: The optimal xmin *beyond which* the scaling regime of | |
# the power law fits best. | |
self.xmin = theoretical_distro.xmin | |
self.xmax = theoretical_distro.xmax | |
self.distance = theoretical_distro.KS() | |
self.ks_test = ks_2samp(observed_pdf, predicted_pdf) | |
self.pvalue = self.ks_test[1] | |
logs.info("KS test:") | |
logs.info(self.ks_test) | |
def set_xmax(self, xmax): | |
""" | |
xmax is usually None, so we add some handling to set it as the | |
maximum rank in the dataset. | |
:param xmax: | |
:return: | |
""" | |
if xmax: | |
self.xmax = int(xmax) | |
elif self.uniq_counts: | |
self.xmax = int(len(self.uniq_counts)) | |
elif self.uniq_ranks: | |
self.xmax = int(len(self.uniq_ranks)) | |
def get_xmax(self): | |
""" | |
:return: | |
""" | |
if not self.xmax: | |
self.set_xmax(self.xmax) | |
return self.xmax | |
def set_p(self, p): | |
self.p = int(p) | |
def get_p(self): | |
return int(self.p) | |
def set_xmin(self, xmin): | |
self.xmin = xmin | |
def get_xmin(self): | |
if self.xmin: | |
return int(self.xmin) | |
return self.xmin | |
def set_alpha(self, alpha): | |
self.alpha = float(alpha) | |
def get_alpha(self): | |
return float(self.alpha) | |
def set_ks_distance(self, distance): | |
self.distance = float(distance) | |
def get_ks_distance(self): | |
return self.distance | |
def calc_zipf_counts(self, vocab_counts_df): | |
""" | |
The fit is based on an optimal xmin (minimum rank) | |
Let's use this to make count estimates for the zipf fit, | |
by multiplying the fitted pmf value by the sum of counts above xmin. | |
:return: array of count values following the fitted pmf. | |
""" | |
# TODO: Limit from above xmin to below xmax, not just above xmin. | |
counts = vocab_counts_df[CNT] | |
self.uniq_counts = list(pd.unique(counts)) | |
self.uniq_ranks = list(np.arange(1, len(self.uniq_counts) + 1)) | |
logs.info(self.uniq_counts) | |
logs.info(self.xmin) | |
logs.info(self.xmax) | |
# Makes sure they are ints if not None | |
xmin = self.get_xmin() | |
xmax = self.get_xmax() | |
self.uniq_fit_counts = self.uniq_counts[xmin + 1 : xmax] | |
pmf_mass = float(sum(self.uniq_fit_counts)) | |
zipf_counts = np.array( | |
[self.estimate_count(rank, pmf_mass) for rank in self.uniq_ranks] | |
) | |
return zipf_counts | |
def estimate_count(self, rank, pmf_mass): | |
return int(round(zipf_lib.pmf(rank, self.alpha) * pmf_mass)) | |
def set_unique_ranks(self, ranks): | |
self.uniq_ranks = ranks | |
def get_unique_ranks(self): | |
return self.uniq_ranks | |
def get_unique_fit_counts(self): | |
return self.uniq_fit_counts | |
def set_unique_counts(self, counts): | |
self.uniq_counts = counts | |
def get_unique_counts(self): | |
return self.uniq_counts | |
def set_axes(self, unique_counts, unique_ranks): | |
self.uniq_counts = unique_counts | |
self.uniq_ranks = unique_ranks | |
# TODO: Incorporate this function (not currently using) | |
def fit_others(self, fit): | |
st.markdown( | |
"_Checking log likelihood ratio to see if the data is better explained by other well-behaved distributions..._" | |
) | |
# The first value returned from distribution_compare is the log likelihood ratio | |
better_distro = False | |
trunc = fit.distribution_compare("power_law", "truncated_power_law") | |
if trunc[0] < 0: | |
st.markdown("Seems a truncated power law is a better fit.") | |
better_distro = True | |
lognormal = fit.distribution_compare("power_law", "lognormal") | |
if lognormal[0] < 0: | |
st.markdown("Seems a lognormal distribution is a better fit.") | |
st.markdown("But don't panic -- that happens sometimes with language.") | |
better_distro = True | |
exponential = fit.distribution_compare("power_law", "exponential") | |
if exponential[0] < 0: | |
st.markdown("Seems an exponential distribution is a better fit. Panic.") | |
better_distro = True | |
if not better_distro: | |
st.markdown("\nSeems your data is best fit by a power law. Celebrate!!") | |