#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities to manipulate timeseries - useful for preprocessing prior to analysis
"""

__all__ = [
    'simple_stats',
    'bin',
    'interp',
    'gkernel',
    'standardize',
    'ts2segments',
    'annualize',
    'gaussianize',
    'detrend',
    'remove_outliers'
]


import numpy as np
import pandas as pd
import warnings
import copy
from scipy import special
from scipy import signal
from scipy import interpolate
from scipy import stats
from pyhht import EMD
from sklearn.cluster import DBSCAN
import matplotlib.pyplot as plt

from sklearn.neighbors import NearestNeighbors
import statsmodels.tsa.stattools as sms

import math
from sys import exit
from .plotting import plot_scatter_xy, plot_xy, savefig
from .filter import savitzky_golay

from .tsbase import (
    clean_ts
)

def simple_stats(y, axis=None):
    """ Computes simple statistics

    Computes the mean, median, min, max, standard deviation, and interquartile
    range of a numpy array y, ignoring NaNs.

    Parameters
    ----------
    y : array
        A Numpy array
    axis : int, tuple of ints
        Axis or axes along which the means are computed; the default is to
        compute the mean of the flattened array. If a tuple of ints, the
        computation is performed over multiple axes.

    Returns
    -------
    mean : float
        mean of y, ignoring NaNs
    median : float
        median of y, ignoring NaNs
    min_ : float
        minimum value in y, ignoring NaNs
    max_ : float
        max value in y, ignoring NaNs
    std : float
        standard deviation of y, ignoring NaNs
    IQR : float
        Interquartile range of y along the specified axis, ignoring NaNs

    """
    # make sure that y is an array
    y = np.array(y, dtype='float64')

    # perform the various calculations
    mean = np.nanmean(y, axis=axis)
    std = np.nanstd(y, axis=axis)
    median = np.nanmedian(y, axis=axis)
    min_ = np.nanmin(y, axis=axis)
    max_ = np.nanmax(y, axis=axis)
    IQR = np.nanpercentile(y, 75, axis=axis) - np.nanpercentile(y, 25, axis=axis)

    return mean, median, min_, max_, std, IQR

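# Usage sketch (illustrative; not part of the original module): NaNs are
# excluded from all six statistics.
#
#     >>> y = np.array([1.0, 2.0, np.nan, 4.0])
#     >>> mean, median, min_, max_, std, IQR = simple_stats(y)
#     >>> mean
#     2.3333333333333335
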
def bin(x, y, bin_size=None, start=None, stop=None, evenly_spaced=True):
    """ Bin the values

    Parameters
    ----------
    x : array
        The x-axis series.
    y : array
        The y-axis series.
    bin_size : float
        The size of the bins. Default is the mean resolution if evenly_spaced is not True.
    start : float
        Where/when to start binning. Default is the minimum.
    stop : float
        Where/when to stop binning. Default is the maximum.
    evenly_spaced : {True, False}
        Makes the series evenly spaced. This option is ignored if bin_size is set to a float.

    Returns
    -------
    res_dict : dict
        A dictionary with the following keys:

        - bins : the bin centers (e.g., the 100-200 bin is reported as 150)
        - binned_values : the binned values
        - n : the number of data points in each bin
        - error : the standard deviation of the values in each bin

    See also
    --------

    pyleoclim.utils.tsutils.gkernel : Coarsen time resolution using a Gaussian kernel

    pyleoclim.utils.tsutils.interp : Interpolate y onto a new x-axis

    """
    # make sure x and y are numpy arrays
    x = np.array(x, dtype='float64')
    y = np.array(y, dtype='float64')

    if bin_size is not None and evenly_spaced:
        warnings.warn('bin_size has been set; the series may not be evenly spaced')

    # get the bin_size if not given
    if bin_size is None:
        if evenly_spaced:
            bin_size = np.nanmax(np.diff(x))
        else:
            bin_size = np.nanmean(np.diff(x))

    # get the start/stop if not given
    if start is None:
        start = np.nanmin(x)
    if stop is None:
        stop = np.nanmax(x)

    # set the bin centers
    bins = np.arange(start+bin_size/2, stop+bin_size/2, bin_size)

    # perform the calculation
    binned_values = []
    n = []
    error = []
    for val in np.nditer(bins):
        idx = [idx for idx, c in enumerate(x) if c >= (val-bin_size/2) and c < (val+bin_size/2)]
        if y[idx].size == 0:
            binned_values.append(np.nan)
            n.append(np.nan)
            error.append(np.nan)
        else:
            binned_values.append(np.nanmean(y[idx]))
            n.append(y[idx].size)
            error.append(np.nanstd(y[idx]))

    res_dict = {
        'bins': bins,
        'binned_values': binned_values,
        'n': n,
        'error': error,
    }

    return res_dict

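# Usage sketch (illustrative; not part of the original module): bin an
# irregularly sampled series onto bins of its mean resolution. The returned
# dictionary carries the bin centers alongside the binned means.
#
#     >>> t = np.sort(np.random.default_rng(0).uniform(0, 100, 50))
#     >>> v = np.sin(t / 10)
#     >>> res = bin(t, v, evenly_spaced=False)
#     >>> res['bins'].size == len(res['binned_values'])
#     True
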
def gkernel(t, y, h=3.0, step=None, start=None, stop=None, step_style='max'):
    '''Coarsen time resolution using a Gaussian kernel

    Parameters
    ----------
    t : 1d array
        the original time axis
    y : 1d array
        values on the original time axis
    h : float
        kernel e-folding scale
    step : float
        The coarse-graining step. Default is the max spacing between consecutive points.
    start : float
        where/when to start the coarse-graining. Default is min(t).
    stop : float
        where/when to stop the coarse-graining. Default is max(t).
    step_style : str
        step style passed to increments() [default = 'max']

    Returns
    -------
    tc : 1d array
        the coarse-grained time axis
    yc : 1d array
        the coarse-grained time series

    References
    ----------
    Rehfeld, K., Marwan, N., Heitzig, J., and Kurths, J.: Comparison of correlation
    analysis techniques for irregularly sampled time series, Nonlin. Processes
    Geophys., 18, 389–404, https://doi.org/10.5194/npg-18-389-2011, 2011.

    See also
    --------

    pyleoclim.utils.tsutils.increments : Establishes the increments of a numerical array

    pyleoclim.utils.tsutils.bin : Bin the values

    pyleoclim.utils.tsutils.interp : Interpolate y onto a new x-axis

    '''
    if len(t) != len(y):
        raise ValueError('y and t must have the same length')

    # get the step if not provided
    if step is None:
        _, _, step = increments(np.asarray(t), step_style=step_style)

    # get the start and end points if not given
    if start is None:
        start = np.nanmin(np.asarray(t))
    if stop is None:
        stop = np.nanmax(np.asarray(t))

    # get the uniform time axis
    tc = np.arange(start, stop+step, step)

    # define the Gaussian kernel function
    kernel = lambda x, s: 1.0/(s*np.sqrt(2*np.pi))*np.exp(-0.5*(x/s)**2)

    yc = np.zeros((len(tc)))
    yc[:] = np.nan

    for i in range(len(tc)-1):
        xslice = t[(t >= tc[i]) & (t < tc[i+1])]
        yslice = y[(t >= tc[i]) & (t < tc[i+1])]
        if len(xslice) > 0:
            d = xslice - tc[i]
            weight = kernel(d, h)
            yc[i] = sum(weight*yslice)/sum(weight)
        else:
            yc[i] = np.nan

    return tc, yc

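# Usage sketch (illustrative; not part of the original module): coarse-grain
# an irregular series with a Gaussian kernel of e-folding scale h. Note that
# the last element of yc is left as NaN by construction of the loop above.
#
#     >>> t = np.sort(np.random.default_rng(1).uniform(0, 100, 200))
#     >>> v = np.sin(t / 10)
#     >>> tc, yc = gkernel(t, v, h=3.0)
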
def increments(x, step_style='median'):
    ''' Establishes the increments of a numerical array: start, stop, and representative step.

    Parameters
    ----------
    x : array

    step_style : str
        Method to obtain a representative step if x is not evenly spaced.
        Valid entries: 'median' [default], 'mean', 'mode' or 'max'.
        The mode is the most frequent entry in a dataset, and may be a good
        choice if the timeseries is nearly equally spaced but for a few gaps.
        Max is a conservative choice, appropriate for binning methods and
        Gaussian kernel coarse-graining.

    Returns
    -------
    start : float
        min(x)
    stop : float
        max(x)
    step : float
        The representative spacing between consecutive values, computed as above

    See also
    --------

    pyleoclim.utils.tsutils.bin : Bin the values

    pyleoclim.utils.tsutils.gkernel : Coarsen time resolution using a Gaussian kernel

    '''
    start = np.nanmin(x)
    stop = np.nanmax(x)

    delta = np.diff(x)
    if step_style == 'mean':
        step = delta.mean()
    elif step_style == 'max':
        step = delta.max()
    elif step_style == 'mode':
        step = stats.mode(delta)[0][0]
    else:
        step = np.median(delta)

    return start, stop, step

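# Usage sketch (illustrative; not part of the original module): the default
# median step is robust to an isolated gap.
#
#     >>> start, stop, step = increments(np.array([0., 1., 2., 3., 10.]))
#     >>> (float(start), float(stop), float(step))
#     (0.0, 10.0, 1.0)
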
def interp(x, y, interp_type='linear', step=None, start=None, stop=None, step_style='mean', **kwargs):
    """ Interpolate y onto a new x-axis

    Largely a wrapper for
    [scipy.interpolate.interp1d](https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.interp1d.html)

    Parameters
    ----------
    x : array
        The x-axis
    y : array
        The y-axis
    interp_type : str
        Options include: 'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
        'previous', 'next', where 'zero', 'slinear', 'quadratic' and 'cubic' refer to
        a spline interpolation of zeroth, first, second or third order; 'previous' and
        'next' simply return the previous or next value of the point. An integer may
        also be given, specifying the order of the spline interpolator to use.
        Default is 'linear'.
    step : float
        The interpolation step. Default is the mean spacing between consecutive points.
    start : float
        where/when to start the interpolation. Default is min(x).
    stop : float
        where/when to stop the interpolation. Default is max(x).
    step_style : str
        step style passed to increments() [default = 'mean']
    kwargs : kwargs
        Arguments specific to scipy.interpolate.interp1d. If you get an error about
        extrapolation, you can pass `bounds_error=False` and `fill_value="extrapolate"`
        to allow it.

    Returns
    -------
    xi : array
        The interpolated x-axis
    yi : array
        The interpolated y values

    See also
    --------

    pyleoclim.utils.tsutils.increments : Establishes the increments of a numerical array

    pyleoclim.utils.tsutils.bin : Bin the values

    pyleoclim.utils.tsutils.gkernel : Coarsen time resolution using a Gaussian kernel

    """
    # make sure x and y are numpy arrays
    x = np.array(x, dtype='float64')
    y = np.array(y, dtype='float64')

    # get the interpolation step if not given
    if step is None:
        _, _, step = increments(np.asarray(x), step_style=step_style)

    # get the start and end points if not given
    if start is None:
        start = np.nanmin(np.asarray(x))
    if stop is None:
        stop = np.nanmax(np.asarray(x))

    # get the interpolated x-axis
    xi = np.arange(start, stop, step)

    # make sure the data is increasing
    data = pd.DataFrame({"x-axis": x, "y-axis": y}).sort_values('x-axis')

    # interpolate, forwarding any extra arguments to interp1d
    yi = interpolate.interp1d(data['x-axis'], data['y-axis'], kind=interp_type, **kwargs)(xi)

    return xi, yi

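# Usage sketch (illustrative; not part of the original module): extra keyword
# arguments are forwarded to scipy.interpolate.interp1d, e.g. to permit
# extrapolation beyond the original range.
#
#     >>> t = np.sort(np.random.default_rng(2).uniform(0, 100, 50))
#     >>> v = np.sin(t / 10)
#     >>> xi, yi = interp(t, v, interp_type='linear',
#     ...                 bounds_error=False, fill_value='extrapolate')
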
def standardize(x, scale=1, axis=0, ddof=0, eps=1e-3):
    """Centers and normalizes a time series. Constant or nearly constant time series are not rescaled.

    Parameters
    ----------
    x : array
        vector of (real) numbers as a time series, NaNs allowed
    scale : real
        A scale factor used to scale a record to match a given variance
    axis : int or None
        axis along which to operate, if None, compute over the whole array
    ddof : int
        degrees of freedom correction in the calculation of the standard deviation
    eps : real
        a threshold to determine if the standard deviation is too close to zero

    Returns
    -------
    z : array
        The standardized time series (z-score), Z = (X - E[X])/std(X)*scale, NaNs allowed
    mu : real
        The mean of the original time series, E[X]
    sig : real
        The standard deviation of the original time series, std[X]

    References
    ----------
    Tapio Schneider's MATLAB code: https://github.com/tapios/RegEM/blob/master/standardize.m

    The zscore function in SciPy: https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.zscore.html

    See also
    --------

    pyleoclim.utils.tsutils.preprocess : pre-processes a times series using standardization and detrending.

    """
    x = np.asanyarray(x)
    assert x.ndim <= 2, 'The time series x should be a vector or 2-D array!'

    mu = np.nanmean(x, axis=axis)  # the mean of the original time series
    sig = np.nanstd(x, axis=axis, ddof=ddof)  # the standard deviation of the original time series

    mu2 = np.asarray(np.copy(mu))  # the mean used in the calculation of the z-score
    sig2 = np.asarray(np.copy(sig) / scale)  # the standard deviation used in the calculation of the z-score

    if np.any(np.abs(sig) < eps):  # check if x contains (nearly) constant time series
        warnings.warn('Constant or nearly constant time series not rescaled.')
        where_const = np.abs(sig) < eps  # find out where we have (nearly) constant time series

        # if a vector is (nearly) constant, keep it as the original, i.e., subtract 0 and divide by 1
        mu2[where_const] = 0
        sig2[where_const] = 1

    if axis and mu.ndim < x.ndim:
        z = (x - np.expand_dims(mu2, axis=axis)) / np.expand_dims(sig2, axis=axis)
    else:
        z = (x - mu2) / sig2

    return z, mu, sig

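# Usage sketch (illustrative; not part of the original module): since mu and
# sig are returned, the transform is invertible as x = z*sig/scale + mu.
#
#     >>> x = np.array([2.0, 4.0, 6.0, 8.0])
#     >>> z, mu, sig = standardize(x)
#     >>> (float(mu), float(sig))
#     (5.0, 2.23606797749979)
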
def center(y, axis=0):
    """ Centers array y (i.e. removes the sample mean)

    Parameters
    ----------
    y : array
        Vector of (real) numbers as a time series, NaNs allowed
    axis : int or None
        Axis along which to operate, if None, compute over the whole array

    Returns
    -------
    yc : array
        The centered time series, yc = (y - ybar), NaNs allowed
    ybar : real
        The sample mean of the original time series y

    References
    ----------
    Tapio Schneider's MATLAB code: https://github.com/tapios/RegEM/blob/master/center.m

    """
    y = np.asanyarray(y)
    assert y.ndim <= 2, 'The time series y should be a vector or 2-D array!'

    ybar = np.nanmean(y, axis=axis)  # the mean of the original time series

    if axis and ybar.ndim < y.ndim:
        yc = y - np.expand_dims(ybar, axis=axis)
    else:
        yc = y - ybar

    return yc, ybar

def ts2segments(ys, ts, factor=10):
    ''' Chop a time series into several segments based on gap detection.

    The rule for gap detection is simple: define the intervals between time
    points as dts; if dts[i] is larger than factor * dts[i-1], the change of
    dts (the gradient) is considered too large, and that point is treated as a
    breaking point at which the time series is chopped into two segments.

    Parameters
    ----------
    ys : array
        A time series, NaNs allowed
    ts : array
        The time points
    factor : float
        The factor that adjusts the threshold for gap detection

    Returns
    -------
    seg_ys : list
        A list of several segments with potentially different lengths
    seg_ts : list
        A list of the time axes of the several segments
    n_segs : int
        The number of segments

    '''
    ys, ts = clean_ts(ys, ts)

    nt = np.size(ts)
    dts = np.diff(ts)

    seg_ys, seg_ts = [], []  # store the segments in lists

    n_segs = 1
    i_start = 0
    for i in range(1, nt-1):
        if np.abs(dts[i]) > factor*np.abs(dts[i-1]):
            i_end = i + 1
            seg_ys.append(ys[i_start:i_end])
            seg_ts.append(ts[i_start:i_end])
            i_start = np.copy(i_end)
            n_segs += 1

    seg_ys.append(ys[i_start:nt])
    seg_ts.append(ts[i_start:nt])

    return seg_ys, seg_ts, n_segs

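# Usage sketch (illustrative; not part of the original module): a spacing jump
# beyond `factor` times the previous spacing splits the series.
#
#     >>> ts = np.array([0., 1., 2., 3., 100., 101., 102.])
#     >>> ys = np.ones_like(ts)
#     >>> seg_ys, seg_ts, n_segs = ts2segments(ys, ts, factor=10)
#     >>> n_segs
#     2
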
def annualize(ys, ts):
    ''' Annualize a time series whose time resolution is finer than 1 year

    Parameters
    ----------
    ys : array
        A time series, NaNs allowed
    ts : array
        The time axis of the time series, NaNs allowed

    Returns
    -------
    ys_ann : array
        the annualized time series
    year_int : array
        The time axis of the annualized time series

    '''
    ys = np.asarray(ys, dtype=float)
    ts = np.asarray(ts, dtype=float)
    assert ys.size == ts.size, 'The size of time axis and data value should be equal!'

    year_int = list(set(np.floor(ts)))
    year_int = np.sort(list(map(int, year_int)))
    n_year = len(year_int)
    year_int_pad = list(year_int)
    year_int_pad.append(np.max(year_int)+1)

    ys_ann = np.zeros(n_year)

    for i in range(n_year):
        t_start = year_int_pad[i]
        t_end = year_int_pad[i+1]
        t_range = (ts >= t_start) & (ts < t_end)
        ys_ann[i] = np.average(ys[t_range], axis=0)

    return ys_ann, year_int

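# Usage sketch (illustrative; not part of the original module): two years of
# monthly values are averaged into two annual values.
#
#     >>> ts = np.arange(2000, 2002, 1/12)   # 24 monthly time stamps
#     >>> ys = np.arange(24, dtype=float)
#     >>> ys_ann, year_int = annualize(ys, ts)
#     >>> ys_ann
#     array([ 5.5, 17.5])
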
def gaussianize(ys):
    """ Maps a 1D array to a Gaussian distribution using the inverse Rosenblatt transform

    The resulting array is mapped to a standard normal distribution, and
    therefore has zero mean and unit standard deviation. Using `gaussianize()`
    obviates the need for `standardize()`.

    Parameters
    ----------
    ys : 1D Array
        e.g. a timeseries

    Returns
    -------
    yg : 1D Array
        Gaussianized values of ys.

    References
    ----------
    van Albada, S., and P. Robinson (2007), Transformation of arbitrary
    distributions to the normal distribution with application to EEG
    test-retest reliability, Journal of Neuroscience Methods, 161(2),
    205 - 211, doi:10.1016/j.jneumeth.2006.11.004.

    See also
    --------

    pyleoclim.utils.tsutils.standardize : Centers and normalizes a time series

    """
    # count only elements with data
    n = ys[~np.isnan(ys)].shape[0]

    # create a blank copy of the array
    yg = copy.deepcopy(ys)
    yg[:] = np.NAN

    nz = np.logical_not(np.isnan(ys))
    index = np.argsort(ys[nz])
    rank = np.argsort(index)
    CDF = 1.*(rank+1)/(1.*n) - 1./(2*n)
    yg[nz] = np.sqrt(2)*special.erfinv(2*CDF - 1)

    return yg

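# Usage sketch (illustrative; not part of the original module): a skewed input
# is mapped onto standard-normal quantiles, so its mean is ~0 and its standard
# deviation ~1.
#
#     >>> ys = np.random.default_rng(0).lognormal(size=1000)
#     >>> yg = gaussianize(ys)
#     >>> bool(np.abs(np.mean(yg)) < 0.01)
#     True
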
def detrend(y, x=None, method="emd", n=1, sg_kwargs=None):
    """Detrend a timeseries according to four methods

    Detrending methods include: "linear", "constant", using a low-pass
    Savitzky-Golay filter, and Empirical Mode Decomposition (default).
    Linear and constant methods use
    [scipy.signal.detrend](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html),
    EMD uses [pyhht.emd.EMD](https://pyhht.readthedocs.io/en/stable/apiref/pyhht.html)

    Parameters
    ----------
    y : array
        The series to be detrended.
    x : array
        Abscissa for array y. Necessary for use with the Savitzky-Golay
        method, since the series should be evenly spaced.
    method : str
        The type of detrending:
        - "linear": the result of a linear least-squares fit to y is subtracted from y.
        - "constant": only the mean of data is subtracted.
        - "savitzky-golay": y is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y.
        - "emd" (default): Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series.
    n : int
        Works only if `method == 'emd'`. The number of smoothest modes to remove.
    sg_kwargs : dict
        The parameters for the Savitzky-Golay filter.

    Returns
    -------
    ys : array
        The detrended timeseries.

    See also
    --------

    pyleoclim.utils.filter.savitzky_golay : Filtering using Savitzky-Golay

    pyleoclim.utils.tsutils.preprocess : pre-processes a times series using standardization and detrending.

    """
    y = np.array(y)

    if x is not None:
        x = np.array(x)

    if method == "linear":
        ys = signal.detrend(y, type='linear')
    elif method == 'constant':
        ys = signal.detrend(y, type='constant')
    elif method == "savitzky-golay":
        # an independent variable is needed to check the spacing
        if x is None:
            raise ValueError("An independent variable is needed for the Savitzky-Golay filter method")
        # check whether the timeseries is unevenly spaced, and interpolate if needed
        if len(np.unique(np.diff(x))) > 1:
            warnings.warn("Timeseries is not evenly-spaced, interpolating...")
            x_interp, y_interp = interp(x, y, bounds_error=False, fill_value='extrapolate')
        else:
            x_interp = x
            y_interp = y
        sg_kwargs = {} if sg_kwargs is None else sg_kwargs.copy()
        # now filter
        y_filt = savitzky_golay(y_interp, **sg_kwargs)
        # put it all back on the original x axis
        y_filt_x = np.interp(x, x_interp, y_filt)
        ys = y - y_filt_x
    elif method == "emd":
        imfs = EMD(y).decompose()
        if np.shape(imfs)[0] == 1:
            trend = np.zeros(np.size(y))
        else:
            trend = np.sum(imfs[-n:], axis=0)  # remove the n smoothest modes
        ys = y - trend
    else:
        raise KeyError('Not a valid detrending method')

    return ys

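# Usage sketch (illustrative; not part of the original module): remove a
# linear ramp superimposed on a sine wave.
#
#     >>> t = np.linspace(0, 10, 500)
#     >>> y = np.sin(2*np.pi*t) + 0.5*t
#     >>> y_lin = detrend(y, method='linear')       # least-squares line removed
#     >>> y_emd = detrend(y, method='emd', n=1)     # smoothest IMF removed
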
def distance_neighbors(signal):
    '''Finds the distance of each point in the timeseries from its 4 nearest neighbors

    Wrapper around
    [sklearn.neighbors.NearestNeighbors](https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.NearestNeighbors.html)

    Parameters
    ----------
    signal : array
        The timeseries

    Returns
    -------
    distances : array
        Distance of each point from its nearest neighbors, in decreasing order

    '''
    nn = NearestNeighbors(n_neighbors=4)  # 4 nearest neighbors
    nbrs = nn.fit(signal.reshape(-1, 1))
    distances, indices = nbrs.kneighbors(signal.reshape(-1, 1))
    distances = sorted(distances[:, -1], reverse=True)
    return distances


def find_knee(distances):
    '''Finds the knee point automatically in a given array sorted in decreasing order

    Parameters
    ----------
    distances : array
        Distance of each point in the timeseries from its nearest neighbors, in decreasing order

    Returns
    -------
    knee : float
        Knee point in the array

    '''
    nPoints = len(distances)
    allCoord = np.vstack((range(nPoints), distances)).T
    firstPoint = allCoord[0]
    # unit vector along the line joining the first and last points
    lineVec = allCoord[-1] - allCoord[0]
    lineVecNorm = lineVec / np.sqrt(np.sum(lineVec**2))
    vecFromFirst = allCoord - firstPoint
    scalarProduct = np.sum(vecFromFirst * np.tile(lineVecNorm, (nPoints, 1)), axis=1)
    vecFromFirstParallel = np.outer(scalarProduct, lineVecNorm)
    vecToLine = vecFromFirst - vecFromFirstParallel
    # the knee is the point farthest from that line
    distToLine = np.sqrt(np.sum(vecToLine ** 2, axis=1))
    idxOfBestPoint = np.argmax(distToLine)
    knee = distances[idxOfBestPoint]
    return knee


def detect_outliers(ts, ys, auto=True, plot_knee=True, plot_outliers=True,
                    plot_outliers_kwargs=None, plot_knee_kwargs=None,
                    figsize=[10, 4], saveknee_settings=None,
                    saveoutliers_settings=None):
    ''' Function to detect outliers in the given timeseries

    For more details, see:
    https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html

    Parameters
    ----------
    ts : array
        Time axis of time series
    ys : array
        y values of time series
    auto : boolean
        True by default; if False, the user manually selects the knee point
    plot_knee : boolean
        True by default, plots the knee
    plot_outliers : boolean
        True by default, plots the outliers using a scatter plot
    plot_outliers_kwargs : dict
        Keyword arguments for plot_scatter_xy for the outliers plot
    plot_knee_kwargs : dict
        Keyword arguments for plot_xy for the knee plot
    figsize : tuple, list
        Tuple or list of figure size
    saveknee_settings : dict
        the dictionary of arguments for plt.savefig() for the knee plot; some notes below:
        - "path" must be specified; it can be any existing or non-existing path,
          with or without a suffix; if the suffix is not given in "path", it will follow "format"
        - "format" can be one of {"pdf", "eps", "png", "ps"}
    saveoutliers_settings : dict
        the dictionary of arguments for plt.savefig() for the outliers plot; same notes as above

    Returns
    -------
    outliers : array
        a list of values consisting of outlier indices

    See also
    --------

    pyleoclim.utils.tsutils.distance_neighbors : Finds the distance of each point in the timeseries from its 4 nearest neighbors

    pyleoclim.utils.tsutils.find_knee : Finds the knee point automatically in a given array sorted in decreasing order

    pyleoclim.utils.tsutils.remove_outliers : Removes outliers from a timeseries

    pyleoclim.utils.plotting.plot_xy : Plot a timeseries

    pyleoclim.utils.plotting.plot_scatter_xy : Plot a scatter on top of a line plot.

    '''
    # take care of arguments for the knee plot
    saveknee_settings = {} if saveknee_settings is None else saveknee_settings.copy()

    try:
        minpts = math.log(len(ys))
        distances = distance_neighbors(ys)
        flag = all(v < 0.0001 for v in distances)

        knee_point = find_knee(distances)
        mark = distances.index(knee_point)
        index = [i for i in range(len(distances))]

        if auto == True:
            db = DBSCAN(eps=knee_point, min_samples=int(minpts))
            clusters = db.fit(ys.reshape(-1, 1))
            cluster_labels = clusters.labels_
            outliers = np.where(cluster_labels == -1)
            if plot_knee == True:
                fig1, ax1 = plt.subplots(figsize=figsize)
                if flag == True:
                    knee_point = 0.1
                ax1.annotate("knee={}".format(knee_point), (mark, knee_point),
                             arrowprops=dict(facecolor='black', shrink=0.05))
                plot_xy(index, distances, xlabel='Indices', ylabel='Distances',
                        plot_kwargs=plot_knee_kwargs, ax=ax1)

        elif auto == False:
            plot_xy(index, distances, xlabel='Indices', ylabel='Distances',
                    plot_kwargs=plot_knee_kwargs)
            eps = float(input('Enter the value for the knee point: '))
            if plot_knee == True:
                fig1, ax1 = plt.subplots(figsize=figsize)
                ax1.annotate("knee={}".format(eps), (mark, knee_point),
                             arrowprops=dict(facecolor='black', shrink=0.05))
                plot_xy(index, distances, xlabel='Indices', ylabel='Distances',
                        plot_kwargs=plot_knee_kwargs, ax=ax1)
            db = DBSCAN(eps=eps, min_samples=int(minpts))
            clusters = db.fit(ys.reshape(-1, 1))
            cluster_labels = clusters.labels_
            outliers = np.where(cluster_labels == -1)

        if 'fig1' in locals():
            if 'path' in saveknee_settings:
                savefig(fig1, settings=saveknee_settings)

        if plot_outliers == True:
            x2 = ts[outliers]
            y2 = ys[outliers]
            plot_scatter_xy(ts, ys, x2, y2, figsize=figsize, xlabel='time', ylabel='value',
                            savefig_settings=saveoutliers_settings, plot_kwargs=plot_outliers_kwargs)

        return outliers

    except ValueError:
        choice = input('Switch to auto mode (y/n)? ')
        choice = choice.lower()
        if choice == 'y':
            a = detect_outliers(ts, ys, auto=True)
            return a
        else:
            exit(1)

def remove_outliers(ts, ys, outlier_points):
    ''' Removes outliers from a timeseries

    Parameters
    ----------
    ts : array
        x axis of the timeseries
    ys : array
        y axis of the timeseries
    outlier_points : array
        indices of the outlier points

    Returns
    -------
    ys : array
        y axis of the timeseries
    ts : array
        x axis of the timeseries

    See also
    --------

    pyleoclim.utils.tsutils.detect_outliers : Function to detect outliers in the given timeseries

    '''
    ys = np.delete(ys, outlier_points)
    ts = np.delete(ts, outlier_points)

    return ys, ts

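# Usage sketch (illustrative; not part of the original module): flag outliers
# with DBSCAN via detect_outliers(), then drop them from the series.
#
#     >>> ts = np.arange(100, dtype=float)
#     >>> ys = np.sin(ts / 10); ys[[20, 60]] += 10   # plant two outliers
#     >>> outliers = detect_outliers(ts, ys, plot_knee=False, plot_outliers=False)
#     >>> ys_clean, ts_clean = remove_outliers(ts, ys, outliers)
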
def eff_sample_size(y, detrend_flag=False):
    '''Effective Sample Size of timeseries y

    Parameters
    ----------
    y : float
        1d array
    detrend_flag : boolean
        if True, detrends y before estimation [default: False]

    Returns
    -------
    neff : float
        The effective sample size

    References
    ----------
    Thiébaux HJ and Zwiers FW, 1984: The interpretation and estimation of
    effective sample sizes. Journal of Climate and Applied Meteorology 23: 800-811.

    '''
    if len(y) < 100:
        fft = False
    else:
        fft = True

    if detrend_flag:
        yd = detrend(y)
    else:
        yd = y

    n = len(y)
    nl = math.floor(max(np.sqrt(n), 10))  # rule of thumb for choosing the number of lags
    rho = sms.acf(yd, adjusted=True, fft=fft, nlags=nl)  # compute the autocorrelation function
    kvec = np.arange(nl)
    fac = (1-kvec/nl)*rho[1:]
    neff = n/(1+2*np.sum(fac))  # Thiébaux & Zwiers 84, Eq 2.1

    return neff


# aliases
std = standardize
gauss = gaussianize
detrend_ts = detrend  # keeps detrend() reachable inside preprocess(), whose `detrend` argument shadows it


def preprocess(ys, ts, detrend=False, sg_kwargs=None,
               gaussianize=False, standardize=True):
    ''' Return the processed time series using detrending and standardization.

    Parameters
    ----------
    ys : array
        a time series
    ts : array
        The time axis for the timeseries. Necessary for use with the
        Savitzky-Golay filter method, since the series should be evenly spaced.
    detrend : string
        'none'/False/None - no detrending will be applied;
        'emd' - the last mode is assumed to be the trend and removed from the series;
        'linear' - a linear least-squares fit to `ys` is subtracted;
        'constant' - the mean of `ys` is subtracted;
        'savitzky-golay' - ys is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted.
    sg_kwargs : dict
        The parameters for the Savitzky-Golay filter.
    gaussianize : bool
        If True, gaussianizes the timeseries
    standardize : bool
        If True, standardizes the timeseries

    Returns
    -------
    res : array
        the processed time series

    See also
    --------

    pyleoclim.utils.tsutils.detrend : Detrend a timeseries according to four methods

    pyleoclim.utils.filter.savitzky_golay : Filtering using Savitzky-Golay

    pyleoclim.utils.tsutils.standardize : Centers and normalizes a given time series

    pyleoclim.utils.tsutils.gaussianize : Maps a 1D array to a Gaussian distribution

    '''
    if detrend == 'none' or detrend is False or detrend is None:
        ys_d = ys
    else:
        ys_d = detrend_ts(ys, ts, method=detrend, sg_kwargs=sg_kwargs)

    if standardize:
        res, _, _ = std(ys_d)
    else:
        res = ys_d

    if gaussianize:
        res = gauss(res)

    return res

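# Usage sketch (illustrative; not part of the original module): serial
# correlation reduces the effective sample size well below the nominal one.
#
#     >>> rng = np.random.default_rng(0)
#     >>> y = np.zeros(500)
#     >>> for i in range(1, 500):
#     ...     y[i] = 0.8 * y[i-1] + rng.standard_normal()
#     >>> bool(eff_sample_size(y) < 500)
#     True
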