|
from numba import jit
|
|
from pynndescent.distances import wasserstein_1d, spearmanr, euclidean
|
|
import numpy as np
|
|
from pyrqa.computation import RPComputation
|
|
from pyrqa.time_series import TimeSeries, EmbeddedSeries
|
|
from pyrqa.settings import Settings
|
|
from pyrqa.analysis_type import Classic
|
|
from pyrqa.metric import EuclideanMetric
|
|
|
|
|
|
from pyrqa.neighbourhood import Unthresholded
|
|
import torch
|
|
|
|
|
|
|
|
|
|
@jit(nopython=True)
|
|
def EuclideanPyRQA_RP_stft_cpu(stft_):
|
|
result = np.zeros((stft_.shape[0], stft_.shape[0]))
|
|
|
|
for i, fft in enumerate(stft_):
|
|
for j, v in enumerate(stft_):
|
|
d = euclidean(fft, v)
|
|
result[i, j] = d
|
|
return result
|
|
|
|
@jit(nopython=True)
|
|
def wasserstein_squereform(stft_):
|
|
result = np.zeros((stft_.shape[0], stft_.shape[0]))
|
|
|
|
for i, fft in enumerate(stft_):
|
|
for j, v in enumerate(stft_):
|
|
d = wasserstein_1d(fft, v)
|
|
result[i, j] = d
|
|
return result
|
|
|
|
@jit(nopython=True)
|
|
def spearmanr_squereform(stft_):
|
|
result = np.zeros((stft_.shape[0], stft_.shape[0]))
|
|
|
|
for i, fft in enumerate(stft_):
|
|
for j, v in enumerate(stft_):
|
|
d = spearmanr(fft, v)
|
|
result[i, j] = d
|
|
return result
|
|
|
|
@jit(nopython=True)
|
|
def wasserstein_squereform_binary(stft_, eps_):
|
|
result = np.zeros((stft_.shape[0], stft_.shape[0]))
|
|
|
|
for i, fft in enumerate(stft_):
|
|
for j, v in enumerate(stft_):
|
|
d = wasserstein_1d(fft, v)
|
|
if d <= eps_:
|
|
dist = 1
|
|
else:
|
|
dist = 0
|
|
|
|
result[i, j] = dist
|
|
return result
|
|
|
|
|
|
@jit(nopython=True)
|
|
def wasserstein_1d_array(stft_):
|
|
result = np.zeros((stft_.shape[0] * stft_.shape[0]))
|
|
k = 0
|
|
for i, fft in enumerate(stft_):
|
|
for j, v in enumerate(stft_):
|
|
d = wasserstein_1d(fft, v)
|
|
result[k] = d
|
|
k += 1
|
|
|
|
return result
|
|
|
|
def set_epsilon(matrix, eps):
|
|
return np.heaviside(eps - matrix, 0)
|
|
|
|
|
|
def EuclideanPyRQA_RP(signal, embedding = 2,timedelay = 9):
|
|
time_series = TimeSeries(signal,
|
|
embedding_dimension=embedding,
|
|
time_delay=timedelay)
|
|
settings = Settings(time_series,
|
|
analysis_type=Classic,
|
|
neighbourhood=Unthresholded(),
|
|
similarity_measure=EuclideanMetric,
|
|
theiler_corrector=1)
|
|
|
|
computation = RPComputation.create(settings)
|
|
result = computation.run()
|
|
return result.recurrence_matrix
|
|
|
|
|
|
def EuclideanPyRQA_RP_stft(signal, embedding = 2,timedelay = 9):
|
|
time_series = EmbeddedSeries(signal)
|
|
settings = Settings(time_series,
|
|
analysis_type=Classic,
|
|
neighbourhood=Unthresholded(),
|
|
similarity_measure=EuclideanMetric,
|
|
theiler_corrector=1)
|
|
|
|
computation = RPComputation.create(settings)
|
|
result = computation.run()
|
|
return result.recurrence_matrix |