from dataclasses import dataclass import numpy as np import scipy.linalg as la from scipy.signal import find_peaks from math import ceil def thin_peaks(peak_list, dmin=10, voxel_size=(1,1,1), return_larger_peaks=False): """ Remove peaks within a specified distance of each other, retaining the peak with the highest intensity. Args: - peak_list (list of PeakData): Each element contains: - pos (list of float): 3D coordinates of the peak. - intensity (float): The intensity value of the peak. - key (tuple): A unique identifier or index for the peak (#trace, #peak) - dmin (float, optional): Minimum distance between peaks. peaks closer than this threshold will be thinned. Defaults to 10. - return_larger_peaks (bool, optional): Indicate larger peak for each thinned peak Returns: - list of tuples: A list containing keys of the removed peaks. if return_larger_peaks - list of tuples: A list containing the keys of the larger peak causing the peak to be removed Notes: - The function uses the L2 norm (Euclidean distance) to compute the distance between peaks. - When two peaks are within `dmin` distance, the peak with the lower intensity is removed. """ removed_peaks = [] removed_larger_peaks = [] for i in range(len(peak_list)): if peak_list[i].key in removed_peaks: continue for j in range(len(peak_list)): if i==j: continue if peak_list[j].key in removed_peaks: continue d = (np.array(peak_list[i].pos) - np.array(peak_list[j].pos))*np.array(voxel_size) d = la.norm(d) if d=alpha*np.max(v)) return np.array(pos[idx]) else: return np.array([], dtype=np.int32) def analyse_celldata(cell_data, config): """ Analyse the provided cell data to extract focus-related information. Args: cd (CellData): An instance of the CellData class containing path data information. config (dictionary): Configuration dictionary containing 'peak_threshold' and 'threshold_type' 'peak_threshold' (float) - threshold for calling peaks as foci 'threshold_type' (str) = 'per-trace', 'per-foci' Returns: tuple: A tuple containing: - foci_rel_intensity (list): List of relative intensities for the detected foci. - foci_pos (list): List of absolute positions of the detected foci. - foci_pos_index (list): List of indices of the detected foci. - screened_foci_data (list): List of RemovedPeakData indicating positions of removed peaks and the index of the larger peak - trace_median_intensities (list): Per-trace median intensity - trace_thresholds (list): Per-trace absolute threshold for calling peaks as foci """ foci_abs_intensity = [] foci_pos = [] foci_pos_index = [] screened_foci_data = [] trace_median_intensities = [] trace_thresholds = [] peak_threshold = config['peak_threshold'] threshold_type = config['threshold_type'] if threshold_type == 'per-trace': """ Call extracted peaks as foci if intensity - trace_mean > peak_threshold * (trace_max_foci_intensity - trace_mean) """ for path_data in cell_data.pathdata_list: peaks = np.array(path_data.peaks, dtype=np.int32) # Normalize extracted fluorescent intensities by subtracting mean (and dividing # by standard deviation - note that the latter should have no effect on the results). h = np.array(path_data.o_intensity) h = h - np.mean(h) h = h/np.std(h) # Extract foci according to criterion foci_idx = focus_criterion(peaks, h[peaks], peak_threshold) # removed_peaks = path_data.removed_peaks removed_peaks_idx = np.array([u.idx for u in removed_peaks], dtype=np.int32) if len(peaks): trace_thresholds.append((1-peak_threshold)*np.mean(path_data.o_intensity) + peak_threshold*np.max(np.array(path_data.o_intensity)[peaks])) else: trace_thresholds.append(None) if len(removed_peaks): if len(peaks): threshold = (1-peak_threshold)*np.mean(path_data.o_intensity) + peak_threshold*np.max(np.array(path_data.o_intensity)[peaks]) else: threshold = float('-inf') removed_peak_heights = np.array(path_data.o_intensity)[removed_peaks_idx] screened_foci_idx = np.where(removed_peak_heights>threshold)[0] screened_foci_data.append([removed_peaks[i] for i in screened_foci_idx]) else: screened_foci_data.append([]) pos_abs = (foci_idx/len(path_data.points))*path_data.SC_length foci_pos.append(pos_abs) foci_abs_intensity.append(np.array(path_data.o_intensity)[foci_idx]) foci_pos_index.append(foci_idx) trace_median_intensities.append(np.median(path_data.o_intensity)) elif threshold_type == 'per-cell': """ Call extracted peaks as foci if intensity - trace_mean > peak_threshold * max(intensity - trace_mean) """ max_cell_intensity = float("-inf") for path_data in cell_data.pathdata_list: # Normalize extracted fluorescent intensities by subtracting mean (and dividing # by standard deviation - note that the latter should have no effect on the results). h = np.array(path_data.o_intensity) h = h - np.mean(h) max_cell_intensity = max(max_cell_intensity, np.max(h)) for path_data in cell_data.pathdata_list: peaks = np.array(path_data.peaks, dtype=np.int32) # Normalize extracted fluorescent intensities by subtracting mean (and dividing # by standard deviation - note that the latter should have no effect on the results). h = np.array(path_data.o_intensity) h = h - np.mean(h) foci_idx = peaks[h[peaks]>peak_threshold*max_cell_intensity] removed_peaks = path_data.removed_peaks removed_peaks_idx = np.array([u.idx for u in removed_peaks], dtype=np.int32) trace_thresholds.append(np.mean(path_data.o_intensity) + peak_threshold*max_cell_intensity) if len(removed_peaks): threshold = np.mean(path_data.o_intensity) + peak_threshold*max_cell_intensity removed_peak_heights = np.array(path_data.o_intensity)[removed_peaks_idx] screened_foci_idx = np.where(removed_peak_heights>threshold)[0] screened_foci_data.append([removed_peaks[i] for i in screened_foci_idx]) else: screened_foci_data.append([]) pos_abs = (foci_idx/len(path_data.points))*path_data.SC_length foci_pos.append(pos_abs) foci_abs_intensity.append(np.array(path_data.o_intensity)[foci_idx]) foci_pos_index.append(foci_idx) trace_median_intensities.append(np.median(path_data.o_intensity)) else: raise NotImplementedError return foci_abs_intensity, foci_pos, foci_pos_index, screened_foci_data, trace_median_intensities, trace_thresholds def analyse_traces(all_paths, path_lengths, measured_trace_fluorescence, config): cd = process_cell_traces(all_paths, path_lengths, measured_trace_fluorescence, dmin=config['screening_distance']) return analyse_celldata(cd, config)