""" LexRank implementation Source: https://github.com/crabcamp/lexrank/tree/dev """ import numpy as np from scipy.sparse.csgraph import connected_components from scipy.special import softmax import logging, math STOP_WORDS = ["作者", "版权所有", "版權所有", "投资有风险", "投資有風險", "http", "https", "来源"] logger = logging.getLogger(__name__) def degree_centrality_scores( similarity_matrix, threshold=None, increase_power=True, ): if not ( threshold is None or isinstance(threshold, float) and 0 <= threshold < 1 ): raise ValueError( '\'threshold\' should be a floating-point number ' 'from the interval [0, 1) or None', ) if threshold is None: markov_matrix = create_markov_matrix(similarity_matrix) else: markov_matrix = create_markov_matrix_discrete( similarity_matrix, threshold, ) scores = stationary_distribution( markov_matrix, increase_power=increase_power, normalized=False, ) return scores def _power_method(transition_matrix, increase_power=True, max_iter=10000): eigenvector = np.ones(len(transition_matrix)) if len(eigenvector) == 1: return eigenvector transition = transition_matrix.transpose() for _ in range(max_iter): eigenvector_next = np.dot(transition, eigenvector) if np.allclose(eigenvector_next, eigenvector): return eigenvector_next eigenvector = eigenvector_next if increase_power: transition = np.dot(transition, transition) logger.warning("Maximum number of iterations for power method exceeded without convergence!") return eigenvector_next def connected_nodes(matrix): _, labels = connected_components(matrix) groups = [] for tag in np.unique(labels): group = np.where(labels == tag)[0] groups.append(group) return groups def create_markov_matrix(weights_matrix): n_1, n_2 = weights_matrix.shape if n_1 != n_2: raise ValueError('\'weights_matrix\' should be square') row_sum = weights_matrix.sum(axis=1, keepdims=True) # normalize probability distribution differently if we have negative transition values if np.min(weights_matrix) <= 0: return softmax(weights_matrix, axis=1) return weights_matrix / row_sum def create_markov_matrix_discrete(weights_matrix, threshold): discrete_weights_matrix = np.zeros(weights_matrix.shape) ixs = np.where(weights_matrix >= threshold) discrete_weights_matrix[ixs] = 1 return create_markov_matrix(discrete_weights_matrix) def stationary_distribution( transition_matrix, increase_power=True, normalized=True, ): n_1, n_2 = transition_matrix.shape if n_1 != n_2: raise ValueError('\'transition_matrix\' should be square') distribution = np.zeros(n_1) grouped_indices = connected_nodes(transition_matrix) for group in grouped_indices: t_matrix = transition_matrix[np.ix_(group, group)] eigenvector = _power_method(t_matrix, increase_power=increase_power) distribution[group] = eigenvector if normalized: distribution /= n_1 return distribution def find_siblings_by_index(sentences: [str], central_indices: [int], siblings: int, num: int): ret = [] for idx in central_indices: if num < 0: break head = max(idx - siblings, 0) tail = min(idx + siblings + 1, len(sentences)) for i in range(head, tail): if i not in ret and check_valid_sentence(sentences[i]): ret.append(i) num -= 1 print(ret) return ret def check_valid_sentence(content: str): if len(content) < 10: return False for stop_word in STOP_WORDS: if stop_word in content: return False return True