summary-simi-check4qee / lex_rank_util.py
hellopahe
add limits of length 10
261e2d7
raw
history blame
3.96 kB
"""
LexRank implementation
Source: https://github.com/crabcamp/lexrank/tree/dev
"""
import numpy as np
from scipy.sparse.csgraph import connected_components
from scipy.special import softmax
import logging, math
STOP_WORDS = ["δ½œθ€…", "η‰ˆζƒζ‰€ζœ‰", "η‰ˆζ¬Šζ‰€ζœ‰", "ζŠ•θ΅„ζœ‰ι£Žι™©", "ζŠ•θ³‡ζœ‰ι’¨ιšͺ", "http", "https", "ζ₯源"]
logger = logging.getLogger(__name__)
def degree_centrality_scores(
similarity_matrix,
threshold=None,
increase_power=True,
):
if not (
threshold is None
or isinstance(threshold, float)
and 0 <= threshold < 1
):
raise ValueError(
'\'threshold\' should be a floating-point number '
'from the interval [0, 1) or None',
)
if threshold is None:
markov_matrix = create_markov_matrix(similarity_matrix)
else:
markov_matrix = create_markov_matrix_discrete(
similarity_matrix,
threshold,
)
scores = stationary_distribution(
markov_matrix,
increase_power=increase_power,
normalized=False,
)
return scores
def _power_method(transition_matrix, increase_power=True, max_iter=10000):
eigenvector = np.ones(len(transition_matrix))
if len(eigenvector) == 1:
return eigenvector
transition = transition_matrix.transpose()
for _ in range(max_iter):
eigenvector_next = np.dot(transition, eigenvector)
if np.allclose(eigenvector_next, eigenvector):
return eigenvector_next
eigenvector = eigenvector_next
if increase_power:
transition = np.dot(transition, transition)
logger.warning("Maximum number of iterations for power method exceeded without convergence!")
return eigenvector_next
def connected_nodes(matrix):
_, labels = connected_components(matrix)
groups = []
for tag in np.unique(labels):
group = np.where(labels == tag)[0]
groups.append(group)
return groups
def create_markov_matrix(weights_matrix):
n_1, n_2 = weights_matrix.shape
if n_1 != n_2:
raise ValueError('\'weights_matrix\' should be square')
row_sum = weights_matrix.sum(axis=1, keepdims=True)
# normalize probability distribution differently if we have negative transition values
if np.min(weights_matrix) <= 0:
return softmax(weights_matrix, axis=1)
return weights_matrix / row_sum
def create_markov_matrix_discrete(weights_matrix, threshold):
discrete_weights_matrix = np.zeros(weights_matrix.shape)
ixs = np.where(weights_matrix >= threshold)
discrete_weights_matrix[ixs] = 1
return create_markov_matrix(discrete_weights_matrix)
def stationary_distribution(
transition_matrix,
increase_power=True,
normalized=True,
):
n_1, n_2 = transition_matrix.shape
if n_1 != n_2:
raise ValueError('\'transition_matrix\' should be square')
distribution = np.zeros(n_1)
grouped_indices = connected_nodes(transition_matrix)
for group in grouped_indices:
t_matrix = transition_matrix[np.ix_(group, group)]
eigenvector = _power_method(t_matrix, increase_power=increase_power)
distribution[group] = eigenvector
if normalized:
distribution /= n_1
return distribution
def find_siblings_by_index(sentences: [str], central_indices: [int], siblings: int, num: int):
ret = []
for idx in central_indices:
if num < 0:
break
head = max(idx - siblings, 0)
tail = min(idx + siblings + 1, len(sentences))
for i in range(head, tail):
if i not in ret and check_valid_sentence(sentences[i]):
ret.append(i)
num -= 1
print(ret)
return ret
def check_valid_sentence(content: str):
if len(content) < 10:
return False
for stop_word in STOP_WORDS:
if stop_word in content:
return False
return True