|
from numba import jit
|
|
import numpy as np
|
|
from pyrqa.computation import RPComputation, RQAComputation
|
|
from pyrqa.time_series import TimeSeries, EmbeddedSeries
|
|
from pyrqa.settings import Settings
|
|
from pyrqa.analysis_type import Classic
|
|
from pyrqa.metric import EuclideanMetric
|
|
|
|
|
|
from pyrqa.neighbourhood import Unthresholded,FixedRadius
|
|
|
|
def get_results(recurrence_matrix,
|
|
minimum_diagonal_line_length,
|
|
minimum_vertical_line_length,
|
|
minimum_white_vertical_line_length):
|
|
|
|
number_of_vectors = recurrence_matrix.shape[0]
|
|
diagonal = diagonal_frequency_distribution(recurrence_matrix)
|
|
vertical = vertical_frequency_distribution(recurrence_matrix)
|
|
white = white_vertical_frequency_distribution(recurrence_matrix)
|
|
|
|
number_of_vert_lines = number_of_vertical_lines(vertical, minimum_vertical_line_length)
|
|
number_of_vert_lines_points = number_of_vertical_lines_points(vertical, minimum_vertical_line_length)
|
|
|
|
RR = recurrence_rate(recurrence_matrix)
|
|
DET = determinism(number_of_vectors, diagonal, minimum_diagonal_line_length)
|
|
L = average_diagonal_line_length(number_of_vectors, diagonal, minimum_diagonal_line_length)
|
|
Lmax = longest_diagonal_line_length(number_of_vectors, diagonal)
|
|
DIV = divergence(Lmax)
|
|
Lentr = entropy_diagonal_lines(number_of_vectors, diagonal, minimum_diagonal_line_length)
|
|
DET_RR = ratio_determinism_recurrence_rate(DET, RR)
|
|
LAM = laminarity(number_of_vectors, vertical, minimum_vertical_line_length)
|
|
V = average_vertical_line_length(number_of_vectors, vertical, minimum_vertical_line_length)
|
|
Vmax = longest_vertical_line_length(number_of_vectors, vertical)
|
|
Ventr = entropy_vertical_lines(number_of_vectors, vertical, minimum_vertical_line_length)
|
|
LAM_DET = laminarity_determinism(LAM, DET)
|
|
W = average_white_vertical_line_length(number_of_vectors, white, minimum_white_vertical_line_length)
|
|
Wmax = longest_white_vertical_line_length(number_of_vectors, white)
|
|
Wentr = entropy_white_vertical_lines(number_of_vectors, white, minimum_white_vertical_line_length)
|
|
TT = trapping_time(number_of_vert_lines_points, number_of_vert_lines)
|
|
|
|
return [RR, DET, L, Lmax, DIV, Lentr, DET_RR, LAM, V, Vmax, Ventr, LAM_DET, W, Wmax, Wentr, TT]
|
|
|
|
|
|
@jit(nopython=True)
|
|
def diagonal_frequency_distribution(recurrence_matrix):
|
|
|
|
number_of_vectors = recurrence_matrix.shape[0]
|
|
diagonal_frequency_distribution = np.zeros(number_of_vectors + 1)
|
|
|
|
|
|
for i in range(number_of_vectors - 1, -1, -1):
|
|
diagonal_line_length = 0
|
|
for j in range(0, number_of_vectors - i):
|
|
if recurrence_matrix[i + j, j] == 1:
|
|
diagonal_line_length += 1
|
|
if j == (number_of_vectors - i - 1):
|
|
diagonal_frequency_distribution[diagonal_line_length] += 1.0
|
|
else:
|
|
if diagonal_line_length != 0:
|
|
diagonal_frequency_distribution[diagonal_line_length] += 1.0
|
|
diagonal_line_length = 0
|
|
for k in range(1, number_of_vectors):
|
|
diagonal_line_length = 0
|
|
for i in range(number_of_vectors - k):
|
|
j = i + k
|
|
if recurrence_matrix[i, j] == 1:
|
|
diagonal_line_length += 1
|
|
if j == (number_of_vectors - 1):
|
|
diagonal_frequency_distribution[diagonal_line_length] += 1.0
|
|
else:
|
|
if diagonal_line_length != 0:
|
|
diagonal_frequency_distribution[diagonal_line_length] += 1.0
|
|
diagonal_line_length = 0
|
|
|
|
return diagonal_frequency_distribution
|
|
|
|
|
|
@jit(nopython=True)
|
|
def vertical_frequency_distribution(recurrence_matrix):
|
|
number_of_vectors = recurrence_matrix.shape[0]
|
|
|
|
|
|
vertical_frequency_distribution = np.zeros(number_of_vectors + 1)
|
|
for i in range(number_of_vectors):
|
|
vertical_line_length = 0
|
|
for j in range(number_of_vectors):
|
|
if recurrence_matrix[i, j] == 1:
|
|
vertical_line_length += 1
|
|
if j == (number_of_vectors - 1):
|
|
vertical_frequency_distribution[vertical_line_length] += 1.0
|
|
else:
|
|
if vertical_line_length != 0:
|
|
vertical_frequency_distribution[vertical_line_length] += 1.0
|
|
vertical_line_length = 0
|
|
|
|
return vertical_frequency_distribution
|
|
|
|
|
|
@jit(nopython=True)
|
|
def white_vertical_frequency_distribution(recurrence_matrix):
|
|
number_of_vectors = recurrence_matrix.shape[0]
|
|
|
|
|
|
white_vertical_frequency_distribution = np.zeros(number_of_vectors + 1)
|
|
for i in range(number_of_vectors):
|
|
white_vertical_line_length = 0
|
|
for j in range(number_of_vectors):
|
|
if recurrence_matrix[i, j] == 0:
|
|
white_vertical_line_length += 1
|
|
if j == (number_of_vectors - 1):
|
|
white_vertical_frequency_distribution[white_vertical_line_length] += 1.0
|
|
else:
|
|
if white_vertical_line_length != 0:
|
|
white_vertical_frequency_distribution[white_vertical_line_length] += 1.0
|
|
white_vertical_line_length = 0
|
|
|
|
return white_vertical_frequency_distribution
|
|
|
|
|
|
@jit(nopython=True)
|
|
def recurrence_rate(recurrence_matrix):
|
|
|
|
number_of_vectors = recurrence_matrix.shape[0]
|
|
return np.float(np.sum(recurrence_matrix)) / np.power(number_of_vectors, 2)
|
|
|
|
|
|
def determinism(number_of_vectors, diagonal_frequency_distribution_, minimum_diagonal_line_length):
|
|
|
|
numerator = np.sum(
|
|
[l * diagonal_frequency_distribution_[l] for l in range(minimum_diagonal_line_length, number_of_vectors)])
|
|
denominator = np.sum([l * diagonal_frequency_distribution_[l] for l in range(1, number_of_vectors)])
|
|
return numerator / denominator
|
|
|
|
|
|
def average_diagonal_line_length(number_of_vectors, diagonal_frequency_distribution_, minimum_diagonal_line_length):
|
|
|
|
numerator = np.sum(
|
|
[l * diagonal_frequency_distribution_[l] for l in range(minimum_diagonal_line_length, number_of_vectors)])
|
|
denominator = np.sum(
|
|
[diagonal_frequency_distribution_[l] for l in range(minimum_diagonal_line_length, number_of_vectors)])
|
|
return numerator / denominator
|
|
|
|
|
|
@jit(nopython=True)
|
|
def longest_diagonal_line_length(number_of_vectors, diagonal_frequency_distribution_):
|
|
|
|
for l in range(number_of_vectors - 1, 0, -1):
|
|
if diagonal_frequency_distribution_[l] != 0:
|
|
longest_diagonal_line_length = l
|
|
break
|
|
return longest_diagonal_line_length
|
|
|
|
|
|
@jit(nopython=True)
|
|
def divergence(longest_diagonal_line_length_):
|
|
|
|
return 1. / longest_diagonal_line_length_
|
|
|
|
|
|
@jit(nopython=True)
|
|
def entropy_diagonal_lines(number_of_vectors, diagonal_frequency_distribution_, minimum_diagonal_line_length):
|
|
|
|
sum_diagonal_frequency_distribution = np.float(
|
|
np.sum(diagonal_frequency_distribution_[minimum_diagonal_line_length:-1]))
|
|
entropy_diagonal_lines = 0
|
|
for l in range(minimum_diagonal_line_length, number_of_vectors):
|
|
if diagonal_frequency_distribution_[l] != 0:
|
|
entropy_diagonal_lines += (diagonal_frequency_distribution_[
|
|
l] / sum_diagonal_frequency_distribution) * np.log(
|
|
diagonal_frequency_distribution_[l] / sum_diagonal_frequency_distribution)
|
|
entropy_diagonal_lines *= -1
|
|
return entropy_diagonal_lines
|
|
|
|
|
|
@jit(nopython=True)
|
|
def ratio_determinism_recurrence_rate(determinism_, recurrence_rate_):
|
|
|
|
return determinism_ / recurrence_rate_
|
|
|
|
|
|
def laminarity(number_of_vectors, vertical_frequency_distribution_, minimum_vertical_line_length):
|
|
|
|
numerator = np.sum(
|
|
[v * vertical_frequency_distribution_[v] for v in range(minimum_vertical_line_length, number_of_vectors + 1)])
|
|
denominator = np.sum([v * vertical_frequency_distribution_[v] for v in range(1, number_of_vectors + 1)])
|
|
return numerator / denominator
|
|
|
|
|
|
def average_vertical_line_length(number_of_vectors, vertical_frequency_distribution_, minimum_vertical_line_length):
|
|
|
|
numerator = np.sum(
|
|
[v * vertical_frequency_distribution_[v] for v in range(minimum_vertical_line_length, number_of_vectors + 1)])
|
|
denominator = np.sum(
|
|
[vertical_frequency_distribution_[v] for v in range(minimum_vertical_line_length, number_of_vectors + 1)])
|
|
return numerator / denominator
|
|
|
|
|
|
@jit(nopython=True)
|
|
def longest_vertical_line_length(number_of_vectors, vertical_frequency_distribution_):
|
|
|
|
longest_vertical_line_length_ = 0
|
|
for v in range(number_of_vectors, 0, -1):
|
|
if vertical_frequency_distribution_[v] != 0:
|
|
longest_vertical_line_length_ = v
|
|
break
|
|
return longest_vertical_line_length_
|
|
|
|
|
|
@jit(nopython=True)
|
|
def entropy_vertical_lines(number_of_vectors, vertical_frequency_distribution_, minimum_vertical_line_length):
|
|
|
|
sum_vertical_frequency_distribution = np.float(
|
|
np.sum(vertical_frequency_distribution_[minimum_vertical_line_length:]))
|
|
entropy_vertical_lines_ = 0
|
|
for v in range(minimum_vertical_line_length, number_of_vectors + 1):
|
|
if vertical_frequency_distribution_[v] != 0:
|
|
entropy_vertical_lines_ += (vertical_frequency_distribution_[
|
|
v] / sum_vertical_frequency_distribution) * np.log(
|
|
vertical_frequency_distribution_[v] / sum_vertical_frequency_distribution)
|
|
entropy_vertical_lines_ *= -1
|
|
return entropy_vertical_lines_
|
|
|
|
|
|
@jit(nopython=True)
|
|
def laminarity_determinism(laminarity_, determinism_):
|
|
|
|
return laminarity_ / determinism_
|
|
|
|
|
|
def average_white_vertical_line_length(number_of_vectors, white_vertical_frequency_distribution_,
|
|
minimum_white_vertical_line_length):
|
|
|
|
numerator = np.sum([w * white_vertical_frequency_distribution_[w] for w in
|
|
range(minimum_white_vertical_line_length, number_of_vectors + 1)])
|
|
denominator = np.sum([white_vertical_frequency_distribution_[w] for w in
|
|
range(minimum_white_vertical_line_length, number_of_vectors + 1)])
|
|
return numerator / denominator
|
|
|
|
|
|
@jit(nopython=True)
|
|
def longest_white_vertical_line_length(number_of_vectors, white_vertical_frequency_distribution_):
|
|
|
|
longest_white_vertical_line_length_ = 0
|
|
for w in range(number_of_vectors, 0, -1):
|
|
if white_vertical_frequency_distribution_[w] != 0:
|
|
longest_white_vertical_line_length_ = w
|
|
break
|
|
return longest_white_vertical_line_length_
|
|
|
|
|
|
@jit(nopython=True)
|
|
def entropy_white_vertical_lines(number_of_vectors, white_vertical_frequency_distribution_,
|
|
minimum_white_vertical_line_length):
|
|
|
|
sum_white_vertical_frequency_distribution = np.float(
|
|
np.sum(white_vertical_frequency_distribution_[minimum_white_vertical_line_length:]))
|
|
entropy_white_vertical_lines_ = 0
|
|
for w in range(minimum_white_vertical_line_length, number_of_vectors + 1):
|
|
if white_vertical_frequency_distribution_[w] != 0:
|
|
entropy_white_vertical_lines_ += (white_vertical_frequency_distribution_[
|
|
w] / sum_white_vertical_frequency_distribution) * np.log(
|
|
white_vertical_frequency_distribution_[w] / sum_white_vertical_frequency_distribution)
|
|
entropy_white_vertical_lines_ *= -1
|
|
return entropy_white_vertical_lines_
|
|
|
|
def number_of_vertical_lines(vertical_frequency_distribution_, minimum_vertical_line_length):
|
|
if minimum_vertical_line_length > 0:
|
|
return np.sum(vertical_frequency_distribution_[minimum_vertical_line_length - 1:])
|
|
|
|
return np.uint(0)
|
|
|
|
|
|
def number_of_vertical_lines_points(vertical_frequency_distribution_, minimum_vertical_line_length):
|
|
if minimum_vertical_line_length > 0:
|
|
return np.sum(
|
|
((np.arange(vertical_frequency_distribution_.size) + 1) * vertical_frequency_distribution_)[minimum_vertical_line_length - 1:])
|
|
|
|
return np.uint(0)
|
|
|
|
@jit(nopython=True)
|
|
def trapping_time(number_of_vertical_lines_points_, number_of_vertical_lines_):
|
|
"""
|
|
Trapping time (TT).
|
|
"""
|
|
try:
|
|
return np.float32(number_of_vertical_lines_points_ / number_of_vertical_lines_)
|
|
except:
|
|
return 0
|
|
|
|
|
|
|
|
|
|
|
|
def return_pyRQA_results(signal, nbr):
|
|
time_series = EmbeddedSeries(signal)
|
|
settings = Settings(time_series,
|
|
analysis_type=Classic,
|
|
neighbourhood=FixedRadius(nbr),
|
|
similarity_measure=EuclideanMetric,
|
|
theiler_corrector=1)
|
|
computation = RQAComputation.create(settings,
|
|
verbose=True)
|
|
result = computation.run()
|
|
return result
|
|
|