NCU / BrainPulse /recurrence_quantification_analysis.py
Łukasz Furman
update app.py
a59bdc5
from numba import jit
import numpy as np
from pyrqa.computation import RPComputation, RQAComputation
from pyrqa.time_series import TimeSeries, EmbeddedSeries
from pyrqa.settings import Settings
from pyrqa.analysis_type import Classic
from pyrqa.metric import EuclideanMetric
# from pyrqa.metric import Sigmoid
# from pyrqa.metric import Cosine
from pyrqa.neighbourhood import Unthresholded,FixedRadius
def get_results(recurrence_matrix,
minimum_diagonal_line_length,
minimum_vertical_line_length,
minimum_white_vertical_line_length):
number_of_vectors = recurrence_matrix.shape[0]
diagonal = diagonal_frequency_distribution(recurrence_matrix)
vertical = vertical_frequency_distribution(recurrence_matrix)
white = white_vertical_frequency_distribution(recurrence_matrix)
number_of_vert_lines = number_of_vertical_lines(vertical, minimum_vertical_line_length)
number_of_vert_lines_points = number_of_vertical_lines_points(vertical, minimum_vertical_line_length)
RR = recurrence_rate(recurrence_matrix)
DET = determinism(number_of_vectors, diagonal, minimum_diagonal_line_length)
L = average_diagonal_line_length(number_of_vectors, diagonal, minimum_diagonal_line_length)
Lmax = longest_diagonal_line_length(number_of_vectors, diagonal)
DIV = divergence(Lmax)
Lentr = entropy_diagonal_lines(number_of_vectors, diagonal, minimum_diagonal_line_length)
DET_RR = ratio_determinism_recurrence_rate(DET, RR)
LAM = laminarity(number_of_vectors, vertical, minimum_vertical_line_length)
V = average_vertical_line_length(number_of_vectors, vertical, minimum_vertical_line_length)
Vmax = longest_vertical_line_length(number_of_vectors, vertical)
Ventr = entropy_vertical_lines(number_of_vectors, vertical, minimum_vertical_line_length)
LAM_DET = laminarity_determinism(LAM, DET)
W = average_white_vertical_line_length(number_of_vectors, white, minimum_white_vertical_line_length)
Wmax = longest_white_vertical_line_length(number_of_vectors, white)
Wentr = entropy_white_vertical_lines(number_of_vectors, white, minimum_white_vertical_line_length)
TT = trapping_time(number_of_vert_lines_points, number_of_vert_lines)
return [RR, DET, L, Lmax, DIV, Lentr, DET_RR, LAM, V, Vmax, Ventr, LAM_DET, W, Wmax, Wentr, TT]
@jit(nopython=True)
def diagonal_frequency_distribution(recurrence_matrix):
# Calculating the number of states - N
number_of_vectors = recurrence_matrix.shape[0]
diagonal_frequency_distribution = np.zeros(number_of_vectors + 1)
# Calculating the diagonal frequency distribution - P(l)
for i in range(number_of_vectors - 1, -1, -1):
diagonal_line_length = 0
for j in range(0, number_of_vectors - i):
if recurrence_matrix[i + j, j] == 1:
diagonal_line_length += 1
if j == (number_of_vectors - i - 1):
diagonal_frequency_distribution[diagonal_line_length] += 1.0
else:
if diagonal_line_length != 0:
diagonal_frequency_distribution[diagonal_line_length] += 1.0
diagonal_line_length = 0
for k in range(1, number_of_vectors):
diagonal_line_length = 0
for i in range(number_of_vectors - k):
j = i + k
if recurrence_matrix[i, j] == 1:
diagonal_line_length += 1
if j == (number_of_vectors - 1):
diagonal_frequency_distribution[diagonal_line_length] += 1.0
else:
if diagonal_line_length != 0:
diagonal_frequency_distribution[diagonal_line_length] += 1.0
diagonal_line_length = 0
return diagonal_frequency_distribution
@jit(nopython=True)
def vertical_frequency_distribution(recurrence_matrix):
number_of_vectors = recurrence_matrix.shape[0]
# Calculating the vertical frequency distribution - P(v)
vertical_frequency_distribution = np.zeros(number_of_vectors + 1)
for i in range(number_of_vectors):
vertical_line_length = 0
for j in range(number_of_vectors):
if recurrence_matrix[i, j] == 1:
vertical_line_length += 1
if j == (number_of_vectors - 1):
vertical_frequency_distribution[vertical_line_length] += 1.0
else:
if vertical_line_length != 0:
vertical_frequency_distribution[vertical_line_length] += 1.0
vertical_line_length = 0
return vertical_frequency_distribution
@jit(nopython=True)
def white_vertical_frequency_distribution(recurrence_matrix):
number_of_vectors = recurrence_matrix.shape[0]
# Calculating the white vertical frequency distribution - P(w)
white_vertical_frequency_distribution = np.zeros(number_of_vectors + 1)
for i in range(number_of_vectors):
white_vertical_line_length = 0
for j in range(number_of_vectors):
if recurrence_matrix[i, j] == 0:
white_vertical_line_length += 1
if j == (number_of_vectors - 1):
white_vertical_frequency_distribution[white_vertical_line_length] += 1.0
else:
if white_vertical_line_length != 0:
white_vertical_frequency_distribution[white_vertical_line_length] += 1.0
white_vertical_line_length = 0
return white_vertical_frequency_distribution
@jit(nopython=True)
def recurrence_rate(recurrence_matrix):
# Calculating the recurrence rate - RR
number_of_vectors = recurrence_matrix.shape[0]
return np.float(np.sum(recurrence_matrix)) / np.power(number_of_vectors, 2)
def determinism(number_of_vectors, diagonal_frequency_distribution_, minimum_diagonal_line_length):
# Calculating the determinism - DET
numerator = np.sum(
[l * diagonal_frequency_distribution_[l] for l in range(minimum_diagonal_line_length, number_of_vectors)])
denominator = np.sum([l * diagonal_frequency_distribution_[l] for l in range(1, number_of_vectors)])
return numerator / denominator
def average_diagonal_line_length(number_of_vectors, diagonal_frequency_distribution_, minimum_diagonal_line_length):
# Calculating the average diagonal line length - L
numerator = np.sum(
[l * diagonal_frequency_distribution_[l] for l in range(minimum_diagonal_line_length, number_of_vectors)])
denominator = np.sum(
[diagonal_frequency_distribution_[l] for l in range(minimum_diagonal_line_length, number_of_vectors)])
return numerator / denominator
@jit(nopython=True)
def longest_diagonal_line_length(number_of_vectors, diagonal_frequency_distribution_):
# Calculating the longest diagonal line length - Lmax
for l in range(number_of_vectors - 1, 0, -1):
if diagonal_frequency_distribution_[l] != 0:
longest_diagonal_line_length = l
break
return longest_diagonal_line_length
@jit(nopython=True)
def divergence(longest_diagonal_line_length_):
# Calculating the divergence - DIV
return 1. / longest_diagonal_line_length_
@jit(nopython=True)
def entropy_diagonal_lines(number_of_vectors, diagonal_frequency_distribution_, minimum_diagonal_line_length):
# Calculating the entropy diagonal lines - Lentr
sum_diagonal_frequency_distribution = np.float(
np.sum(diagonal_frequency_distribution_[minimum_diagonal_line_length:-1]))
entropy_diagonal_lines = 0
for l in range(minimum_diagonal_line_length, number_of_vectors):
if diagonal_frequency_distribution_[l] != 0:
entropy_diagonal_lines += (diagonal_frequency_distribution_[
l] / sum_diagonal_frequency_distribution) * np.log(
diagonal_frequency_distribution_[l] / sum_diagonal_frequency_distribution)
entropy_diagonal_lines *= -1
return entropy_diagonal_lines
@jit(nopython=True)
def ratio_determinism_recurrence_rate(determinism_, recurrence_rate_):
# Calculating the divergence - DIV
return determinism_ / recurrence_rate_
def laminarity(number_of_vectors, vertical_frequency_distribution_, minimum_vertical_line_length):
# Calculating the laminarity - LAM
numerator = np.sum(
[v * vertical_frequency_distribution_[v] for v in range(minimum_vertical_line_length, number_of_vectors + 1)])
denominator = np.sum([v * vertical_frequency_distribution_[v] for v in range(1, number_of_vectors + 1)])
return numerator / denominator
def average_vertical_line_length(number_of_vectors, vertical_frequency_distribution_, minimum_vertical_line_length):
# Calculating the average vertical line length - V
numerator = np.sum(
[v * vertical_frequency_distribution_[v] for v in range(minimum_vertical_line_length, number_of_vectors + 1)])
denominator = np.sum(
[vertical_frequency_distribution_[v] for v in range(minimum_vertical_line_length, number_of_vectors + 1)])
return numerator / denominator
@jit(nopython=True)
def longest_vertical_line_length(number_of_vectors, vertical_frequency_distribution_):
# Calculating the longest vertical line length - Vmax
longest_vertical_line_length_ = 0
for v in range(number_of_vectors, 0, -1):
if vertical_frequency_distribution_[v] != 0:
longest_vertical_line_length_ = v
break
return longest_vertical_line_length_
@jit(nopython=True)
def entropy_vertical_lines(number_of_vectors, vertical_frequency_distribution_, minimum_vertical_line_length):
# Calculating the entropy vertical lines - Ventr
sum_vertical_frequency_distribution = np.float(
np.sum(vertical_frequency_distribution_[minimum_vertical_line_length:]))
entropy_vertical_lines_ = 0
for v in range(minimum_vertical_line_length, number_of_vectors + 1):
if vertical_frequency_distribution_[v] != 0:
entropy_vertical_lines_ += (vertical_frequency_distribution_[
v] / sum_vertical_frequency_distribution) * np.log(
vertical_frequency_distribution_[v] / sum_vertical_frequency_distribution)
entropy_vertical_lines_ *= -1
return entropy_vertical_lines_
@jit(nopython=True)
def laminarity_determinism(laminarity_, determinism_):
# Calculating the ratio laminarity_determinism - LAM/DET
return laminarity_ / determinism_
def average_white_vertical_line_length(number_of_vectors, white_vertical_frequency_distribution_,
minimum_white_vertical_line_length):
# Calculating the average white vertical line length - W
numerator = np.sum([w * white_vertical_frequency_distribution_[w] for w in
range(minimum_white_vertical_line_length, number_of_vectors + 1)])
denominator = np.sum([white_vertical_frequency_distribution_[w] for w in
range(minimum_white_vertical_line_length, number_of_vectors + 1)])
return numerator / denominator
@jit(nopython=True)
def longest_white_vertical_line_length(number_of_vectors, white_vertical_frequency_distribution_):
# Calculating the longest white vertical line length - Wmax
longest_white_vertical_line_length_ = 0
for w in range(number_of_vectors, 0, -1):
if white_vertical_frequency_distribution_[w] != 0:
longest_white_vertical_line_length_ = w
break
return longest_white_vertical_line_length_
@jit(nopython=True)
def entropy_white_vertical_lines(number_of_vectors, white_vertical_frequency_distribution_,
minimum_white_vertical_line_length):
# Calculating the entropy white vertical lines - Wentr
sum_white_vertical_frequency_distribution = np.float(
np.sum(white_vertical_frequency_distribution_[minimum_white_vertical_line_length:]))
entropy_white_vertical_lines_ = 0
for w in range(minimum_white_vertical_line_length, number_of_vectors + 1):
if white_vertical_frequency_distribution_[w] != 0:
entropy_white_vertical_lines_ += (white_vertical_frequency_distribution_[
w] / sum_white_vertical_frequency_distribution) * np.log(
white_vertical_frequency_distribution_[w] / sum_white_vertical_frequency_distribution)
entropy_white_vertical_lines_ *= -1
return entropy_white_vertical_lines_
def number_of_vertical_lines(vertical_frequency_distribution_, minimum_vertical_line_length):
if minimum_vertical_line_length > 0:
return np.sum(vertical_frequency_distribution_[minimum_vertical_line_length - 1:])
return np.uint(0)
def number_of_vertical_lines_points(vertical_frequency_distribution_, minimum_vertical_line_length):
if minimum_vertical_line_length > 0:
return np.sum(
((np.arange(vertical_frequency_distribution_.size) + 1) * vertical_frequency_distribution_)[minimum_vertical_line_length - 1:])
return np.uint(0)
@jit(nopython=True)
def trapping_time(number_of_vertical_lines_points_, number_of_vertical_lines_):
"""
Trapping time (TT).
"""
try:
return np.float32(number_of_vertical_lines_points_ / number_of_vertical_lines_)
except:
return 0
def return_pyRQA_results(signal, nbr):
time_series = EmbeddedSeries(signal)
settings = Settings(time_series,
analysis_type=Classic,
neighbourhood=FixedRadius(nbr),
similarity_measure=EuclideanMetric,
theiler_corrector=1)
computation = RQAComputation.create(settings,
verbose=True)
result = computation.run()
return result