Source code for pyleoclim.utils.filter

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities to filter arrayed timeseries data, leveraging SciPy
"""

__all__ = [
    'butterworth',
    'savitzky_golay',
    'firwin',
    'lanczos'
]

import numpy as np
import statsmodels.api as sm
from scipy import signal

from .tsbase import (
    is_evenly_spaced
)

[docs]def savitzky_golay(ys, window_length=None, polyorder=2, deriv=0, delta=1, axis=-1, mode='mirror', cval=0): """ Smooth (and optionally differentiate) data with a Savitzky-Golay filter. The Savitzky-Golay filter removes high frequency noise from data. It has the advantage of preserving the original shape and features of the signal better than other types of filtering approaches, such as moving averages techniques. The Savitzky-Golay is a type of low-pass filter, particularly suited for smoothing noisy data. The main idea behind this approach is to make for each point a least-square fit with a polynomial of high order over a odd-sized window centered at the point. Uses the implementation from scipy.signal: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html Parameters ---------- ys : array the values of the signal to be filtered window_length : int The length of the filter window. Must be a positive integer. If mode is 'interp', window_length must be less than or equal to the size of ys. Default is the size of ys. polyorder : int The order of the polynomial used to fit the samples. polyorder Must be less than window_length. Default is 2 deriv : int The order of the derivative to compute. This must be a nonnegative integer. The default is 0, which means to filter the data without differentiating delta : float The spacing of the samples to which the filter will be applied. This is only used if deriv>0. Default is 1.0 axis : int The axis of the array ys along which the filter will be applied. Default is -1 mode : str Must be ‘mirror’ (the default), ‘constant’, ‘nearest’, ‘wrap’ or ‘interp’. This determines the type of extension to use for the padded signal to which the filter is applied. When mode is ‘constant’, the padding value is given by cval. See the Notes for more details on ‘mirror’, ‘constant’, ‘wrap’, and ‘nearest’. When the ‘interp’ mode is selected, no extension is used. Instead, a degree polyorder polynomial is fit to the last window_length values of the edges, and this polynomial is used to evaluate the last window_length // 2 output values. cval : scalar Value to fill past the edges of the input if mode is ‘constant’. Default is 0.0. Returns ------- yf : array ndarray of shape (N), the smoothed signal (or its n-th derivative). See also -------- pyleoclim.utils.filter.butterworth : Applies a Butterworth filter with frequency fc, with padding pyleoclim.utils.filter.lanczos : Applies a Lanczos filter with frequency fc, with padding pyleoclim.utils.filter.firwin : Applies a Finite Impulse Response filter with frequency fc, with padding References ---------- A. Savitzky, M. J. E. Golay, Smoothing and Differentiation of Data by Simplified Least Squares Procedures. Analytical Chemistry, 1964, 36 (8), pp 1627-1639. Numerical Recipes 3rd Edition: The Art of Scientific Computing W.H. Press, S.A. Teukolsky, W.T. Vetterling, B.P. Flannery Cambridge University Press ISBN-13: 9780521880688 `SciPy Cookbook <https://github.com/scipy/scipy-cookbook/blob/master/ipython/SavitzkyGolay.ipynb>`_. Notes ----- Details on the mode option: - ‘mirror’: Repeats the values at the edges in reverse order. The value closest to the edge is not included. - ‘nearest’: The extension contains the nearest input value. - ‘constant’: The extension contains the value given by the cval argument. - ‘wrap’: The extension contains the values from the other end of the array. """ if window_length==None: window_length=int(np.ceil(len(ys))//2*2-1) elif type(window_length) is not int: raise TypeError("window_length should be of type int") if type(polyorder) is not int: raise TypeError("polyorder should be of type int") if window_length % 2 != 1 or window_length < 1: raise TypeError("window_length size must be a positive odd number") if window_length < polyorder + 2: raise TypeError("window_length is too small for the polynomial's order") yf=signal.savgol_filter(ys,window_length=window_length, polyorder=polyorder, deriv=deriv, delta=delta, axis=axis, mode=mode, cval=cval) return yf
[docs]def ts_pad(ys,ts,method = 'reflect', params=(1,0,0), reflect_type = 'odd',padFrac=0.1): """ Pad a timeseries based on timeseries model predictions Parameters ---------- ys : numpy array Evenly-spaced timeseries ts : numpy array Time axis method : str the method to use to pad the series - ARIMA: uses a fitted ARIMA model - reflect (default): Reflects the time series around either end. params : tuple the ARIMA model order parameters (p,d,q), Default corresponds to an AR(1) model reflect_type : str; {‘even’, ‘odd’}, optional Used in ‘reflect’, and ‘symmetric’. The ‘even’ style is the default with an unaltered reflection around the edge value. For the ‘odd’ style, the extented part of the array is created by subtracting the reflected values from two times the edge value. For more details, see np.lib.pad() padFrac : float padding fraction (scalar) such that padLength = padFrac*length(series) Returns ------- yp : array padded timeseries tp : array augmented time axis See also -------- pyleoclim.utils.filter.butterworth : Applies a Butterworth filter with frequency fc, with padding pyleoclim.utils.filter.lanczos : Applies a Lanczos filter with frequency fc, with padding pyleoclim.utils.filter.firwin : Applies a Finite Impulse Response filter with frequency fc, with padding """ padLength = np.round(len(ts)*padFrac).astype(np.int64) if is_evenly_spaced(ts)==False: raise ValueError("ts needs to be composed of even increments") else: dt = np.diff(ts)[0] # computp time interval #time axis tp = np.arange(ts[0]-padLength*dt,ts[-1]+padLength*dt+dt,dt) if method == 'ARIMA': # fit ARIMA model fwd_mod = sm.tsa.ARIMA(ys,params).fit() # model with time going forward bwd_mod = sm.tsa.ARIMA(np.flip(ys,0),params).fit() # model with time going backwards # predict forward & backward fwd_pred = fwd_mod.forecast(padLength); yf = fwd_pred[0] bwd_pred = bwd_mod.forecast(padLength); yb = np.flip(bwd_pred[0],0) # extend time series yp = np.empty(len(tp)) yp[0:padLength]=yb yp[padLength:len(ts)+padLength]=ys yp[len(ts)+padLength:]=yf elif method == 'reflect': yp = np.pad(ys,(padLength,padLength),mode='reflect',reflect_type=reflect_type) else: raise ValueError('Not a valid argument. Enter "ARIMA" or "reflect"') return yp, tp
[docs]def butterworth(ys,fc,fs=1,filter_order=3,pad='reflect', reflect_type='odd',params=(1,0,0),padFrac=0.1): '''Applies a Butterworth filter with frequency fc, with padding Supports both lowpass and band-pass filtering. Parameters ---------- ys : numpy array Timeseries fc : float or list cutoff frequency. If scalar, it is interpreted as a low-frequency cutoff (lowpass) If fc is a list, it is interpreted as a frequency band (f1, f2), with f1 < f2 (bandpass) fs : float sampling frequency filter_order : int order n of Butterworth filter pad : str Indicates if padding is needed. - 'reflect': Reflects the timeseries - 'ARIMA': Uses an ARIMA model for the padding - None: No padding. params : tuple model parameters for ARIMA model (if pad = 'ARIMA') padFrac : float fraction of the series to be padded Returns ------- yf : array filtered array See also -------- pyleoclim.utils.filter.ts_pad : Pad a timeseries based on timeseries model predictions pyleoclim.utils.filter.firwin : Applies a Finite Impulse Response filter with frequency fc, with padding pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter pyleoclim.utils.filter.lanczos : Applies a Lanczos filter with frequency fc, with padding ''' nyq = 0.5 * fs if isinstance(fc, list) and len(fc) == 2: fl = fc[0] / nyq fh = fc[1] / nyq b, a = signal.butter(filter_order, [fl, fh], btype='bandpass') else: fl = fc / nyq b, a = signal.butter(filter_order, fl , btype='lowpass') ts = np.arange(len(ys)) # define time axis if pad=='ARIMA': yp, tp = ts_pad(ys,ts,method = 'ARIMA', params=params, padFrac=padFrac) elif pad=='reflect': yp, tp = ts_pad(ys,ts,method = 'reflect', reflect_type=reflect_type, padFrac=padFrac) elif pad is None: yp = ys tp = ts else: raise ValueError('Not a valid argument. Enter "ARIMA", "reflect" or None') ypf = signal.filtfilt(b, a, yp) yf = ypf[np.isin(tp,ts)] return yf
[docs]def lanczos(ys,fc,fs=1,pad='reflect', reflect_type='odd',params=(1,0,0),padFrac=0.1): '''Applies a Lanczos (lowpass) filter with frequency fc, with optional padding Parameters ---------- ys : numpy array Timeseries fc : float cutoff frequency fs : float sampling frequency pad : str Indicates if padding is needed - 'reflect': Reflects the timeseries - 'ARIMA': Uses an ARIMA model for the padding - None: No padding params : tuple model parameters for ARIMA model (if pad = 'ARIMA'). May require fiddling padFrac : float fraction of the series to be padded Returns ------- yf : array filtered array See also -------- pyleoclim.utils.filter.ts_pad : Pad a timeseries based on timeseries model predictions pyleoclim.utils.filter.butterworth : Applies a Butterworth filter with frequency fc, with padding pyleoclim.utils.filter.firwin : Applies a Finite Impulse Response filter with frequency fc, with padding pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter References ---------- Filter design from http://scitools.org.uk/iris/docs/v1.2/examples/graphics/SOI_filtering.html ''' ts = np.arange(len(ys)) # define "time" axis if pad=='ARIMA': yp, tp = ts_pad(ys,ts,method = 'ARIMA', params=params, padFrac=padFrac) elif pad=='reflect': yp, tp = ts_pad(ys,ts,method = 'reflect', reflect_type=reflect_type, padFrac=padFrac) elif pad is None: yp = ys tp = ts else: raise ValueError("Not a valid argument. Enter 'ARIMA', 'reflect' or None") window = max(51,len(yp)//4) # arbitrary? order = ((window - 1) // 2 ) + 1 nwts = 2 * order + 1 w = np.zeros([nwts]) n = nwts // 2 w[n] = 2 * fc / fs k = np.arange(1., n) sigma = np.sin(np.pi * k / n) * n / (np.pi * k) firstfactor = np.sin(2. * np.pi * fc / fs * k) / (np.pi * k) w[n-1:0:-1] = firstfactor * sigma w[n+1:-1] = firstfactor * sigma wgts = w[1:-1] ypf = np.convolve(yp,wgts, 'same') yf = ypf[np.isin(tp,ts)] return yf
[docs]def firwin(ys, fc, numtaps=None, fs=1, pad='reflect', window='hamming', reflect_type='odd', params=(1,0,0), padFrac=0.1, **kwargs): '''Applies a Finite Impulse Response filter design with window method and frequency fc, with padding Parameters ---------- ys : numpy array Timeseries fc : float or list cutoff frequency. If scalar, it is interpreted as a low-frequency cutoff (lowpass) If fc is a list, it is interpreted as a frequency band (f1, f2), with f1 < f2 (bandpass) numptaps : int Length of the filter (number of coefficients, i.e. the filter order + 1). numtaps must be odd if a passband includes the Nyquist frequency. If None, will use the largest number that is smaller than 1/3 of the the data length. fs : float sampling frequency window : str or tuple of string and parameter values, optional Desired window to use. See scipy.signal.get_window for a list of windows and required parameters. pad : str Indicates if padding is needed. - 'reflect': Reflects the timeseries - 'ARIMA': Uses an ARIMA model for the padding - None: No padding. params : tuple model parameters for ARIMA model (if pad = True) padFrac : float fraction of the series to be padded kwargs : dict a dictionary of keyword arguments for scipy.signal.firwin Returns ------- yf : array filtered array See also -------- scipy.signal.firwin : FIR filter design using the window method pyleoclim.utils.filter.ts_pad : Pad a timeseries based on timeseries model predictions pyleoclim.utils.filter.butterworth : Applies a Butterworth filter with frequency fc, with padding pyleoclim.utils.filter.lanczos : Applies a Lanczos filter with frequency fc, with padding pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter ''' # taps = signal.firwin(numtaps, fc, window=window, fs=fs, **kwargs) nyq = 0.5 * fs if np.isscalar(fc): pass_zero = 'lowpass' elif len(fc) == 2: pass_zero = 'bandpass' else: raise ValueError('Wrong input fc') if numtaps is None: # use the largest number of taps that the default padding method in scipy.signal.filtfilt allows numtaps = int(np.size(ys)//3) taps = signal.firwin(numtaps, fc/nyq, window=window, pass_zero=pass_zero, **kwargs) ts = np.arange(len(ys)) # define time axis if pad=='ARIMA': yp, tp = ts_pad(ys,ts,method = 'ARIMA', params=params, padFrac=padFrac) elif pad=='reflect': yp, tp = ts_pad(ys,ts,method = 'reflect', reflect_type=reflect_type, padFrac=padFrac) elif pad is None: yp = ys tp = ts else: raise ValueError('Not a valid argument. Enter "ARIMA", "reflect" or None') ypf = signal.filtfilt(taps, 1, yp) yf = ypf[np.isin(tp,ts)] return yf