Source code for pyleoclim.utils.tsutils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities to manipulate timeseries - useful for preprocessing prior to analysis
"""

__all__ = [
    'simple_stats',
    'bin',
    'interp',
    'gkernel',
    'standardize',
    'ts2segments',
    'annualize',
    'gaussianize',
    'detrend',
    'detect_outliers_DBSCAN',
    'detect_outliers_kmeans',
    'remove_outliers',
    'phaseran',
    'phaseran2'
]

import numpy as np
from numpy import pi
import pandas as pd
import warnings
import copy
from scipy import special
from scipy import signal
from scipy import interpolate
from scipy.interpolate import splrep, splev
from scipy import stats
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import DBSCAN
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.neighbors import LocalOutlierFactor
#import matplotlib.pyplot as plt

import statsmodels.tsa.stattools as sms

import math
from .filter import savitzky_golay

from .tsbase import (
    clean_ts,
    dropna,
)


from .emd_utils import (
    extr,
    boundary_conditions)

[docs]class EmpiricalModeDecomposition(object): """The EMD class. This class has been adapted from the pyhht package (https://github.com/jaidevd/pyhht/tree/dev) and adapated to work with scipy 0.12.0. See the copyright notice in the emd_utils module.""" def __init__(self, x, t=None, threshold_1=0.05, threshold_2=0.5, alpha=0.05, ndirs=4, fixe=0, maxiter=2000, fixe_h=0, n_imfs=0, nbsym=2, bivariate_mode='bbox_center'): """Empirical mode decomposition. Parameters ---------- x : array-like, shape (n_samples,) The signal on which to perform EMD t : array-like, shape (n_samples,), optional The timestamps of the signal. threshold_1 : float, optional Threshold for the stopping criterion, corresponding to :math:`\\theta_{1}` in [3]. Defaults to 0.05. threshold_2 : float, optional Threshold for the stopping criterion, corresponding to :math:`\\theta_{2}` in [3]. Defaults to 0.5. alpha : float, optional Tolerance for the stopping criterion, corresponding to :math:`\\alpha` in [3]. Defaults to 0.05. ndirs : int, optional Number of directions in which interpolants for envelopes are computed for bivariate EMD. Defaults to 4. This is ignored if the signal is real valued. fixe : int, optional Number of sifting iterations to perform for each IMF. By default, the stopping criterion mentioned in [1] is used. If set to a positive integer, each mode is either the result of exactly `fixe` number of sifting iterations, or until a pure IMF is found, whichever is sooner. maxiter : int, optional Upper limit of the number of sifting iterations for each mode. Defaults to 2000. n_imfs : int, optional Number of IMFs to extract. By default, this is ignored and decomposition is continued until a monotonic trend is left in the residue. nbsym : int, optional Number of extrema to use to mirror the signals on each side of their boundaries. bivariate_mode : str, optional The algorithm to be used for bivariate EMD as described in [4]. Can be one of 'centroid' or 'bbox_center'. This is ignored if the signal is real valued. Attributes ---------- is_bivariate : bool Whether the decomposer performs bivariate EMD. This is automatically determined by the input value. This is True if at least one non-zero imaginary component is found in the signal. nbits : list List of number of sifting iterations it took to extract each IMF. References ---------- .. [1] Huang H. et al. 1998 'The empirical mode decomposition and the \ Hilbert spectrum for nonlinear and non-stationary time series \ analysis.' \ Procedings of the Royal Society 454, 903-995 .. [2] Zhao J., Huang D. 2001 'Mirror extending and circular spline \ function for empirical mode decomposition method'. \ Journal of Zhejiang University (Science) V.2, No.3, 247-252 .. [3] Gabriel Rilling, Patrick Flandrin, Paulo Gonçalves, June 2003: \ 'On Empirical Mode Decomposition and its Algorithms',\ IEEE-EURASIP Workshop on Nonlinear Signal and Image Processing \ NSIP-03 .. [4] Gabriel Rilling, Patrick Flandrin, Paulo Gonçalves, \ Jonathan M. Lilly. Bivariate Empirical Mode Decomposition. \ 10 pages, 3 figures. Submitted to Signal Processing Letters, \ IEEE. Matlab/C codes and additional .. 2007. <ensl-00137611> .. 
[5] https://github.com/jaidevd/pyhht/tree/dev """ self.threshold_1 = threshold_1 self.threshold_2 = threshold_2 self.alpha = alpha self.maxiter = maxiter self.fixe_h = fixe_h self.ndirs = ndirs self.nbit = 0 self.Nbit = 0 self.n_imfs = n_imfs self.k = 1 # self.mask = mask self.nbsym = nbsym self.nbit = 0 self.NbIt = 0 if x.ndim > 1: if 1 not in x.shape: raise ValueError("x must have only one row or one column.") if x.shape[0] > 1: x = x.ravel() if np.any(np.isinf(x)): raise ValueError("All elements of x must be finite.") self.x = x self.ner = self.nzr = len(self.x) self.residue = self.x.copy() if t is None: self.t = np.arange(max(x.shape)) else: if t.shape != self.x.shape: raise ValueError("t must have the same dimensions as x.") if t.ndim > 1: if 1 not in t.shape: raise ValueError("t must have only one column or one row.") if not np.all(np.isreal(t)): raise TypeError("t must be a real vector.") if t.shape[0] > 1: t = t.ravel() self.t = t if fixe: self.maxiter = fixe if self.fixe_h: raise TypeError("Cannot use both fixe and fixe_h modes") self.fixe = fixe self.is_bivariate = np.any(np.iscomplex(self.x)) if self.is_bivariate: self.bivariate_mode = bivariate_mode self.imf = [] self.nbits = []
    def io(self):
        r"""Compute the index of orthogonality, as defined by:

        .. math:: \sum_{i,j=1,i\neq j}^{N}\frac{\|C_{i}\overline{C_{j}}\|}{\|x\|^2}

        where :math:`C_{i}` is the :math:`i` th IMF.

        Returns
        -------
        float
            Index of orthogonality. Lower values are better.
        """
        imf = np.array(self.imf)
        dp = np.dot(imf, np.conj(imf).T)
        mask = np.logical_not(np.eye(len(self.imf)))
        s = np.abs(dp[mask]).sum()
        return s / (2 * np.sum(self.x ** 2))
[docs] def stop_EMD(self): """Check if there are enough extrema (3) to continue sifting. Returns ------- bool Whether to stop further cubic spline interpolation for lack of local extrema. """ if self.is_bivariate: stop = False for k in range(self.ndirs): phi = k * pi / self.ndirs indmin, indmax, _ = extr( np.real(np.exp(1j * phi) * self.residue)) if len(indmin) + len(indmax) < 3: stop = True break else: indmin, indmax, _ = extr(self.residue) ner = len(indmin) + len(indmax) stop = ner < 3 return stop
[docs] def mean_and_amplitude(self, m): """ Compute the mean of the envelopes and the mode amplitudes. Parameters ---------- m : array-like, shape (n_samples,) The input array or an itermediate value of the sifting process. Returns ------- tuple A tuple containing the mean of the envelopes, the number of extrema, the number of zero crosssing and the estimate of the amplitude of themode. """ if self.is_bivariate: if self.bivariate_mode == 'centroid': nem = [] nzm = [] envmin = np.zeros((self.ndirs, len(self.t))) envmax = np.zeros((self.ndirs, len(self.t))) for k in range(self.ndirs): phi = k * pi / self.ndirs y = np.real(np.exp(-1j * phi) * m) indmin, indmax, indzer = extr(y) nem.append(len(indmin) + len(indmax)) nzm.append(len(indzer)) if self.nbsym: tmin, tmax, zmin, zmax = boundary_conditions( y, self.t, m, self.nbsym) else: tmin = np.r_[self.t[0], self.t[indmin], self.t[-1]] tmax = np.r_[self.t[0], self.t[indmax], self.t[-1]] zmin, zmax = m[tmin], m[tmax] f = splrep(tmin, zmin) spl = splev(self.t, f) envmin[k, :] = spl f = splrep(tmax, zmax) spl = splev(self.t, f) envmax[k, :] = spl envmoy = np.mean((envmin + envmax) / 2, axis=0) amp = np.mean(abs(envmax - envmin), axis=0) / 2 elif self.bivariate_mode == 'bbox_center': nem = [] nzm = [] envmin = np.zeros((self.ndirs, len(self.t)), dtype=complex) envmax = np.zeros((self.ndirs, len(self.t)), dtype=complex) for k in range(self.ndirs): phi = k * pi / self.ndirs y = np.real(np.exp(-1j * phi) * m) indmin, indmax, indzer = extr(y) nem.append(len(indmin) + len(indmax)) nzm.append(len(indzer)) if self.nbsym: tmin, tmax, zmin, zmax = boundary_conditions( y, self.t, m, self.nbsym) else: tmin = np.r_[self.t[0], self.t[indmin], self.t[-1]] tmax = np.r_[self.t[0], self.t[indmax], self.t[-1]] zmin, zmax = m[tmin], m[tmax] f = splrep(tmin, zmin) spl = splev(self.t, f) envmin[k, ] = np.exp(1j * phi) * spl f = splrep(tmax, zmax) spl = splev(self.t, f) envmax[k, ] = np.exp(1j * phi) * spl envmoy = np.mean((envmin + envmax), axis=0) amp = np.mean(abs(envmax - envmin), axis=0) / 2 else: indmin, indmax, indzer = extr(m) nem = len(indmin) + len(indmax) nzm = len(indzer) if self.nbsym: tmin, tmax, mmin, mmax = boundary_conditions(m, self.t, m, self.nbsym) else: tmin = np.r_[self.t[0], self.t[indmin], self.t[-1]] tmax = np.r_[self.t[0], self.t[indmax], self.t[-1]] mmin, mmax = m[tmin], m[tmax] f = splrep(tmin, mmin) envmin = splev(self.t, f) f = splrep(tmax, mmax) envmax = splev(self.t, f) envmoy = (envmin + envmax) / 2 amp = np.abs(envmax - envmin) / 2.0 if self.is_bivariate: nem = np.array(nem) nzm = np.array(nzm) return envmoy, nem, nzm, amp
[docs] def stop_sifting(self, m): """Evaluate the stopping criteria for the current mode. Parameters ---------- m : array-like, shape (n_samples,) The current mode. Returns ------- bool Whether to stop sifting. If this evaluates to true, the current mode is interpreted as an IMF. """ # FIXME: This method needs a better name. if self.fixe: (moyenne, _, _, _), stop_sift = self.mean_and_amplitude(m), 0 # NOQA elif self.fixe_h: stop_count = 0 try: moyenne, nem, nzm = self.mean_and_amplitude(m)[:3] if np.all(abs(nzm - nem) > 1): stop = 0 stop_count = 0 else: stop_count += 1 stop = (stop_count == self.fixe_h) except: moyenne = np.zeros((len(m))) stop = 1 stop_sift = stop else: try: envmoy, nem, nzm, amp = self.mean_and_amplitude(m) except TypeError as err: if err.args[0] == "m > k must hold": return 1, np.zeros((len(m))) except ValueError as err: if err.args[0] == "Not enough extrema.": return 1, np.zeros((len(m))) sx = np.abs(envmoy) / amp stop = not(((np.mean(sx > self.threshold_1) > self.alpha) or np.any(sx > self.threshold_2)) and np.all(nem > 2)) if not self.is_bivariate: stop = stop and not(np.abs(nzm - nem) > 1) stop_sift = stop moyenne = envmoy return stop_sift, moyenne
    def keep_decomposing(self):
        """Check whether to continue the sifting operation."""
        return not(self.stop_EMD()) and \
            (self.k < self.n_imfs + 1 or self.n_imfs == 0)
        # and not(np.any(self.mask))
[docs] def decompose(self): """Decompose the input signal into IMFs. This function does all the heavy lifting required for sifting, and should ideally be the only public method of this class. Returns ------- imfs : array-like, shape (n_imfs, n_samples) A matrix containing one IMF per row. Examples -------- >>> from pyhht.visualization import plot_imfs >>> import numpy as np >>> t = np.linspace(0, 1, 1000) >>> modes = np.sin(2 * pi * 5 * t) + np.sin(2 * pi * 10 * t) >>> x = modes + t >>> decomposer = EMD(x) >>> imfs = decomposer.decompose() """ while self.keep_decomposing(): # current mode m = self.residue # computing mean and stopping criterion stop_sift, moyenne = self.stop_sifting(m) # in case current mode is small enough to cause spurious extrema if np.max(np.abs(m)) < (1e-10) * np.max(np.abs(self.x)): if not stop_sift: warnings.warn( "EMD Warning: Amplitude too small, stopping.") else: print("Force stopping EMD: amplitude too small.") return # SIFTING LOOP: while not(stop_sift) and (self.nbit < self.maxiter): # The following should be controlled by a verbosity parameter. # if (not(self.is_bivariate) and # (self.nbit > self.maxiter / 5) and # self.nbit % np.floor(self.maxiter / 10) == 0 and # not(self.fixe) and self.nbit > 100): # print("Mode " + str(self.k) + # ", Iteration " + str(self.nbit)) # im, iM, _ = extr(m) # print(str(np.sum(m[im] > 0)) + " minima > 0; " + # str(np.sum(m[im] < 0)) + " maxima < 0.") # Sifting m = m - moyenne # Computing mean and stopping criterion stop_sift, moyenne = self.stop_sifting(m) self.nbit += 1 self.NbIt += 1 # This following warning depends on verbosity and needs better # handling # if not self.fixe and self.nbit > 100(self.nbit == # (self.maxiter - 1)) and not(self.fixe) and (self.nbit > 100): # warnings.warn("Emd:warning, Forced stop of sifting - " + # "Maximum iteration limit reached.") self.imf.append(m) self.nbits.append(self.nbit) self.nbit = 0 self.k += 1 self.residue = self.residue - m self.ort = self.io() if np.any(self.residue): self.imf.append(self.residue) return np.array(self.imf)
[docs]def simple_stats(y, axis=None): """ Computes simple statistics Computes the mean, median, min, max, standard deviation, and interquartile range of a numpy array y, ignoring NaNs. Parameters ---------- y: array A Numpy array axis : int, tuple of ints Axis or Axes along which the means are computed, the default is to compute the mean of the flattened array. If a tuple of ints, performed over multiple axes Returns ------- mean : float mean of y, ignoring NaNs median : float median of y, ignoring NaNs min_ : float mininum value in y, ignoring NaNs max_ : float max value in y, ignoring NaNs std : float standard deviation of y, ignoring NaNs IQR : float Interquartile range of y along specified axis, ignoring NaNs """ # make sure that y is an array y = np.array(y, dtype='float64') # Perform the various calculations mean = np.nanmean(y, axis=axis) std = np.nanstd(y, axis=axis) median = np.nanmedian(y, axis=axis) min_ = np.nanmin(y, axis=axis) max_ = np.nanmax(y, axis=axis) IQR = np.nanpercentile(y, 75, axis=axis) - np.nanpercentile(y, 25, axis=axis) return mean, median, min_, max_, std, IQR
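A minimal usage sketch (illustrative values, not part of the module): simple_stats ignores NaNs when computing each summary statistic.

import numpy as np
import pyleoclim as pyleo

y = np.array([1.0, 2.0, np.nan, 4.0, 100.0])
mean, median, min_, max_, std_, IQR = pyleo.utils.tsutils.simple_stats(y)
# all six statistics are computed over the four non-NaN values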
def bin(x, y, bin_size=None, start=None, stop=None, step_style=None, evenly_spaced = False, statistic = 'mean', bin_edges=None, time_axis=None,no_nans = True): """ Bin the values The behavior of bins, as defined either by start, stop and step or by the bins argument, is to have all bins except the last one be half open. That is if bins are defined as bins = [1,2,3,4], bins will be [1,2), [2,3), [3,4]. This is the default behaviour of scipy.stats.binned_statistic (upon which this function is built). Parameters ---------- x : array The x-axis series. y : array The y-axis series. bin_size : float The size of the bins. Default is the maximum resolution if no_nans is True. start : float Where/when to start binning. Default is the minimum. stop : float When/where to stop binning. Default is the maximum. step_style : str; {'min','mean','median','max'} Step style to use when determining the size of the interval between points. Default is None. evenly_spaced : {True,False} Makes the series evenly-spaced. This option is ignored if bin_size is set to float. This option is being deprecated, no_nans should be used instead. statistic : str Statistic to calculate and return in values. Default is 'mean'. See scipy.stats.binned_statistic for other options. bin_edges : np.ndarray The edge of bins to use for binning. E.g. if bins = [1,2,3,4], bins will be [1,2), [2,3), [3,4]. See scipy.stats.binned_statistic for details. Start, stop, bin_size, step_style, and time_axis will be ignored if this is passed. time_axis : np.ndarray The time axis to use for binning. If passed, bin_edges will be set as the midpoints between times. The first time will be used as the left most edge, the last time will be used as the right most edge. Start, stop, bin_size, and step_style will be ignored if this is passed. no_nans : bool; {True,False} Sets the step_style to max, ensuring that the resulting series contains no empty values (nans). Default is True. Returns ------- binned_values : array The binned values bins : array The bins (centered on the median, i.e., the 100-200 bin is 150) n : array Number of data points in each bin error : array The standard error on the mean in each bin Notes ----- `start`, `stop`, `bin_size`, and `step_style` are interpreted as defining the `bin_edges` for this function. This differs from the `interp` interpretation, which uses these to define the time axis over which interpolation is applied. For `bin`, the time axis will be specified as the midpoints between `bin_edges`, unless `time_axis` is explicitly passed. See also -------- pyleoclim.utils.tsutils.gkernel : Coarsen time resolution using a Gaussian kernel pyleoclim.utils.tsutils.interp : Interpolate y onto a new x-axis `scipy.stats.binned_statistic <https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.binned_statistic.html>`_ : Scipy function around which this function is written Examples -------- There are several ways to specify the way binning is conducted via this function. Within these there is a hierarchy which we demonstrate below. Top priority is given to `bin_edges` if it is not None. All other arguments will be ignored (except for x and y). The resulting time axis will be comprised of the midpoints between bin edges. .. jupyter-execute:: import numpy as np import pyleoclim as pyleo x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(t)) xb,yb = pyleo.utils.tsutils.bin(x,y,bin_edges=[1,4,8,12,16,20]) xb Next, priority will go to `time_axis` if it is passed. 
In this case, bin edges will be taken as the midpoints between time axis points. The first and last time point will be used as the left most and right most bin edges. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(t)) xb,yb = pyleo.utils.tsutils.bin(x,y,time_axis=[1,4,8,12,16,20]) xb If `time_axis` is None, `bin_size` will be considered, overriding `step_style if it is passed. `start` and `stop` will be generated using defaults if not passed. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(t)) xb,yb = pyleo.utils.tsutils.bin(x,y,bin_size=2) xb If both `time_axis` and `step` are None but `step_style` is specified, the step will be generated using the prescribed `step_style`. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(t)) xb,yb = pyleo.utils.tsutils.bin(x,y,step_style='max') xb If none of these are specified, the mean spacing will be used. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(t)) xb,yb = pyleo.utils.tsutils.bin(x,y) xb """ if evenly_spaced: no_nans=True warnings.warn('`evenly_spaced` is being deprecated. Please switch to using the option `no_nans` (behaviour is identical).',DeprecationWarning,stacklevel=2) # Make sure x and y are numpy arrays x = np.array(x, dtype='float64') y = np.array(y, dtype='float64') # Set the bin edges if bin_edges is not None: if start is not None or stop is not None or bin_size is not None or step_style is not None or time_axis is not None: warnings.warn('Bins have been passed with other bin relevant arguments {start,stop,bin_size,step_style,time_axis}. Bin_edges take priority and will be used.',stacklevel=2) time_axis = (bin_edges[1:] + bin_edges[:-1])/2 # A bit of wonk is required to get the proper bin edges from the time axis elif time_axis is not None: if start is not None or stop is not None or bin_size is not None or step_style is not None: warnings.warn('The time axis has been passed with other time axis relevant arguments {start,stop,bin_size,step_style}. Time_axis takes priority and will be used.',stacklevel=2) bin_edges = np.zeros(len(time_axis)+1) bin_edges[0] = time_axis[0] bin_edges[-1] = time_axis[-1] bin_edges[1:-1] = (time_axis[1:]+time_axis[:-1])/2 else: bin_edges = make_even_axis(x=x,start=start,stop=stop,step=bin_size,step_style=step_style,no_nans=no_nans) time_axis = (bin_edges[1:]+bin_edges[:-1])/2 # Perform the calculation binned_values = stats.binned_statistic(x=x,values=y,bins=bin_edges,statistic=statistic).statistic n = stats.binned_statistic(x=x,values=y,bins=bin_edges,statistic='count').statistic error = stats.binned_statistic(x=x,values=y,bins=bin_edges,statistic='std').statistic #Returned bins should be at the midpoint of the bin edges res_dict = { 'bins': time_axis, 'binned_values': binned_values, 'n': n, 'error': error, } if no_nans: check = np.isnan(binned_values).any() if check: warnings.warn('no_nans is set to True but nans are present in the series. It has likely been overridden by other parameters. See tsutils.bin() documentation for details on parameter hierarchy',stacklevel=2) return res_dict
[docs]def gkernel(t,y, h = None, step=None,start=None,stop=None, step_style = None, evenly_spaced=False, bin_edges=None, time_axis=None,no_nans=True): '''Coarsen time resolution using a Gaussian kernel The behavior of bins, as defined either by start, stop and step (or step_style) or by the bins argument, is to have all bins except the last one be half open. That is if bins are defined as bins = [1,2,3,4], bins will be [1,2), [2,3), [3,4]. This is the default behaviour of our binning functionality (upon which this function is based). Parameters ---------- t : 1d array the original time axis y : 1d array values on the original time axis h : float kernel e-folding scale. Default value is None, in which case the median time step will be used. If the median time step results in a series with nan values, the maximum time step will be used. Note that if this variable is too small, this method may return nan values in parts of the series. step : float The interpolation step. Default is max spacing between consecutive points. start : float where/when to start the interpolation. Default is min(t). stop : float where/when to stop the interpolation. Default is max(t). step_style : str step style to be applied from 'increments' [default = 'max'] evenly_spaced : {True,False} Makes the series evenly-spaced. This option is ignored if bins are passed. This option is being deprecated, no_nans should be used instead. bin_edges : array The right hand edge of bins to use for binning. E.g. if bins = [1,2,3,4], bins will be [1,2), [2,3), [3,4]. Same behavior as scipy.stats.binned_statistic Start, stop, step, and step_style will be ignored if this is passed. time_axis : np.ndarray The time axis to use for binning. If passed, bin_edges will be set as the midpoints between times. The first time will be used as the left most edge, the last time will be used as the right most edge. Start, stop, bin_size, and step_style will be ignored if this is passed. no_nans : bool; {True,False} Sets the step_style to max, ensuring that the resulting series contains no empty values (nans). Default is True. Returns ------- tc : 1d array the coarse-grained time axis yc: 1d array The coarse-grained time series Notes ----- `start`, `stop`, `step`, and `step_style` are interpreted as defining the `bin_edges` for this function. This differs from the `interp` interpretation, which uses these to define the time axis over which interpolation is applied. For `gkernel`, the time axis will be specified as the midpoints between `bin_edges`, unless `time_axis` is explicitly passed. References ---------- Rehfeld, K., Marwan, N., Heitzig, J., and Kurths, J.: Comparison of correlation analysis techniques for irregularly sampled time series, Nonlin. Processes Geophys., 18, 389–404, doi:10.5194/npg-18-389-2011, 2011. See also -------- pyleoclim.utils.tsutils.increments : Establishes the increments of a numerical array pyleoclim.utils.tsutils.make_even_axis : Create an even time axis pyleoclim.utils.tsutils.bin : Bin the values pyleoclim.utils.tsutils.interp : Interpolate y onto a new x-axis Examples -------- There are several ways to specify the way coarsening is done via this function. Within these there is a hierarchy which we demonstrate below. Top priority is given to `bin_edges` if it is not None. All other arguments will be ignored (except for x and y). The resulting time axis will be comprised of the midpoints between bin edges. .. 
jupyter-execute:: import numpy as np import pyleoclim as pyleo x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xc,yc = pyleo.utils.tsutils.gkernel(x,y,bin_edges=[1,4,8,12,16,20]) xc Next, priority will go to `time_axis` if it is passed. In this case, bin edges will be taken as the midpoints between time axis points. The first and last time point will be used as the left most and right most bin edges. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xc,yc = pyleo.utils.tsutils.gkernel(x,y,time_axis=[1,4,8,12,16,20]) xc If `time_axis` is None, `step` will be considered, overriding `step_style` if it is passed. `start` and `stop` will be generated using defaults if not passed. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xc,yc = pyleo.utils.tsutils.gkernel(x,y,step=2) xc If both `time_axis` and `step` are None but `step_style` is specified, the step will be generated using the prescribed `step_style`. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xc,yc = pyleo.utils.tsutils.gkernel(x,y,step_style='max') xc If none of these are specified, the mean spacing will be used. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xc,yc = pyleo.utils.tsutils.gkernel(x,y) xc ''' if len(t) != len(y): raise ValueError('y and t must have the same length') if evenly_spaced: no_nans=True warnings.warn('`evenly_spaced` is being deprecated. Please switch to using the option `no_nans` (behaviour is identical).',DeprecationWarning,stacklevel=2) # Make sure x and y are numpy arrays t = np.array(t, dtype='float64') y = np.array(y, dtype='float64') # Set the bin edges if bin_edges is not None: bin_edges = np.array(bin_edges) if start is not None or stop is not None or step is not None or step_style is not None or time_axis is not None: warnings.warn('Bins have been passed with other axis relevant arguments {start,stop,step,step_style,time_axis}. Bin_edges take priority and will be used.',stacklevel=2) time_axis = (bin_edges[1:] + bin_edges[:-1])/2 # A bit of wonk is required to get the proper bin edges from the time axis elif time_axis is not None: time_axis = np.array(time_axis) if start is not None or stop is not None or step is not None or step_style is not None: warnings.warn('The time axis has been passed with other axis relevant arguments {start,stop,step,step_style}. Time_axis takes priority and will be used.',stacklevel=2) bin_edges = np.zeros(len(time_axis)+1) bin_edges[0] = time_axis[0] bin_edges[-1] = time_axis[-1] bin_edges[1:-1] = (time_axis[1:]+time_axis[:-1])/2 else: bin_edges = make_even_axis(x=t,start=start,stop=stop,step=step,step_style=step_style,no_nans=no_nans) time_axis = (bin_edges[1:]+bin_edges[:-1])/2 kernel = lambda x, s : 1.0/(s*np.sqrt(2*np.pi))*np.exp(-0.5*(x/s)**2) # define kernel function if h is None: h = np.median(t) yc = np.zeros((len(time_axis))) yc[:] = np.nan for i in range(len(bin_edges)-1): if i < len(bin_edges)-1: xslice = t[(t>=bin_edges[i])&(t<bin_edges[i+1])] yslice = y[(t>=bin_edges[i])&(t<bin_edges[i+1])] else: xslice = t[(t>=bin_edges[i])&(t<=bin_edges[i+1])] yslice = y[(t>=bin_edges[i])&(t<=bin_edges[i+1])] if len(xslice)>0: d = xslice-time_axis[i] weight = kernel(d,h) yc[i] = sum(weight*yslice)/sum(weight) # normalize by the sum of weights else: yc[i] = np.nan if no_nans: check = np.isnan(yc).any() if check: warnings.warn('no_nans is set to True but nans are present in the series. It may have been overridden by other parameters. 
See tsutils.gkernel() documentation for details on parameter hierarchy, and check that your h parameter is large enough.',stacklevel=2) return time_axis, yc
[docs]def increments(x,step_style='median'): '''Establishes the increments of a numerical array: start, stop, and representative step. Parameters ---------- x : array step_style : str Method to obtain a representative step if x is not evenly spaced. Valid entries: 'median' [default], 'mean', 'mode' or 'max' The mode is the most frequent entry in a dataset, and may be a good choice if the timeseries is nearly equally spaced but for a few gaps. Max is a conservative choice, appropriate for binning methods and Gaussian kernel coarse-graining Returns ------- start : float min(x) stop : float max(x) step : float The representative spacing between consecutive values, computed as above See also -------- pyleoclim.utils.tsutils.bin : Bin the values pyleoclim.utils.tsutils.gkernel : Coarsen time resolution using a Gaussian kernel ''' start = np.nanmin(x) stop = np.nanmax(x) delta = np.diff(x) if step_style == 'mean': step = delta.mean() elif step_style == 'max': step = delta.max() elif step_style == 'mode': step = stats.mode(delta)[0][0] else: step = np.median(delta) return start, stop, step
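For illustration (toy axis, values invented), increments returns the range of the axis plus a representative step under the chosen step_style:

import numpy as np
import pyleoclim as pyleo

x = np.array([1, 2, 3, 5, 8, 12, 20])
start, stop, step = pyleo.utils.tsutils.increments(x, step_style='median')
# start = 1, stop = 20, step = 2.5 (the median of np.diff(x))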
[docs]def interp(x,y, interp_type='linear', step=None, start=None, stop=None, step_style=None, time_axis=None,**kwargs): """ Interpolate y onto a new x-axis Largely a wrapper for `scipy.interpolate.interp1d <https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.interp1d.html>`_. Parameters ---------- x : array The x-axis y : array The y-axis interp_type : str Options include: 'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'previous', 'next' where 'zero', 'slinear', 'quadratic' and 'cubic' refer to a spline interpolation of zeroth, first, second or third order; 'previous' and 'next' simply return the previous or next value of the point) or as an integer specifying the order of the spline interpolator to use. Default is 'linear'. step : float The interpolation step. Default is mean spacing between consecutive points. Step_style will be ignored if this is passed. start : float Where/when to start the interpolation. Default is the minimum. stop : float Where/when to stop the interpolation. Default is the maximum. step_style : str; {'min','mean','median','max'} Step style to use when determining the size of the interval between points. Default is None. time_axis : np.ndarray Time axis onto which the series will be interpolated. Start, stop, step, and step_style will be ignored if this is passed kwargs : kwargs Aguments specific to interpolate.interp1D. If getting an error about extrapolation, you can use the arguments `bound_errors=False` and `fill_value="extrapolate"` to allow for extrapolation. Returns ------- xi : array The interpolated x-axis yi : array The interpolated y values Notes ----- `start`, `stop`, `step` and `step_styl`e pertain to the creation of the time axis over which interpolation will be conducted. This differs from the way that the functions `bin` and `gkernel` interpret these arguments, which is as defining the `bin_edges` parameter used in those functions. See Also -------- pyleoclim.utils.tsutils.increments : Establishes the increments of a numerical array pyleoclim.utils.tsutils.make_even_axis : Makes an evenly spaced time axis pyleoclim.utils.tsutils.bin : Bin the values pyleoclim.utils.tsutils.gkernel : Coarsen time resolution using a Gaussian kernel Examples -------- There are several ways to specifiy a time axis for interpolation. Within these there is a hierarchy which we demonstrate below. Top priority will always go to `time_axis` if it is passed. All other arguments will be overwritten (except for x,y, and interp_type). .. jupyter-execute:: import numpy as np import pyleoclim as pyleo x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xi,yi = pyleo.utils.tsutils.interp(x,y,time_axis=[1,4,8,12,16]) xi If `time_axis` is None, `step` will be considered, overriding `step_style if it is passed. `start` and `stop` will be generated using defaults if not passed. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xi,yi = pyleo.utils.tsutils.interp(x,y,step=2) xi If both `time_axis` and `step` are None but `step_style` is specified, the step will be generated using the prescribed `step_style`. .. jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xi,yi = pyleo.utils.tsutils.interp(x,y,step_style='max') xi If none of these are specified, the mean spacing will be used. .. 
jupyter-execute:: x = np.array([1,2,3,5,8,12,20]) y = np.ones(len(x)) xi,yi = pyleo.utils.tsutils.interp(x,y) xi """ #Make sure x and y are numpy arrays x = np.array(x,dtype='float64') y = np.array(y,dtype='float64') #Drop nans if present before interpolating if np.isnan(y).any(): y,x = dropna(y,x) # get the evenly spaced time axis if one is not passed. if time_axis is not None: if start is not None or stop is not None or step is not None or step_style is not None: warnings.warn('A time axis has been passed with other time axis relevant arguments {start,stop,step,step_style}. The passed time axis takes priority and will be used.',stacklevel=3) pass else: time_axis = make_even_axis(x=x,start=start,stop=stop,step=step,step_style=step_style) #Make sure the data is increasing data = pd.DataFrame({"x-axis": x, "y-axis": y}).sort_values('x-axis') time_axis = np.sort(time_axis) # Add arguments yi = interpolate.interp1d(data['x-axis'],data['y-axis'],kind=interp_type,**kwargs)(time_axis) return time_axis, yi
def standardize(x, scale=1, axis=0, ddof=0, eps=1e-3):
    """Centers and normalizes a time series. Constant or nearly constant time series are not rescaled.

    Parameters
    ----------
    x : array
        vector of (real) numbers as a time series, NaNs allowed
    scale : real
        A scale factor used to scale a record to match a given variance
    axis : int or None
        axis along which to operate, if None, compute over the whole array
    ddof : int
        degrees of freedom correction in the calculation of the standard deviation
    eps : real
        a threshold to determine if the standard deviation is too close to zero

    Returns
    -------
    z : array
        The standardized time series (z-score), Z = (X - E[X])/std(X)*scale, NaNs allowed
    mu : real
        The mean of the original time series, E[X]
    sig : real
        The standard deviation of the original time series, std[X]

    References
    ----------
    Tapio Schneider's MATLAB code: https://github.com/tapios/RegEM/blob/master/standardize.m
    The zscore function in SciPy: https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.zscore.html

    See also
    --------
    pyleoclim.utils.tsutils.preprocess : pre-processes a time series using standardization and detrending.
    """
    x = np.asanyarray(x)
    assert x.ndim <= 2, 'The time series x should be a vector or 2-D array!'

    mu = np.nanmean(x, axis=axis)  # the mean of the original time series
    sig = np.nanstd(x, axis=axis, ddof=ddof)  # the standard deviation of the original time series

    mu2 = np.asarray(np.copy(mu))  # the mean used in the calculation of zscore
    sig2 = np.asarray(np.copy(sig) / scale)  # the standard deviation used in the calculation of zscore

    if np.any(np.abs(sig) < eps):  # check if x contains (nearly) constant time series
        warnings.warn('Constant or nearly constant time series not rescaled.', stacklevel=2)
        where_const = np.abs(sig) < eps  # find out where we have (nearly) constant time series
        # if a vector is (nearly) constant, keep it the same as original, i.e., subtract 0 and divide by 1.
        mu2[where_const] = 0
        sig2[where_const] = 1

    if axis and mu.ndim < x.ndim:
        z = (x - np.expand_dims(mu2, axis=axis)) / np.expand_dims(sig2, axis=axis)
    else:
        z = (x - mu2) / sig2

    return z, mu, sig
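A quick sketch of typical use (synthetic data): the returned z-score has near-zero mean and unit standard deviation, and the original mean and standard deviation are returned so the transform can be reversed.

import numpy as np
import pyleoclim as pyleo

y = 10 + 3 * np.random.randn(200)
z, mu, sig = pyleo.utils.tsutils.standardize(y)
y_back = z * sig + mu   # recovers y when scale=1 and ddof=0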
[docs]def center(y, axis=0): """ Centers array y (i.e. removes the sample mean) Parameters ---------- y : array Vector of (real) numbers as a time series, NaNs allowed axis : int or None Axis along which to operate, if None, compute over the whole array Returns ------- yc : array The centered time series, yc = (y - ybar), NaNs allowed ybar : real The sampled mean of the original time series, y References ---------- Tapio Schneider's MATLAB code: https://github.com/tapios/RegEM/blob/master/center.m """ y = np.asanyarray(y) assert y.ndim <= 2, 'The time series y should be a vector or 2-D array!' ybar = np.nanmean(y, axis=axis) # the mean of the original time series if axis and ybar.ndim < y.ndim: yc = y - np.expand_dims(ybar, axis=axis) else: yc = y - ybar return yc, ybar
[docs]def ts2segments(ys, ts, factor=10): ''' Chop a time series into several segments based on gap detection. The rule of gap detection is very simple: we define the intervals between time points as dts, then if dts[i] is larger than factor * dts[i-1], we think that the change of dts (or the gradient) is too large, and we regard it as a breaking point and chop the time series into two segments here Parameters ---------- ys : array A time series, NaNs allowed ts : array The time points factor : float The factor that adjusts the threshold for gap detection Returns ------- seg_ys : list A list of several segments with potentially different lengths seg_ts : list A list of the time axis of the several segments n_segs : int The number of segments ''' ys, ts = clean_ts(ys, ts) nt = np.size(ts) dts = np.diff(ts) seg_ys, seg_ts = [], [] # store the segments with lists n_segs = 1 i_start = 0 for i in range(1, nt-1): if np.abs(dts[i]) > factor*np.abs(dts[i-1]): i_end = i + 1 seg_ys.append(ys[i_start:i_end]) seg_ts.append(ts[i_start:i_end]) i_start = np.copy(i_end) n_segs += 1 seg_ys.append(ys[i_start:nt]) seg_ts.append(ts[i_start:nt]) return seg_ys, seg_ts, n_segs
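An illustrative sketch (synthetic gap): a spacing jump larger than factor times the previous spacing splits the series into segments.

import numpy as np
import pyleoclim as pyleo

ts = np.array([1, 2, 3, 4, 50, 51, 52], dtype=float)
ys = np.ones_like(ts)
seg_ys, seg_ts, n_segs = pyleo.utils.tsutils.ts2segments(ys, ts, factor=10)
# n_segs == 2: the 4 -> 50 gap (46) exceeds 10 times the preceding step (1)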
[docs]def annualize(ys, ts): ''' Annualize a time series whose time resolution is finer than 1 year Parameters ---------- ys : array A time series, NaNs allowed ts : array The time axis of the time series, NaNs allowed Returns ------- ys_ann : array the annualized time series year_int : array The time axis of the annualized time series ''' ys = np.asarray(ys, dtype=float) ts = np.asarray(ts, dtype=float) assert ys.size == ts.size, 'The size of time axis and data value should be equal!' year_int = list(set(np.floor(ts))) year_int = np.sort(list(map(int, year_int))) n_year = len(year_int) year_int_pad = list(year_int) year_int_pad.append(np.max(year_int)+1) ys_ann = np.zeros(n_year) for i in range(n_year): t_start = year_int_pad[i] t_end = year_int_pad[i+1] t_range = (ts >= t_start) & (ts < t_end) ys_ann[i] = np.average(ys[t_range], axis=0) return ys_ann, year_int
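For example (synthetic monthly data), annualize averages all samples whose time stamp falls within each integer year:

import numpy as np
import pyleoclim as pyleo

ts = np.arange(2000, 2003, 1/12)     # 36 monthly time stamps
ys = np.sin(2 * np.pi * ts)
ys_ann, year_int = pyleo.utils.tsutils.annualize(ys, ts)
# year_int is [2000, 2001, 2002]; ys_ann holds the mean of each year's 12 samples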
[docs]def gaussianize(ys): """ Maps a 1D array to a Gaussian distribution using the inverse Rosenblatt transform The resulting array is mapped to a standard normal distribution, and therefore has zero mean and unit standard deviation. Using `gaussianize()` obviates the need for `standardize()`. Parameters ---------- ys : 1D Array e.g. a timeseries Returns ------- yg : 1D Array Gaussianized values of ys. References ---------- van Albada, S., and P. Robinson (2007), Transformation of arbitrary distributions to the normal distribution with application to EEG test-retest reliability, Journal of Neuroscience Methods, 161(2), 205 - 211, doi:10.1016/j.jneumeth.2006.11.004. See also -------- pyleoclim.utils.tsutils.standardize : Centers and normalizes a time series """ # Count only elements with data. n = ys[~np.isnan(ys)].shape[0] # Create a blank copy of the array. yg = copy.deepcopy(ys) yg[:] = np.NAN nz = np.logical_not(np.isnan(ys)) index = np.argsort(ys[nz]) rank = np.argsort(index) CDF = 1.*(rank+1)/(1.*n) - 1./(2*n) yg[nz] = np.sqrt(2)*special.erfinv(2*CDF - 1) return yg
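A brief sketch (synthetic skewed input): after the transform the sample follows a standard normal, so its mean is ~0 and its standard deviation ~1, while the rank order of the data is preserved.

import numpy as np
import pyleoclim as pyleo

ys = np.random.exponential(size=500)   # strongly skewed input
yg = pyleo.utils.tsutils.gaussianize(ys)
# np.nanmean(yg) is close to 0 and np.nanstd(yg) close to 1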
[docs]def detrend(y, x=None, method="emd", n=1, preserve_mean = False, sg_kwargs=None): """Detrend a timeseries according to four methods Detrending methods include: "linear", "constant", using a low-pass Savitzky-Golay filter, and Empirical Mode Decomposition (default). Linear and constant methods use `scipy.signal.detrend <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html>`_., EMD uses `pyhht.emd.EMD <https://pyhht.readthedocs.io/en/stable/apiref/pyhht.html>`_. Parameters ---------- y : array The series to be detrended. x : array Abscissa for array y. Necessary for use with the Savitzky-Golay method, since the series should be evenly spaced. method : str The type of detrending: - "linear": the result of a linear least-squares fit to y is subtracted from y. - "constant": only the mean of data is subtracted. - "savitzky-golay", y is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. - "emd" (default): Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series n : int Works only if `method == 'emd'`. The number of smoothest modes to remove. preserve_mean : boolean flag to indicate whether the mean of the series should be preserved despite the detrending sg_kwargs : dict The parameters for the Savitzky-Golay filters. Returns ------- ys : array The detrended version of y. trend : array The removed trend. Only non-empty for EMD and Savitzy-Golay methods, since SciPy detrending does not retain the trends See also -------- pyleoclim.utils.filter.savitzky_golay : Filtering using Savitzy-Golay pyleoclim.utils.tsutils.preprocess : pre-processes a times series using standardization and detrending. """ y = np.array(y) mu = y.mean() if x is not None: x = np.array(x) if method == "linear": ys = signal.detrend(y,type='linear') trend = y - ys elif method == 'constant': ys = signal.detrend(y,type='constant') trend = y - ys elif method == "savitzky-golay": # Check that the timeseries is uneven and interpolate if needed if x is None: raise ValueError("An independent variable is needed for the Savitzky-Golay filter method") # Check whether the timeseries is unvenly-spaced and interpolate if needed if len(np.unique(np.diff(x)))>1: warnings.warn("Timeseries is not evenly-spaced, interpolating...") x_interp, y_interp = interp(x,y,bounds_error=False,fill_value='extrapolate') else: x_interp = x y_interp = y sg_kwargs = {} if sg_kwargs is None else sg_kwargs.copy() # Now filter y_filt = savitzky_golay(y_interp,**sg_kwargs) # Put it all back on the original x axis trend = np.interp(x,x_interp,y_filt) ys = y - trend elif method == "emd": imfs = EmpiricalModeDecomposition(y).decompose() if np.shape(imfs)[0] == 1: trend = np.zeros(np.size(y)) else: trend = np.sum(imfs[-n:], axis=0) # remove the n smoothest modes ys = y - trend else: raise KeyError('Unknown method. Use one of linear, constant, savitzky-golay, emd (case-sensitive)') if preserve_mean: ys = ys - ys.mean() + mu return ys, trend
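A usage sketch (synthetic series, default EMD settings): the last intrinsic mode function is treated as the trend and subtracted.

import numpy as np
import pyleoclim as pyleo

t = np.arange(500)
y = np.sin(2 * np.pi * t / 50) + 0.01 * t    # oscillation plus a slow drift
ys, trend = pyleo.utils.tsutils.detrend(y, x=t, method='emd')
# ys is the detrended signal; trend is the removed low-frequency component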
[docs]def calculate_distances(ys, n_neighbors=None, NN_kwargs=None): """ Uses the scikit-learn unsupervised learner for implementing neighbor searches and calculate the distance between a point and its nearest neighbors to estimate epsilon for DBSCAN. Parameters ---------- ys : tnumpy.array the y-values for the timeseries n_neighbors : int, optional Number of neighbors to use by default for kneighbors queries. The default is None. NN_kwargs : dict, optional Other arguments for sklearn.neighbors.NearestNeighbors. The default is None. See: https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.NearestNeighbors.html#sklearn.neighbors.NearestNeighbors Returns ------- min_eps : int Minimum value for epsilon. max_eps : int Maximum value for epsilon. """ ys=standardize(ys)[0] ys=np.array(ys) if n_neighbors is None: # Lowest number of nearest neighbors neigh = NearestNeighbors(n_neighbors=2) nbrs = neigh.fit(ys.reshape(-1, 1)) distances, indices = nbrs.kneighbors(ys.reshape(-1, 1)) min_eps = np.min(distances) if min_eps<=0: min_eps=0.01 # Highest number of nearest neighbors neigh = NearestNeighbors(n_neighbors=len(ys)-1) nbrs = neigh.fit(ys.reshape(-1, 1)) distances, indices = nbrs.kneighbors(ys.reshape(-1, 1)) max_eps = np.max(distances) else: neigh = NearestNeighbors(n_neighbors=n_neighbors) nbrs = neigh.fit(ys.reshape(-1, 1)) distances, indices = nbrs.kneighbors(ys.reshape(-1, 1)) min_eps = np.min(distances) max_eps = np.max(distances) return min_eps, max_eps
[docs]def detect_outliers_DBSCAN(ys, nbr_clusters = None, eps=None, min_samples=None, n_neighbors=None, metric='euclidean', NN_kwargs= None, DBSCAN_kwargs=None): """ Uses the unsupervised learning DBSCAN algorithm to identify outliers in timeseries data. The algorithm uses the silhouette score calculated over a range of epsilon and minimum sample values to determine the best clustering. In this case, we take the largest silhouette score (as close to 1 as possible). The DBSCAN implementation used here is from scikit-learn: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html The Silhouette Coefficient is calculated using the mean intra-cluster distance (a) and the mean nearest-cluster distance (b) for each sample. The best value is 1 and the worst value is -1. Values near 0 indicate overlapping clusters. Negative values generally indicate that a sample has been assigned to the wrong cluster, as a different cluster is more similar. For additional details, see: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.silhouette_score.html Parameters ---------- ys : numpy.array The y-values for the timeseries data. nbr_clusters : int, optional Number of clusters. Note that the DBSCAN algorithm calculates the number of clusters automatically. This paramater affects the optimization over the silhouette score. The default is None. eps : float or list, optional epsilon. The default is None, which allows the algorithm to optimize for the best value of eps, using the silhouette score as the optimization criterion. The maximum distance between two samples for one to be considered as in the neighborhood of the other. This is not a maximum bound on the distances of points within a cluster. This is the most important DBSCAN parameter to choose appropriately for your data set and distance function. min_samples : int or list, optional The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. This includes the point itself.. The default is None and optimized using the silhouette score n_neighbors : int, optional Number of neighbors to use by default for kneighbors queries, which can be used to calculate a range of plausible eps values. The default is None. metric : str, optional The metric to use when calculating distance between instances in a feature array. The default is 'euclidean'. See https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html for alternative values. NN_kwargs : dict, optional Other arguments for sklearn.neighbors.NearestNeighbors. The default is None. See: https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.NearestNeighbors.html#sklearn.neighbors.NearestNeighbors DBSCAN_kwargs : dict, optional Other arguments for sklearn.cluster.DBSCAN. The default is None. See: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html Returns ------- indices : list list of indices that are considered outliers. res : pandas.DataFrame Results of the clustering analysis. Contains information about eps value, min_samples value, number of clusters, the silhouette score, the indices of the outliers for each combination, and the cluster assignment for each point. """ NN_kwargs = {} if NN_kwargs is None else NN_kwargs.copy() DBSCAN_kwargs = {} if DBSCAN_kwargs is None else DBSCAN_kwargs.copy() ys=standardize(ys)[0] # standardization is key for the alogrithm to work. 
ys=np.array(ys) if eps and n_neighbors: print('Since eps is passed, ignoring the n_neighbors for distance calculation') if eps is None: min_eps,max_eps = calculate_distances(ys, n_neighbors=n_neighbors, NN_kwargs=NN_kwargs) eps_list = np.linspace(min_eps,max_eps,50) elif type(eps) is list: eps_list=eps else: print("You have tried to pass a float or integer, coercing to a list") eps_list=list(eps) if min_samples is None: min_samples_list = np.linspace(2,len(ys)/4,50,dtype='int') elif type(min_samples) is list: min_samples_list = min_samples else: print("You have tried to pass a float or integer, coercing to a list") min_samples_list=list(min_samples) print("Optimizing for the best number of clusters, this may take a few minutes") nbr_clusters=[] sil_score =[] eps_matrix=[] min_sample_matrix=[] idx_out = [] clusters = [] for eps_item in eps_list: for min_samples_item in min_samples_list: eps_matrix.append(eps_item) min_sample_matrix.append(min_samples_item) m = DBSCAN(eps=eps_item, min_samples=min_samples_item,**DBSCAN_kwargs) m.fit(ys.reshape(-1,1)) nbr_clusters.append(len(np.unique(m.labels_))-1) try: sil_score.append(silhouette_score(ys.reshape(-1,1), m.labels_)) except: sil_score.append(np.nan) idx_out.append(np.where(m.labels_==-1)[0]) clusters.append(m.labels_) res = pd.DataFrame({'eps':eps_matrix,'min_samples':min_sample_matrix,'number of clusters':nbr_clusters,'silhouette score':sil_score,'outlier indices':idx_out,'clusters':clusters}) if nbr_clusters is None: res_sil = res.loc[res['silhouette score']==np.max(res['silhouette score'])] else: try: res_cl = res.loc[res['number of clusters']==nbr_clusters] res_sil = res_cl.loc[res_cl['silhouette score']==np.max(res_cl['silhouette score'])] except: print("No valid solutions for the number of clusters, returning from silhouette score") res_sil = res.loc[res['silhouette score']==np.max(res['silhouette score'])] unique_idx = list(res_sil['outlier indices'].iloc[0]) if res_sil.shape[0]>1: for idx,row in res_sil.iterrows(): for item in row['outlier indices']: if item not in unique_idx: unique_idx.append(item) indices = np.array(unique_idx) return indices, res
[docs]def detect_outliers_kmeans(ys, nbr_clusters = None, max_cluster = 10, threshold=3, LOF=False, n_frac=0.9, contamination='auto', kmeans_kwargs=None): """ Outlier detection using the unsupervised alogrithm kmeans. The algorithm runs through various number of clusters and optimizes based on the silhouette score. KMeans implementation: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html The Silhouette Coefficient is calculated using the mean intra-cluster distance (a) and the mean nearest-cluster distance (b) for each sample. The best value is 1 and the worst value is -1. Values near 0 indicate overlapping clusters. Negative values generally indicate that a sample has been assigned to the wrong cluster, as a different cluster is more similar. For additional details, see: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.silhouette_score.html Outliers are identified based on their distance from the clusters. This can be done in two ways: (1) by using a threshold that corresponds to the Euclidean distance from the centroid and (2) using the Local Outlier Function (https://scikit-learn.org/stable/auto_examples/neighbors/plot_lof_outlier_detection.html) Parameters ---------- ys : numpy.array The y-values for the timeseries data nbr_clusters : int or list, optional A user number of clusters to considered. The default is None. max_cluster : int, optional The maximum number of clusters to consider in the optimization based on the Silhouette Score. The default is 10. threshold : int, optional The algorithm uses the euclidean distance for each point in the cluster to identify the outliers. This parameter sets the threshold on the euclidean distance to define an outlier. The default is 3. LOF : bool, optional By default, detect_outliers_kmeans uses euclidean distance for outlier detection. Set LOF to True to use LocalOutlierFactor for outlier detection. n_frac : float, optional The percentage of the time series length (the length, representing number of points) to be used to set the n_neighbors parameter for the LOF function in scikit-learn. We recommend using at least 50% (n_frac=0.5) of the timeseries. You cannot use 100% (n_frac!=1) contamination : ('auto', float), optional Same as LOF parameter from scikit-learn. We recommend using the default mode of auto. See: https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.LocalOutlierFactor.html for details. kmeans_kwargs : dict, optional Other parameters for the kmeans function. See: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html for details. The default is None. Returns ------- indices : list list of indices that are considered outliers. res : pandas.DataFrame Results of the clustering analysis. Contains information about number of clusters, the silhouette score, the indices of the outliers for each combination, and the cluster assignment for each point. """ kmeans_kwargs = {} if kmeans_kwargs is None else kmeans_kwargs.copy() ys=standardize(ys)[0] # standardization is key for the alogrithm to work. 
ys=np.array(ys) # run with either one cluster number of several if nbr_clusters is not None: if type(nbr_clusters) == list: range_n_clusters = nbr_clusters else: range_n_clusters = [nbr_clusters] else: range_n_clusters = np.arange(2,max_cluster+1,1,dtype='int') silhouette_avg = [] idx_out=[] clusters = [] for num_clusters in range_n_clusters: kmeans = KMeans(n_clusters=num_clusters) kmeans.fit(ys.reshape(-1, 1), **kmeans_kwargs) silhouette_avg.append(silhouette_score(ys.reshape(-1, 1), kmeans.labels_)) center=kmeans.cluster_centers_[kmeans.labels_,0] if LOF: model = LocalOutlierFactor(n_neighbors=int(ys.size*n_frac), contamination=contamination) pred = model.fit_predict(ys.reshape(-1,1)) idx_out.append(np.where(pred==-1)) else: distance=np.sqrt((ys-center)**2) idx_out.append(np.argwhere(distance>threshold).reshape(1,-1)[0]) clusters.append(kmeans.labels_) res = pd.DataFrame({'number of clusters':range_n_clusters, 'silhouette score':silhouette_avg,'outlier indices':idx_out,'clusters':clusters}) res_sil = res.loc[res['silhouette score']==np.max(res['silhouette score'])] unique_idx = list(res_sil['outlier indices'].iloc[0]) if res_sil.shape[0]>1: for idx,row in res_sil.iterrows(): for item in row['outlier indices']: if item not in unique_idx: unique_idx.append(item) indices = np.array(unique_idx) return indices, res
def remove_outliers(ts, ys, indices):
    """ Remove the outliers from timeseries data

    Parameters
    ----------
    ts : numpy.array
        The time axis for the timeseries data.
    ys : numpy.array
        The y-values for the timeseries data.
    indices : numpy.array
        The indices of the outliers to be removed.

    Returns
    -------
    ys : numpy.array
        The y-values for the timeseries data after outlier removal
    ts : numpy.array
        The time axis for the timeseries data after outlier removal
    """
    ys = np.delete(ys, indices)
    ts = np.delete(ts, indices)

    return ys, ts
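For instance (the outlier indices here are invented; in practice they come from one of the detectors above):

import numpy as np
import pyleoclim as pyleo

ts = np.arange(100, dtype=float)
ys = np.random.randn(100)
indices = np.array([10, 55])   # e.g., the output of detect_outliers_kmeans
ys_clean, ts_clean = pyleo.utils.tsutils.remove_outliers(ts, ys, indices)
# both arrays now have 98 points, with the flagged samples dropped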
def eff_sample_size(y, detrend_flag=False):
    '''Effective Sample Size of timeseries y

    Parameters
    ----------
    y : float
        1d array
    detrend_flag : boolean
        if True, detrends y before estimation (default: False).

    Returns
    -------
    neff : float
        The effective sample size

    References
    ----------
    Thiébaux HJ and Zwiers FW, 1984: The interpretation and estimation of
    effective sample sizes. Journal of Climate and Applied Meteorology 23: 800–811.
    '''
    fft = len(y) >= 100  # use the FFT-based ACF estimator only for long series

    if detrend_flag:
        yd = detrend(y)[0]  # detrend() returns (ys, trend); keep only the detrended series
    else:
        yd = y

    n = len(y)
    nl = math.floor(max(np.sqrt(n), 10))  # rule of thumb for choosing number of lags
    rho = sms.acf(yd, adjusted=True, fft=fft, nlags=nl)  # compute autocorrelation function
    kvec = np.arange(nl)
    fac = (1 - kvec / nl) * rho[1:]
    neff = n / (1 + 2 * np.sum(fac))  # Thiébaux & Zwiers 84, Eq 2.1

    return neff
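A short sketch (AR(1) surrogate, coefficient chosen arbitrarily): positive autocorrelation shrinks the effective sample size well below the nominal length.

import numpy as np
import pyleoclim as pyleo

n = 500
y = np.zeros(n)
for i in range(1, n):
    y[i] = 0.8 * y[i-1] + np.random.randn()   # AR(1) with lag-1 autocorrelation ~0.8
neff = pyleo.utils.tsutils.eff_sample_size(y)
# neff is much smaller than n for such a persistent series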
# aliases
std = standardize
gauss = gaussianize
[docs]def preprocess(ys, ts, detrend=False, sg_kwargs=None, gaussianize=False, standardize=True): ''' Return the processed time series using detrend and standardization. Parameters ---------- ys : array a time series ts : array The time axis for the timeseries. Necessary for use with the Savitzky-Golay filters method since the series should be evenly spaced. detrend : string 'none'/False/None - no detrending will be applied; 'emd' - the last mode is assumed to be the trend and removed from the series 'linear' - a linear least-squares fit to `ys` is subtracted; 'constant' - the mean of `ys` is subtracted 'savitzy-golay' - ys is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted. sg_kwargs : dict The parameters for the Savitzky-Golay filter. gaussianize : bool If True, gaussianizes the timeseries standardize : bool If True, standardizes the timeseries Returns ------- res : array the processed time series See also -------- pyleoclim.utils.tsutils.detrend : Detrend a timeseries according to four methods pyleoclim.utils.filter.savitzy_golay : Filtering using Savitzy-Golay method pyleoclim.utils.tsutils.standardize : Centers and normalizes a given time series pyleoclim.utils.tsutils.gaussianize : Quantile maps a matrix to a Gaussian distribution ''' if detrend == 'none' or detrend is False or detrend is None: ys_d = ys else: ys_d = detrend(ys, ts, method=detrend, sg_kwargs=sg_kwargs) if standardize: res, _, _ = std(ys_d) else: res = ys_d if gaussianize: res = gauss(res) return res
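For example (synthetic data, default settings): preprocess standardizes the series; detrending and gaussianization are off unless requested.

import numpy as np
import pyleoclim as pyleo

ts = np.arange(200, dtype=float)
ys = 10 + 2 * np.random.randn(200)
res = pyleo.utils.tsutils.preprocess(ys, ts)
# res has near-zero mean and unit variance; pass gaussianize=True to also map to a normal distribution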
[docs]def make_even_axis(x=None,start=None,stop=None,step=None,step_style=None,no_nans=False): """Create a uniform time axis for binning/interpolating Parameters ---------- x : np.ndarray Uneven time axis upon which to base the uniform time axis. start : float Where to start the axis. Default is the first value of the passed time axis. stop : float Where to stop the axis. Default is the last of value of the passed time axis. step : float The step size to use for the axis. Must be greater than 0. step_style : str; {} Step style to use when defining the step size. Will be overridden by `step` if it is passed. no_nans : bool: {True,False} Whether or not to allow nans. When True, will set step style to 'max'. Will be overridden by `step_style` or `step` if they are passed. Default is False. ------- time_axis : np.ndarray An evenly spaced time axis. """ if start is None: if x is None: raise ValueError('If x is not passed then start, stop and step must be passed') else: start = x[0] if stop is None: if x is None: raise ValueError('If x is not passed then start, stop and step must be passed') else: stop = x[-1] if step is not None: pass elif step_style is not None: if x is None: raise ValueError('If x is not passed then start, stop and step must be passed') else: _, _, step = increments(np.asarray(x), step_style = step_style) elif no_nans: if x is None: raise ValueError('If x is not passed then start, stop and step must be passed') else: _, _, step = increments(np.asarray(x), step_style = 'max') else: if x is None: raise ValueError('If x is not passed then start, stop and step must be passed') else: _, _, step = increments(np.asarray(x), step_style = 'mean') new_axis = np.arange(start,stop+step,step) #Make sure that values in time_axis don't exceed the stop value if step > 0: if max(new_axis) > stop: time_axis = np.array([t for t in new_axis if t <= stop]) else: time_axis = new_axis elif step < 0: if min(new_axis) < stop: time_axis = np.array([t for t in new_axis if t >= stop]) else: time_axis = new_axis else: raise ValueError('Step must be nonzero') return time_axis
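An illustrative call (toy axis): with step_style='max' the largest observed spacing sets the step, which is intended to leave no empty bins when the axis is used for binning.

import numpy as np
import pyleoclim as pyleo

x = np.array([1, 2, 3, 5, 8, 12, 20], dtype=float)
axis = pyleo.utils.tsutils.make_even_axis(x=x, step_style='max')
# start=1, stop=20, step=8 (the largest gap), so axis = [1, 9, 17]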
[docs]def phaseran(recblk, nsurr): ''' Simultaneous phase randomization of a set of time series It creates blocks of surrogate data with the same second order properties as the original time series dataset by transforming the original data into the frequency domain, randomizing the phases simultaneoulsy across the time series and converting the data back into the time domain. Written by Carlos Gias for MATLAB http://www.mathworks.nl/matlabcentral/fileexchange/32621-phase-randomization/content/phaseran.m Parameters ---------- recblk : numpy array 2D array , Row: time sample. Column: recording. An odd number of time samples (height) is expected. If that is not the case, recblock is reduced by 1 sample before the surrogate data is created. The class must be double and it must be nonsparse. nsurr : int is the number of image block surrogates that you want to generate. Returns ------- surrblk : numpy array 3D multidimensional array image block with the surrogate datasey along the third dimension See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.fdf : Determine significance based on the false discovery rate References ---------- - Prichard, D., Theiler, J. Generating Surrogate Data for Time Series with Several Simultaneously Measured Variables (1994) Physical Review Letters, Vol 73, Number 7 - Carlos Gias (2020). Phase randomization, MATLAB Central File Exchange ''' # Get parameters nfrms = recblk.shape[0] if nfrms % 2 == 0: nfrms = nfrms-1 recblk = recblk[0:nfrms] len_ser = int((nfrms-1)/2) interv1 = np.arange(1, len_ser+1) interv2 = np.arange(len_ser+1, nfrms) # Fourier transform of the original dataset fft_recblk = np.fft.fft(recblk) surrblk = np.zeros((nfrms, nsurr)) # for k in tqdm(np.arange(nsurr)): for k in np.arange(nsurr): ph_rnd = np.random.rand(len_ser) # Create the random phases for all the time series ph_interv1 = np.exp(2*np.pi*1j*ph_rnd) ph_interv2 = np.conj(np.flipud(ph_interv1)) # Randomize all the time series simultaneously fft_recblk_surr = np.copy(fft_recblk) fft_recblk_surr[interv1] = fft_recblk[interv1] * ph_interv1 fft_recblk_surr[interv2] = fft_recblk[interv2] * ph_interv2 # Inverse transform surrblk[:, k] = np.real(np.fft.ifft(fft_recblk_surr)) return surrblk
[docs]def phaseran2(y, nsurr): ''' Phase randomization of a time series y, of even or odd length. Closely follows this strategy: https://stackoverflow.com/q/39543002 Parameters ---------- y : array, length nt Signal to be scrambled nsurr : int is the number of image block surrogates that you want to generate. Returns ------- ysurr: array nt x nsurr Array of y surrogates ''' from scipy.fft import fft, ifft nt = len(y); n2 = nt //2 ys = fft(y) pow_ys = np.abs(ys) ** 2. phase_ys = np.angle(ys) ysurr = np.zeros((nt, nsurr)) for i in range(nsurr): phase_ysr = np.empty_like(phase_ys) if nt % 2 == 0: # deal with even and odd-length arrays phase_ysr_lh = np.random.rand(n2-1) phase_ysr_rh = -phase_ysr_lh[::-1] phase_ysr = np.concatenate((np.array((phase_ys[0],)), phase_ysr_lh, np.array((phase_ys[n2],)), phase_ysr_rh)) else: phase_ysr_lh = np.random.rand(n2) phase_ysr_rh = -phase_ysr_lh[::-1] phase_ysr = np.concatenate((np.array((phase_ys[0],)), phase_ysr_lh, phase_ysr_rh)) # put it back together ysrp = np.sqrt(pow_ys) * np.exp(2*np.pi*1j*phase_ysr) ysurr[:,i] = ifft(ysrp).real return ysurr
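A short sketch (white-noise input): each surrogate column preserves the power spectrum of y while its phases are randomized.

import numpy as np
import pyleoclim as pyleo

y = np.random.randn(201)   # odd or even lengths are both handled
ysurr = pyleo.utils.tsutils.phaseran2(y, nsurr=50)
# ysurr.shape == (201, 50); each column has (nearly) the same periodogram as y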