Source code for pyleoclim.utils.wavelet

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 25 08:14:31 2020

@author: deborahkhider

Functions concerning wavelet analysis
"""


__all__ = [
    #'cwt',
    'wwz',
    'xwc',
]

import numpy as np
import statsmodels.api as sm
from scipy import signal
from pathos.multiprocessing import ProcessingPool as Pool
import numba as nb
from numba.core.errors import NumbaPerformanceWarning
import warnings
import collections
import scipy.fftpack as fft
from scipy import optimize
from scipy.optimize import fminbound
from scipy.special._ufuncs import gamma, gammainc

#from .tsmodel import ar1_sim
from .tsutils import preprocess
from .tsbase import (
    clean_ts,
    is_evenly_spaced,
)

from .filter import ts_pad

warnings.filterwarnings("ignore", category=NumbaPerformanceWarning)

#---------------
#Wrapper functions
#---------------

#----------------
#Main Functions
#----------------

[docs]class AliasFilter(object): '''Performing anti-alias filter on a psd experimental: Use at your own risk @author: fzhu ''' def alias_filter(self, freq, pwr, fs, fc, f_limit, avgs): ''' anti_alias filter Parameters ---------- freq : array vector of frequencies in power spectrum pwr : array vector of spectral power corresponding to frequencies "freq" fs : float sampling frequency fc : float corner frequency for 1/f^2 steepening of power spectrum f_limit : float lower frequency limit for estimating misfit of model-plus-alias spectrum vs. measured power avgs : int flag for whether spectrum is derived from instantaneous point measurements (avgs<>1) OR from measurements averaged over each sampling interval (avgs==1) Returns ------- alpha : float best-fit exponent of power-law model filtered_pwr : array vector of alias-filtered spectral power model_pwr : array vector of modeled spectral power aliased_pwr : array vector of modeled spectral power, plus aliases References ---------- Kirchner, J. W. Aliasing in 1/f(alpha) noise spectra: origins, consequences, and remedies. Phys Rev E Stat Nonlin Soft Matter Phys 71, 66110 (2005). ''' log_pwr = np.log(pwr) freq_mask = (freq > f_limit)*1 # convert True & False to 1 & 0 alpha_upper_bound = 5 if avgs == 1: alpha_lower_bound = -2.9 # if measurements are time-averaged else: alpha_lower_bound = -0.9 # if measurements are point samples alpha = optimize.fminbound(self.misfit, alpha_lower_bound, alpha_upper_bound, args=(fs, fc, freq, log_pwr, freq_mask, avgs), xtol=1e-4) model_pwr, aliased_pwr, RMSE = self.alias(alpha, fs, fc, freq, log_pwr, freq_mask, avgs) filtered_pwr = pwr * model_pwr / aliased_pwr return alpha, filtered_pwr, model_pwr, aliased_pwr def misfit(self, alpha, fs, fc, freq, log_pwr, freq_mask, avgs): model, aliased_pwr, RMSE = self.alias(alpha, fs, fc, freq, log_pwr, freq_mask, avgs) return RMSE def alias(self, alpha, fs, fc, freq, log_pwr, freq_mask, avgs): model_pwr = self.model(alpha, fs, fc, freq, avgs) aliased_pwr = np.copy(model_pwr) if avgs == 1: aliased_pwr = aliased_pwr * np.sinc(freq/fs) ** 2 for k in range(1, 11): alias_minus = self.model(alpha, fs, fc, k*fs-freq, avgs) if avgs == 1: alias_minus = alias_minus * np.sinc((k*fs-freq)/fs) ** 2 aliased_pwr = aliased_pwr + alias_minus alias_plus = self.model(alpha, fs, fc, k*fs+freq, avgs) # notice the + in (k*fs+freq) if avgs == 1: alias_plus = alias_plus * np.sinc((k*fs+freq)/fs) ** 2 aliased_pwr = aliased_pwr + alias_plus if avgs == 1: beta = alpha + 3 const = 1 / (2*np.pi**2*beta/fs) else: beta = alpha + 1 const = 1 / (beta*fs) zo_minus = (11*fs-freq)**(-beta) dz_minus = zo_minus / 20 for j in range(1, 21): aliased_pwr = aliased_pwr + const / ((j*dz_minus)**(2/beta) + 1/fc**2)*dz_minus zo_plus = (11*fs+freq)**(-beta) dz_plus = zo_plus / 20 for j in range(1, 21): aliased_pwr = aliased_pwr + const / ((j*dz_plus)**(2/beta) + 1/fc**2)*dz_plus log_aliased = np.log(aliased_pwr) prefactor = np.sum((log_pwr - log_aliased) * freq_mask) / np.sum(freq_mask) log_aliased = log_aliased + prefactor aliased_pwr = aliased_pwr * np.exp(prefactor) model_pwr = model_pwr * np.exp(prefactor) RMSE = np.sqrt(np.sum((log_aliased-log_pwr)*(log_aliased-log_pwr)*freq_mask)) / np.sum(freq_mask) return model_pwr, aliased_pwr, RMSE def model(self, alpha, fs, fc, freq, avgs): spectr = freq**(-alpha) / (1 + (freq/fc)**2) return spectr
# def cwt(ys,ts,scales,wavelet='morl',sampling_period=1.0,method='conv',axis=-1): # '''Continous wavelet transform for evenly spaced data # pywavelet documentation: https://pywavelets.readthedocs.io/en/latest/ref/cwt.html # Parameters # ---------- # ys : array # signal # ts : array # time # scales : array (float) # different wavelet scales to use # wavelet : str # types of wavelet options in function documentation link. The default is 'morl' for a morlet wavelet. # sampling_period : float, optional # sampling period for frequencies output. The default is 1.0. # method : str, optional # cwt computation method. 'conv','fft'. or 'auto' The default is 'conv'. # axis : int, optional # axis over which to compute cwt. The default is -1, the last axis. # Returns # ------- # res : dictionary # 'freq' - array(float) # frequencies # 'time' - array(float) # 'amplitude' - array(float) # 'coi' - array(float) # cone of inference # ''' # coeff,freq=pywt.cwt(data=ys,scales=scales,wavelet=wavelet,sampling_period=sampling_period,method=method,axis=axis) # amplitude=abs(coeff).T # if wavelet=='morl' or wavelet[:4]=='cmor': # coi=make_coi(tau=ts,Neff=6) # else: # coi=make_coi(tau=ts) # Results = collections.namedtuple('Results', ['amplitude','coi', 'freq', 'time', 'coeff']) # res = Results(amplitude=amplitude, coi=coi, freq=freq, time=ts, coeff=coeff) # return res
[docs]def assertPositiveInt(*args): ''' Assert that the arguments are all positive integers. Parameters ---------- args ''' for arg in args: assert isinstance(arg, int) and arg >= 1
[docs]def wwz_basic(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff=3, nproc=1, detrend=False, sg_kwargs=None, gaussianize=False, standardize=False): ''' Return the weighted wavelet amplitude (WWA). The Weighted wavelet Z-transform (WWZ) is based on Morlet wavelet estimation, using least squares minimization to suppress the energy leakage caused by the data gaps. WWZ does not rely on interpolation or detrending, and is appropriate for unevenly-spaced datasets. In particular, we use the variant of Kirchner & Neal (2013), in which basis rotations mitigate the numerical instability that occurs in pathological cases with the original algorithm (Foster, 1996). The WWZ method has one adjustable parameter, a decay constant `c` that balances the time and frequency resolutions of the analysis. This application uses the larger value (8π2)−1, justified elsewhere (Witt & Schumann, 2005). No multiprocessing is applied by Default. Parameters ---------- ys : array a time series ts : array time axis of the time series freq : array vector of frequency tau : array the evenly-spaced time points, namely the time shift for wavelet analysis c : float the decay constant that determines the analytical resolution of frequency for analysis, the smaller the higher resolution; the default value 1/(8*np.pi**2) is good for most of the wavelet analysis cases Neff : int the threshold of the number of effective degrees of freedom nproc :int fake argument, just for convenience detrend : string None - the original time series is assumed to have no trend; 'linear' - a linear least-squares fit to `ys` is subtracted; 'constant' - the mean of `ys` is subtracted 'savitzy-golay' - ys is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series sg_kwargs : dict The parameters for the Savitzky-Golay filters. see pyleoclim.utils.filter.savitzy_golay for details. gaussianize : bool If True, gaussianizes the timeseries standardize : bool If True, standardizes the timeseries Returns ------- wwa : array the weighted wavelet amplitude phase : array the weighted wavelet phase Neffs : array the matrix of effective number of points in the time-scale coordinates coeff : array the wavelet transform coefficients (a0, a1, a2) References ---------- - Foster, G. Wavelets for period analysis of unevenly sampled time series. The Astronomical Journal 112, 1709 (1996). - Witt, A. & Schumann, A. Y. Holocene climate variability on millennial scales recorded in Greenland ice cores. Nonlinear Processes in Geophysics 12, 345–352 (2005). - Kirchner, J. W. and Neal, C. (2013). Universal fractal scaling in stream chemistry and its implications for solute transport and water quality trend detection. Proc Natl Acad Sci USA 110:12213–12218. See also -------- pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package. pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler. pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter. ''' assert nproc == 1, "wwz_basic() only supports nproc=1" assertPositiveInt(Neff) nt = np.size(tau) nf = np.size(freq) pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize) omega = make_omega(ts, freq) Neffs = np.ndarray(shape=(nt, nf)) ywave_1 = np.ndarray(shape=(nt, nf)) ywave_2 = np.ndarray(shape=(nt, nf)) ywave_3 = np.ndarray(shape=(nt, nf)) S = np.zeros(shape=(3, 3)) for k in range(nf): for j in range(nt): dz = omega[k] * (ts - tau[j]) weights = np.exp(-c*dz**2) sum_w = np.sum(weights) Neffs[j, k] = sum_w**2 / np.sum(weights**2) # local number of effective dof if Neffs[j, k] <= Neff: ywave_1[j, k] = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff ywave_2[j, k] = np.nan ywave_3[j, k] = np.nan else: phi2 = np.cos(dz) phi3 = np.sin(dz) S[0, 0] = 1 S[1, 1] = np.sum(weights*phi2*phi2) / sum_w S[2, 2] = np.sum(weights*phi3*phi3) / sum_w S[1, 0] = S[0, 1] = np.sum(weights*phi2) / sum_w S[2, 0] = S[0, 2] = np.sum(weights*phi3) / sum_w S[2, 1] = S[1, 2] = np.sum(weights*phi2*phi3) / sum_w S_inv = np.linalg.pinv(S) weighted_phi1 = np.sum(weights*pd_ys) / sum_w weighted_phi2 = np.sum(weights*phi2*pd_ys) / sum_w weighted_phi3 = np.sum(weights*phi3*pd_ys) / sum_w ywave_1[j, k] = S_inv[0, 0]*weighted_phi1 + S_inv[0, 1]*weighted_phi2 + S_inv[0, 2]*weighted_phi3 ywave_2[j, k] = S_inv[1, 0]*weighted_phi1 + S_inv[1, 1]*weighted_phi2 + S_inv[1, 2]*weighted_phi3 ywave_3[j, k] = S_inv[2, 0]*weighted_phi1 + S_inv[2, 1]*weighted_phi2 + S_inv[2, 2]*weighted_phi3 wwa = np.sqrt(ywave_2**2 + ywave_3**2) phase = np.arctan2(ywave_3, ywave_2) # coeff = ywave_2 + ywave_3*1j coeff = (ywave_1, ywave_2, ywave_3) return wwa, phase, Neffs, coeff
[docs]def wwz_nproc(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff=3, nproc=8, detrend=False, sg_kwargs=None, gaussianize=False, standardize=False): ''' Return the weighted wavelet amplitude (WWA). Original method from Foster (1996). Supports multiprocessing. Parameters ---------- ys : array a time series ts : array time axis of the time series freq : array vector of frequency tau : array the evenly-spaced time points, namely the time shift for wavelet analysis c : float the decay constant that determines the analytical resolution of frequency for analysis, the smaller the higher resolution; the default value 1/(8*np.pi**2) is good for most of the wavelet analysis cases Neff : int the threshold of the number of effective degrees of freedom nproc : int the number of processes for multiprocessing detrend : string None - the original time series is assumed to have no trend; 'linear' - a linear least-squares fit to `ys` is subtracted; 'constant' - the mean of `ys` is subtracted 'savitzy-golay' - ys is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series sg_kwargs : dict The parameters for the Savitzky-Golay filters. see pyleoclim.utils.filter.savitzy_golay for details. gaussianize : bool If True, gaussianizes the timeseries standardize : bool If True, standardizes the timeseries Returns ------- wwa : array the weighted wavelet amplitude phase : array the weighted wavelet phase Neffs : array the matrix of effective number of points in the time-scale coordinates coeff : array the wavelet transform coefficients (a0, a1, a2) See also -------- pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Kirchner. No multiprocessing pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package. pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler. pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter. ''' assert nproc >= 2, "wwz_nproc() should use nproc >= 2, if want serial run, please use wwz_basic()" assertPositiveInt(Neff) nt = np.size(tau) nf = np.size(freq) pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize) omega = make_omega(ts, freq) Neffs = np.ndarray(shape=(nt, nf)) ywave_1 = np.ndarray(shape=(nt, nf)) ywave_2 = np.ndarray(shape=(nt, nf)) ywave_3 = np.ndarray(shape=(nt, nf)) def wwa_1g(tau, omega): dz = omega * (ts - tau) weights = np.exp(-c*dz**2) sum_w = np.sum(weights) Neff_loc = sum_w**2 / np.sum(weights**2) S = np.zeros(shape=(3, 3)) if Neff_loc <= Neff: ywave_2_1g = np.nan ywave_3_1g = np.nan else: phi2 = np.cos(dz) phi3 = np.sin(dz) S[0, 0] = 1 S[1, 1] = np.sum(weights*phi2*phi2) / sum_w S[2, 2] = np.sum(weights*phi3*phi3) / sum_w S[1, 0] = S[0, 1] = np.sum(weights*phi2) / sum_w S[2, 0] = S[0, 2] = np.sum(weights*phi3) / sum_w S[2, 1] = S[1, 2] = np.sum(weights*phi2*phi3) / sum_w S_inv = np.linalg.pinv(S) weighted_phi1 = np.sum(weights*pd_ys) / sum_w weighted_phi2 = np.sum(weights*phi2*pd_ys) / sum_w weighted_phi3 = np.sum(weights*phi3*pd_ys) / sum_w ywave_1_1g = S_inv[0, 0]*weighted_phi1 + S_inv[0, 1]*weighted_phi2 + S_inv[0, 2]*weighted_phi3 ywave_2_1g = S_inv[1, 0]*weighted_phi1 + S_inv[1, 1]*weighted_phi2 + S_inv[1, 2]*weighted_phi3 ywave_3_1g = S_inv[2, 0]*weighted_phi1 + S_inv[2, 1]*weighted_phi2 + S_inv[2, 2]*weighted_phi3 return Neff_loc, ywave_1_1g, ywave_2_1g, ywave_3_1g tf_mesh = np.meshgrid(tau, omega) list_of_grids = list(zip(*(grid.flat for grid in tf_mesh))) tau_grids, omega_grids = zip(*list_of_grids) with Pool(nproc) as pool: res = pool.map(wwa_1g, tau_grids, omega_grids) res_array = np.asarray(res) Neffs = res_array[:, 0].reshape((np.size(omega), np.size(tau))).T ywave_1 = res_array[:, 1].reshape((np.size(omega), np.size(tau))).T ywave_2 = res_array[:, 2].reshape((np.size(omega), np.size(tau))).T ywave_3 = res_array[:, 3].reshape((np.size(omega), np.size(tau))).T wwa = np.sqrt(ywave_2**2 + ywave_3**2) phase = np.arctan2(ywave_3, ywave_2) # coeff = ywave_2 + ywave_3*1j coeff = (ywave_1, ywave_2, ywave_3) return wwa, phase, Neffs, coeff
[docs]def kirchner_basic(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff=3, nproc=1, detrend=False, sg_kwargs=None, gaussianize=False, standardize=False): ''' Return the weighted wavelet amplitude (WWA) modified by Kirchner. Method modified by Kirchner. No multiprocessing. Parameters ---------- ys : array a time series ts : array time axis of the time series freq : array vector of frequency tau : array the evenly-spaced time points, namely the time shift for wavelet analysis c : float the decay constant that determines the analytical resolution of frequency for analysis, the smaller the higher resolution; the default value 1/(8*np.pi**2) is good for most of the wavelet analysis cases Neff : int the threshold of the number of effective degrees of freedom nproc : int fake argument for convenience, for parameter consistency between functions, does not need to be specified detrend : string None - the original time series is assumed to have no trend; 'linear' - a linear least-squares fit to `ys` is subtracted; 'constant' - the mean of `ys` is subtracted 'savitzy-golay' - ys is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series sg_kwargs : dict The parameters for the Savitzky-Golay filters. see pyleoclim.utils.filter.savitzy_golay for details. gaussianize : bool If True, gaussianizes the timeseries standardize : bool If True, standardizes the timeseries Returns ------- wwa : array the weighted wavelet amplitude phase : array the weighted wavelet phase Neffs : array the matrix of effective number of points in the time-scale coordinates coeff : array the wavelet transform coefficients (a0, a1, a2) References ---------- - Foster, G. Wavelets for period analysis of unevenly sampled time series. The Astronomical Journal 112, 1709 (1996). - Witt, A. & Schumann, A. Y. Holocene climate variability on millennial scales recorded in Greenland ice cores. Nonlinear Processes in Geophysics 12, 345–352 (2005). See also -------- pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Kirchner. No multiprocessing pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package. pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler. pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter. ''' assert nproc == 1, "wwz_basic() only supports nproc=1" assertPositiveInt(Neff) nt = np.size(tau) nts = np.size(ts) nf = np.size(freq) pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize) omega = make_omega(ts, freq) Neffs = np.ndarray(shape=(nt, nf)) a0 = np.ndarray(shape=(nt, nf)) a1 = np.ndarray(shape=(nt, nf)) a2 = np.ndarray(shape=(nt, nf)) for k in range(nf): for j in range(nt): dz = omega[k] * (ts - tau[j]) weights = np.exp(-c*dz**2) sum_w = np.sum(weights) Neffs[j, k] = sum_w**2 / np.sum(weights**2) # local number of effective dof if Neffs[j, k] <= Neff: a0[j, k] = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff a1[j, k] = np.nan a2[j, k] = np.nan else: def w_prod(xs, ys): return np.sum(weights*xs*ys) / sum_w sin_basis = np.sin(omega[k]*ts) cos_basis = np.cos(omega[k]*ts) one_v = np.ones(nts) sin_one = w_prod(sin_basis, one_v) cos_one = w_prod(cos_basis, one_v) sin_cos = w_prod(sin_basis, cos_basis) sin_sin = w_prod(sin_basis, sin_basis) cos_cos = w_prod(cos_basis, cos_basis) numerator = 2 * (sin_cos - sin_one * cos_one) denominator = (cos_cos - cos_one**2) - (sin_sin - sin_one**2) time_shift = np.arctan2(numerator, denominator) / (2*omega[k]) # Eq. (S5) sin_shift = np.sin(omega[k]*(ts - time_shift)) cos_shift = np.cos(omega[k]*(ts - time_shift)) sin_tau_center = np.sin(omega[k]*(time_shift - tau[j])) cos_tau_center = np.cos(omega[k]*(time_shift - tau[j])) ys_cos_shift = w_prod(pd_ys, cos_shift) ys_sin_shift = w_prod(pd_ys, sin_shift) ys_one = w_prod(pd_ys, one_v) cos_shift_one = w_prod(cos_shift, one_v) sin_shift_one = w_prod(sin_shift, one_v) A = 2*(ys_cos_shift-ys_one*cos_shift_one) B = 2*(ys_sin_shift-ys_one*sin_shift_one) a0[j, k] = ys_one a1[j, k] = cos_tau_center*A - sin_tau_center*B # Eq. (S6) a2[j, k] = sin_tau_center*A + cos_tau_center*B # Eq. (S7) wwa = np.sqrt(a1**2 + a2**2) phase = np.arctan2(a2, a1) # coeff = a1 + a2*1j coeff = (a0, a1, a2) return wwa, phase, Neffs, coeff
[docs]def kirchner_nproc(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff=3, nproc=8, detrend=False, sg_kwargs=None, gaussianize=False, standardize=False): ''' Return the weighted wavelet amplitude (WWA) modified by Kirchner. Method modified by kirchner. Supports multiprocessing. Parameters ---------- ys : array a time series ts : array time axis of the time series freq : array vector of frequency tau : array the evenly-spaced time points, namely the time shift for wavelet analysis c : float the decay constant that determines the analytical resolution of frequency for analysis, the smaller the higher resolution; the default value 1/(8*np.pi**2) is good for most of the wavelet analysis cases Neff : int the threshold of the number of effective degrees of freedom nproc : int the number of processes for multiprocessing detrend : string None - the original time series is assumed to have no trend; 'linear' - a linear least-squares fit to `ys` is subtracted; 'constant' - the mean of `ys` is subtracted 'savitzy-golay' - ys is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series sg_kwargs : dict The parameters for the Savitzky-Golay filters. see pyleoclim.utils.filter.savitzy_golay for details. gaussianize : bool If True, gaussianizes the timeseries standardize : bool If True, standardizes the timeseries Returns ------- wwa (array): the weighted wavelet amplitude phase (array): the weighted wavelet phase Neffs (array): the matrix of effective number of points in the time-scale coordinates coeff (array): the wavelet transform coefficients (a0, a1, a2) See also -------- pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Kirchner. No multiprocessing pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package. pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler. pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter. ''' assert nproc >= 2, "wwz_nproc() should use nproc >= 2, if want serial run, please use wwz_basic()" assertPositiveInt(Neff) nt = np.size(tau) nts = np.size(ts) nf = np.size(freq) pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize) omega = make_omega(ts, freq) Neffs = np.ndarray(shape=(nt, nf)) a0 = np.ndarray(shape=(nt, nf)) a1 = np.ndarray(shape=(nt, nf)) a2 = np.ndarray(shape=(nt, nf)) def wwa_1g(tau, omega): dz = omega * (ts - tau) weights = np.exp(-c*dz**2) sum_w = np.sum(weights) Neff_loc = sum_w**2 / np.sum(weights**2) if Neff_loc <= Neff: a0_1g = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff a1_1g = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff a2_1g = np.nan else: def w_prod(xs, ys): return np.sum(weights*xs*ys) / sum_w sin_basis = np.sin(omega*ts) cos_basis = np.cos(omega*ts) one_v = np.ones(nts) sin_one = w_prod(sin_basis, one_v) cos_one = w_prod(cos_basis, one_v) sin_cos = w_prod(sin_basis, cos_basis) sin_sin = w_prod(sin_basis, sin_basis) cos_cos = w_prod(cos_basis, cos_basis) numerator = 2*(sin_cos - sin_one*cos_one) denominator = (cos_cos - cos_one**2) - (sin_sin - sin_one**2) time_shift = np.arctan2(numerator, denominator) / (2*omega) # Eq. (S5) sin_shift = np.sin(omega*(ts - time_shift)) cos_shift = np.cos(omega*(ts - time_shift)) sin_tau_center = np.sin(omega*(time_shift - tau)) cos_tau_center = np.cos(omega*(time_shift - tau)) ys_cos_shift = w_prod(pd_ys, cos_shift) ys_sin_shift = w_prod(pd_ys, sin_shift) ys_one = w_prod(pd_ys, one_v) cos_shift_one = w_prod(cos_shift, one_v) sin_shift_one = w_prod(sin_shift, one_v) A = 2*(ys_cos_shift - ys_one*cos_shift_one) B = 2*(ys_sin_shift - ys_one*sin_shift_one) a0_1g = ys_one a1_1g = cos_tau_center*A - sin_tau_center*B # Eq. (S6) a2_1g = sin_tau_center*A + cos_tau_center*B # Eq. (S7) return Neff_loc, a0_1g, a1_1g, a2_1g tf_mesh = np.meshgrid(tau, omega) list_of_grids = list(zip(*(grid.flat for grid in tf_mesh))) tau_grids, omega_grids = zip(*list_of_grids) with Pool(nproc) as pool: res = pool.map(wwa_1g, tau_grids, omega_grids) res_array = np.asarray(res) Neffs = res_array[:, 0].reshape((np.size(omega), np.size(tau))).T a0 = res_array[:, 1].reshape((np.size(omega), np.size(tau))).T a1 = res_array[:, 2].reshape((np.size(omega), np.size(tau))).T a2 = res_array[:, 3].reshape((np.size(omega), np.size(tau))).T wwa = np.sqrt(a1**2 + a2**2) phase = np.arctan2(a2, a1) # coeff = a1 + a2*1j coeff = (a0, a1, a2) return wwa, phase, Neffs, coeff
[docs]def kirchner_numba(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff=3, detrend=False, sg_kwargs=None, gaussianize=False, standardize=False, nproc=1): ''' Return the weighted wavelet amplitude (WWA) modified by Kirchner. Using numba. Parameters ---------- ys : array a time series ts : array time axis of the time series freq : array vector of frequency tau : array the evenly-spaced time points, namely the time shift for wavelet analysis c : float the decay constant that determines the analytical resolution of frequency for analysis, the smaller the higher resolution; the default value 1/(8*np.pi**2) is good for most of the wavelet analysis cases Neff : int the threshold of the number of effective degrees of freedom nproc : int fake argument, just for convenience detrend : string None - the original time series is assumed to have no trend; 'linear' - a linear least-squares fit to `ys` is subtracted; 'constant' - the mean of `ys` is subtracted 'savitzy-golay' - ys is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series sg_kwargs : dict The parameters for the Savitzky-Golay filters. see pyleoclim.utils.filter.savitzy_golay for details. gaussianize : bool If True, gaussianizes the timeseries standardize : bool If True, standardizes the timeseries Returns ------- wwa : array the weighted wavelet amplitude phase : array the weighted wavelet phase Neffs : array the matrix of effective number of points in the time-scale coordinates coeff : array the wavelet transform coefficients (a0, a1, a2) References ---------- Foster, G. Wavelets for period analysis of unevenly sampled time series. The Astronomical Journal 112, 1709 (1996). Witt, A. & Schumann, A. Y. Holocene climate variability on millennial scales recorded in Greenland ice cores. Nonlinear Processes in Geophysics 12, 345–352 (2005). See also -------- pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Kirchner. No multiprocessing pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler. pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter. ''' assertPositiveInt(Neff) nt = np.size(tau) nts = np.size(ts) nf = np.size(freq) pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize) omega = make_omega(ts, freq) Neffs = np.ndarray(shape=(nt, nf)) a0 = np.ndarray(shape=(nt, nf)) a1 = np.ndarray(shape=(nt, nf)) a2 = np.ndarray(shape=(nt, nf)) @nb.jit(nopython=True, parallel=True, fastmath=True) def loop_over(nf, nt, Neffs, a0, a1, a2): def wwa_1g(tau, omega): dz = omega * (ts - tau) weights = np.exp(-c*dz**2) sum_w = np.sum(weights) Neff_loc = sum_w**2 / np.sum(weights**2) if Neff_loc <= Neff: a0_1g = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff a1_1g = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff a2_1g = np.nan else: def w_prod(xs, ys): return np.sum(weights*xs*ys) / sum_w sin_basis = np.sin(omega*ts) cos_basis = np.cos(omega*ts) one_v = np.ones(nts) sin_one = w_prod(sin_basis, one_v) cos_one = w_prod(cos_basis, one_v) sin_cos = w_prod(sin_basis, cos_basis) sin_sin = w_prod(sin_basis, sin_basis) cos_cos = w_prod(cos_basis, cos_basis) numerator = 2*(sin_cos - sin_one*cos_one) denominator = (cos_cos - cos_one**2) - (sin_sin - sin_one**2) time_shift = np.arctan2(numerator, denominator) / (2*omega) # Eq. (S5) sin_shift = np.sin(omega*(ts - time_shift)) cos_shift = np.cos(omega*(ts - time_shift)) sin_tau_center = np.sin(omega*(time_shift - tau)) cos_tau_center = np.cos(omega*(time_shift - tau)) ys_cos_shift = w_prod(pd_ys, cos_shift) ys_sin_shift = w_prod(pd_ys, sin_shift) ys_one = w_prod(pd_ys, one_v) cos_shift_one = w_prod(cos_shift, one_v) sin_shift_one = w_prod(sin_shift, one_v) A = 2*(ys_cos_shift - ys_one*cos_shift_one) B = 2*(ys_sin_shift - ys_one*sin_shift_one) a0_1g = ys_one a1_1g = cos_tau_center*A - sin_tau_center*B # Eq. (S6) a2_1g = sin_tau_center*A + cos_tau_center*B # Eq. (S7) return Neff_loc, a0_1g, a1_1g, a2_1g for k in nb.prange(nf): for j in nb.prange(nt): Neffs[j, k], a0[j, k], a1[j, k], a2[j, k] = wwa_1g(tau[j], omega[k]) return Neffs, a0, a1, a2 Neffs, a0, a1, a2 = loop_over(nf, nt, Neffs, a0, a1, a2) wwa = np.sqrt(a1**2 + a2**2) phase = np.arctan2(a2, a1) # coeff = a1 + a2*1j coeff = (a0, a1, a2) return wwa, phase, Neffs, coeff
[docs]def kirchner_f2py(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff=3, nproc=8, detrend=False, sg_kwargs=None, gaussianize=False, standardize=False): ''' Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Fastest method. Calls Fortran libraries. Parameters ---------- ys : array a time series ts : array time axis of the time series freq : array vector of frequency tau : array the evenly-spaced time points, namely the time shift for wavelet analysis c : float the decay constant that determines the analytical resolution of frequency for analysis, the smaller the higher resolution; the default value 1/(8*np.pi**2) is good for most of the wavelet analysis cases Neff : int the threshold of the number of effective degrees of freedom nproc : int fake argument, just for convenience detrend : string None - the original time series is assumed to have no trend; 'linear' - a linear least-squares fit to `ys` is subtracted; 'constant' - the mean of `ys` is subtracted 'savitzy-golay' - ys is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series sg_kwargs : dict The parameters for the Savitzky-Golay filters. see pyleoclim.utils.filter.savitzy_golay for details. gaussianize : bool If True, gaussianizes the timeseries standardize : bool If True, standardizes the timeseries Returns ------- wwa : array the weighted wavelet amplitude phase : array the weighted wavelet phase Neffs : array the matrix of effective number of points in the time-scale coordinates coeff : array the wavelet transform coefficients (a0, a1, a2) See also -------- pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Kirchner. No multiprocessing pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package. pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter. ''' from . import f2py_wwz as f2py assertPositiveInt(Neff, nproc) nt = np.size(tau) nts = np.size(ts) nf = np.size(freq) pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize) omega = make_omega(ts, freq) Neffs, a0, a1, a2 = f2py.f2py_wwz.wwa(tau, omega, c, Neff, ts, pd_ys, nproc, nts, nt, nf) undef = -99999. a0[a0 == undef] = np.nan a1[a1 == undef] = np.nan a2[a2 == undef] = np.nan wwa = np.sqrt(a1**2 + a2**2) phase = np.arctan2(a2, a1) # coeff = a1 + a2*1j coeff = (a0, a1, a2) return wwa, phase, Neffs, coeff
[docs]def make_coi(tau, Neff=3): ''' Return the cone of influence. Parameters ---------- tau : array the evenly-spaced time points, namely the time shift for wavelet analysis Neff : int the threshold of the number of effective samples Returns ------- coi : array cone of influence References ---------- wave_signif() in http://paos.colorado.edu/research/wavelets/wave_python/waveletFunctions.py ''' assert isinstance(Neff, int) and Neff >= 1 nt = np.size(tau) fourier_factor = 4*np.pi / (Neff+np.sqrt(2+Neff**2)) coi_const = fourier_factor / np.sqrt(2) dt = np.median(np.diff(tau)) nt_half = (nt+1)//2 - 1 A = np.append(0.00001, np.arange(nt_half)+1) B = A[::-1] if nt % 2 == 0: C = np.append(A, B) else: C = np.append(A, B[1:]) coi = coi_const * dt * C return coi
[docs]def make_omega(ts, freq): ''' Return the angular frequency based on the time axis and given frequency vector Parameters ---------- ys : array a time series ts : array time axis of the time series freq : array vector of frequency Returns ------- omega : array the angular frequency vector ''' # for the frequency band larger than f_Nyquist, the wwa will be marked as NaNs f_Nyquist = 0.5 / np.median(np.diff(ts)) freq_with_nan = np.copy(freq) freq_with_nan[freq > f_Nyquist] = np.nan omega = 2*np.pi*freq_with_nan return omega
[docs]def wwa2psd(wwa, ts, Neffs, freq=None, Neff=3, anti_alias=False, avgs=2): """ Return the power spectral density (PSD) using the weighted wavelet amplitude (WWA). Parameters ---------- wwa : array the weighted wavelet amplitude. ts : array the time points, should be pre-truncated so that the span is exactly what is used for wwz Neffs : array the matrix of effective number of points in the time-scale coordinates obtained from wwz freq : array vector of frequency from wwz Neff : int the threshold of the number of effective samples anti_alias : bool whether to apply anti-alias filter avgs : int flag for whether spectrum is derived from instantaneous point measurements (avgs<>1) OR from measurements averaged over each sampling interval (avgs==1) Returns ------- psd : array power spectral density References ---------- Kirchner's C code for weighted psd calculation """ af = AliasFilter() # weighted psd calculation start power = wwa**2 * 0.5 * (np.max(ts)-np.min(ts))/np.size(ts) * Neffs Neff_diff = Neffs - Neff Neff_diff[Neff_diff < 0] = 0 sum_power = np.nansum(power * Neff_diff, axis=0) sum_eff = np.nansum(Neff_diff, axis=0) psd = sum_power / sum_eff # weighted psd calculation end if anti_alias: assert freq is not None, "freq is required for alias filter!" dt = np.median(np.diff(ts)) f_sampling = 1/dt psd_copy = psd[1:] freq_copy = freq[1:] alpha, filtered_pwr, model_pwer, aliased_pwr = af.alias_filter( freq_copy, psd_copy, f_sampling, f_sampling*1e3, np.min(freq), avgs) psd[1:] = np.copy(filtered_pwr) return psd
[docs]def wwz(ys, ts, tau=None, ntau=None, freq=None, freq_method='log', freq_kwargs={}, c=1/(8*np.pi**2), Neff=3, Neff_coi=3, nMC=200, nproc=8, detrend=False, sg_kwargs=None, gaussianize=False, standardize=False, method='Kirchner_numba', len_bd=0, bc_mode='reflect', reflect_type='odd'): ''' Weighted wavelet amplitude (WWA) for unevenly-spaced data Parameters ---------- ys : array a time series, NaNs will be deleted automatically ts : array the time points, if `ys` contains any NaNs, some of the time points will be deleted accordingly tau : array the evenly-spaced time vector for the analysis, namely the time shift for wavelet analysis freq : array vector of frequency freq_method : str Method to generate the frequency vector if not set directly. The following options are avialable: - 'log' (default) - 'lomb_scargle' - 'welch' - 'scale' - 'nfft' See :func:`pyleoclim.utils.wavelet.make_freq_vector()` for details freq_kwargs : str used when freq=None for certain methods c : float the decay constant that determines the analytical resolution of frequency for analysis, the smaller the higher resolution; the default value 1/(8*np.pi**2) is good for most of the wavelet analysis cases Neff : int effective number of points nMC : int the number of Monte-Carlo simulations nproc : int the number of processes for multiprocessing detrend : string, {None, 'linear', 'constant', 'savitzy-golay'} available methods for detrending, including - None: the original time series is assumed to have no trend; - 'linear': a linear least-squares fit to `ys` is subtracted; - 'constant': the mean of `ys` is subtracted - 'savitzy-golay': ys is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series sg_kwargs : dict The parameters for the Savitzky-Golay filters. See :func:`pyleoclim.utils.filter.savitzky_golay()` for details. method : string, {'Foster', 'Kirchner', 'Kirchner_f2py', 'Kirchner_numba'} available specific implementation of WWZ, including - 'Foster': the original WWZ method; - 'Kirchner': the method Kirchner adapted from Foster; - 'Kirchner_f2py': the method Kirchner adapted from Foster, implemented with f2py for acceleration; - 'Kirchner_numba': the method Kirchner adapted from Foster, implemented with Numba for acceleration (default); len_bd : int the number of the ghost grids want to creat on each boundary bc_mode : string, {'constant', 'edge', 'linear_ramp', 'maximum', 'mean', 'median', 'minimum', 'reflect' , 'symmetric', 'wrap'} For more details, see np.lib.pad() reflect_type : string, optional, {‘even’, ‘odd’} Used in ‘reflect’, and ‘symmetric’. The ‘even’ style is the default with an unaltered reflection around the edge value. For the ‘odd’ style, the extented part of the array is created by subtracting the reflected values from two times the edge value. For more details, see np.lib.pad() Returns ------- res : namedtuple a namedtuple that includes below items wwa : array the weighted wavelet amplitude. coi : array cone of influence freq : array vector of frequency tau : array the evenly-spaced time points, namely the time shift for wavelet analysis Neffs : array the matrix of effective number of points in the time-scale coordinates coeff : array the wavelet transform coefficents See also -------- pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Kirchner. No multiprocessing pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package. pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler. pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter. pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector Examples -------- We perform an ideal test below. We use a sine wave with a period of 50 yrs as the signal for test. Then performing wavelet analysis should return an energy band around period of 50 yrs in the time-period scalogram domain. .. ipython:: python :okwarning: from pyleoclim import utils import matplotlib.pyplot as plt from matplotlib.ticker import ScalarFormatter, FormatStrFormatter import numpy as np # Create a signal time = np.arange(2001) f = 1/50 # the period is then 1/f = 50 signal = np.cos(2*np.pi*f*time) # Wavelet Analysis res = utils.wwz(signal, time) # Visualization fig, ax = plt.subplots() contourf_args = {'cmap': 'magma', 'origin': 'lower', 'levels': 11} cbar_args = {'drawedges': False, 'orientation': 'vertical', 'fraction': 0.15, 'pad': 0.05} cont = ax.contourf(res.time, 1/res.freq, res.amplitude.T, **contourf_args) ax.plot(res.time, res.coi, 'k--') # plot the cone of influence ax.set_yscale('log') ax.set_yticks([2, 5, 10, 20, 50, 100, 200, 500, 1000]) ax.set_ylim([2, 1000]) ax.yaxis.set_major_formatter(ScalarFormatter()) ax.yaxis.set_major_formatter(FormatStrFormatter('%g')) ax.set_xlabel('Time (yr)') ax.set_ylabel('Period (yrs)') cb = plt.colorbar(cont, **cbar_args) @savefig wwa_wwz.png plt.show() ''' assert isinstance(nMC, int) and nMC >= 0, "nMC should be larger than or equal to 0." ys_cut, ts_cut, freq, tau = prepare_wwz( ys, ts, freq=freq, freq_method=freq_method, freq_kwargs=freq_kwargs, tau=tau, len_bd=len_bd, bc_mode=bc_mode, reflect_type=reflect_type ) wwz_func = get_wwz_func(nproc, method) wwa, phase, Neffs, coeff = wwz_func(ys_cut, ts_cut, freq, tau, Neff=Neff, c=c, nproc=nproc, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize) # Monte-Carlo simulations of AR1 process nt = np.size(tau) nf = np.size(freq) # wwa_red = np.ndarray(shape=(nMC, nt, nf)) # AR1_q = np.ndarray(shape=(nt, nf)) # if nMC >= 1: # for i in tqdm(range(nMC), desc='Monte-Carlo simulations'): # r = ar1_sim(ys_cut, np.size(ts_cut), 1, ts=ts_cut) # wwa_red[i, :, :], _, _, _ = wwz_func(r, ts_cut, freq, tau, c=c, Neff=Neff, nproc=nproc, # detrend=detrend, sg_kwargs=sg_kwargs, # gaussianize=gaussianize, standardize=standardize) # for j in range(nt): # for k in range(nf): # AR1_q[j, k] = mquantiles(wwa_red[:, j, k], 0.95) # else: # AR1_q = None # AR1_q = None # calculate the cone of influence coi = make_coi(tau, Neff=Neff_coi) # Results = collections.namedtuple('Results', ['amplitude', 'phase', 'AR1_q', 'coi', 'freq', 'time', 'Neffs', 'coeff']) # res = Results(amplitude=wwa, phase=phase, AR1_q=AR1_q, coi=coi, freq=freq, time=tau, Neffs=Neffs, coeff=coeff) Results = collections.namedtuple('Results', ['amplitude', 'phase', 'coi', 'freq', 'time', 'Neffs', 'coeff']) res = Results(amplitude=wwa, phase=phase, coi=coi, freq=freq, time=tau, Neffs=Neffs, coeff=coeff) return res
[docs]def xwc(ys1, ts1, ys2, ts2, smooth_factor=0.25, tau=None, freq=None, freq_method='log', freq_kwargs=None, c=1/(8*np.pi**2), Neff=3, nproc=8, detrend=False, sg_kwargs=None, nMC=200, gaussianize=False, standardize=False, method='Kirchner_numba', verbose=False): ''' Return the cross-wavelet coherence of two time series. Parameters ---------- ys1 : array first of two time series ys2 : array second of the two time series ts1 : array time axis of first time series ts2 : array time axis of the second time series tau : array the evenly-spaced time points freq : array vector of frequency c : float the decay constant that determines the analytical resolution of frequency for analysis, the smaller the higher resolution; the default value 1/(8*np.pi**2) is good for most of the wavelet analysis cases Neff : int effective number of points nproc : int the number of processes for multiprocessing nMC : int the number of Monte-Carlo simulations detrend : string - None: the original time series is assumed to have no trend; - 'linear': a linear least-squares fit to `ys` is subtracted; - 'constant': the mean of `ys` is subtracted - 'savitzy-golay': ys is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series sg_kwargs : dict The parameters for the Savitzky-Golay filters. see pyleoclim.utils.filter.savitzy_golay for details. gaussianize : bool If True, gaussianizes the timeseries standardize : bool If True, standardizes the timeseries method : string - 'Foster': the original WWZ method; - 'Kirchner': the method Kirchner adapted from Foster; - 'Kirchner_f2py': the method Kirchner adapted from Foster with f2py - 'Kirchner_numba': Kirchner's algorithm with Numba support for acceleration (default) verbose : bool If True, print warning messages Returns ------- res : dict contains the cross wavelet coherence, cross-wavelet phase, vector of frequency, evenly-spaced time points, AR1 sims, cone of influence See also -------- pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Kirchner. No multiprocessing pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package. pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler. pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter. pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector ''' assert isinstance(nMC, int) and nMC >= 0, "nMC should be larger than or eaqual to 0." if tau is None: lb1, ub1 = np.min(ts1), np.max(ts1) lb2, ub2 = np.min(ts2), np.max(ts2) lb = np.max([lb1, lb2]) ub = np.min([ub1, ub2]) inside = ts1[(ts1>=lb) & (ts1<=ub)] tau = np.linspace(lb, ub, np.size(inside)//10) print(f'Setting tau={tau[:3]}...{tau[-3:]}, ntau={np.size(tau)}') if freq is None: freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy() freq = make_freq_vector(ts1, method=freq_method, **freq_kwargs) print(f'Setting freq={freq[:3]}...{freq[-3:]}, nfreq={np.size(freq)}') ys1_cut, ts1_cut, freq1, tau1 = prepare_wwz(ys1, ts1, freq=freq, tau=tau) ys2_cut, ts2_cut, freq2, tau2 = prepare_wwz(ys2, ts2, freq=freq, tau=tau) if np.any(tau1 != tau2): if verbose: print('inconsistent `tau`, recalculating...') tau_min = np.min([np.min(tau1), np.min(tau2)]) tau_max = np.max([np.max(tau1), np.max(tau2)]) ntau = np.max([np.size(tau1), np.size(tau2)]) tau = np.linspace(tau_min, tau_max, ntau) else: tau = tau1 if np.any(freq1 != freq2): if verbose: print('inconsistent `freq`, recalculating...') freq_min = np.min([np.min(freq1), np.min(freq2)]) freq_max = np.max([np.max(freq1), np.max(freq2)]) nfreq = np.max([np.size(freq1), np.size(freq2)]) freq = np.linspace(freq_min, freq_max, nfreq) else: freq = freq1 if freq[0] == 0: freq = freq[1:] # delete 0 frequency if present res_wwz1 = wwz(ys1_cut, ts1_cut, tau=tau, freq=freq, c=c, Neff=Neff, nMC=0, nproc=nproc, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize, method=method) res_wwz2 = wwz(ys2_cut, ts2_cut, tau=tau, freq=freq, c=c, Neff=Neff, nMC=0, nproc=nproc, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize, method=method) wt_coeff1 = res_wwz1.coeff[1] - res_wwz1.coeff[2]*1j wt_coeff2 = res_wwz2.coeff[1] - res_wwz2.coeff[2]*1j xw_coherence, xw_phase = wavelet_coherence(wt_coeff1, wt_coeff2, freq, tau, smooth_factor=smooth_factor) xwt, xw_amplitude, _ = cross_wt(wt_coeff1, wt_coeff2) # Monte-Carlo simulations of AR1 process nt = np.size(tau) nf = np.size(freq) # coherence_red = np.ndarray(shape=(nMC, nt, nf)) # AR1_q = np.ndarray(shape=(nt, nf)) coherence_red = None AR1_q = None # if nMC >= 1: # for i in tqdm(range(nMC), desc='Monte-Carlo simulations'): # r1 = ar1_sim(ys1_cut, np.size(ts1_cut), 1, ts=ts1_cut) # r2 = ar1_sim(ys2_cut, np.size(ts2_cut), 1, ts=ts2_cut) # res_wwz_r1 = wwz(r1, ts1_cut, tau=tau, freq=freq, c=c, Neff=Neff, nMC=0, nproc=nproc, # detrend=detrend, sg_kwargs=sg_kwrags, # gaussianize=gaussianize, standardize=standardize) # res_wwz_r2 = wwz(r2, ts2_cut, tau=tau, freq=freq, c=c, Neff=Neff, nMC=0, nproc=nproc, # detrend=detrend, sg_kwargs=sg_kwargs, # gaussianize=gaussianize, standardize=standardize) # wt_coeffr1 = res_wwz_r1.coeff[1] - res_wwz_r2.coeff[2]*1j # wt_coeffr2 = res_wwz_r1.coeff[1] - res_wwz_r2.coeff[2]*1j # coherence_red[i, :, :], phase_red = wavelet_coherence(wt_coeffr1, wt_coeffr2, freq, tau, smooth_factor=smooth_factor) # for j in range(nt): # for k in range(nf): # AR1_q[j, k] = mquantiles(coherence_red[:, j, k], 0.95) # else: # AR1_q = None coi = make_coi(tau, Neff=Neff) Results = collections.namedtuple('Results', ['xw_coherence', 'xw_amplitude', 'xw_phase', 'xwt', 'freq', 'time', 'AR1_q', 'coi']) res = Results(xw_coherence=xw_coherence, xw_amplitude=xw_amplitude, xw_phase=xw_phase, xwt=xwt, freq=freq, time=tau, AR1_q=AR1_q, coi=coi) return res
[docs]def freq_vector_lomb_scargle(ts, dt= None, nf=None, ofac=4, hifac=1): ''' Return the frequency vector based on the REDFIT recommendation. Parameters ---------- ts : array time axis of the time series dt : float The resolution of the data. If None, uses the median resolution. Defaults to None. nf : int Number of frequency points. If None, calculated as the difference between the highest and lowest frequencies (set by hifac and ofac) divided by resolution. Defaults to None ofac : float Oversampling rate that influences the resolution of the frequency axis, when equals to 1, it means no oversamling (should be >= 1). The default value 4 is usually a good value. hifac : float fhi/fnyq (should be <= 1), where fhi is the highest frequency that can be analyzed by the Lomb-Scargle algorithm and fnyq is the Nyquist frequency. Returns ------- freq : array the frequency vector References ---------- Trauth, M. H. MATLAB® Recipes for Earth Sciences. (Springer, 2015). pp 181. See also -------- pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on the Welch's method. pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector ''' assert ofac >= 1 and hifac <= 1, "`ofac` should be >= 1, and `hifac` should be <= 1" if dt is None: dt = np.median(np.diff(ts)) flo = (1/(2*dt)) / (np.size(ts)*ofac) fhi = hifac / (2*dt) if nf is None: df = flo nf = int((fhi - flo) / df + 1) freq = np.linspace(flo, fhi, nf) return freq
[docs]def freq_vector_welch(ts): ''' Return the frequency vector based on the Welch's method. Parameters ---------- ts : array time axis of the time series Returns ------- freq : array the frequency vector References ---------- https://github.com/scipy/scipy/blob/v0.14.0/scipy/signal/Spectral.py See also -------- pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT recommendation. pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector ''' nt = np.size(ts) dt = np.median(np.diff(ts)) fs = 1 / dt if nt % 2 == 0: n_freq = nt//2 + 1 else: n_freq = (nt+1) // 2 freq = np.arange(n_freq) * fs / nt return freq
[docs]def freq_vector_nfft(ts): ''' Return the frequency vector based on NFFT Parameters ---------- ts : array time axis of the time series Returns ------- freq : array the frequency vector See also -------- pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT recommendation. pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on the Welch's method. pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector ''' nt = np.size(ts) dt = np.median(np.diff(ts)) fs = 1 / dt n_freq = nt//2 + 1 freq = np.linspace(0, fs/2, n_freq) return freq
[docs]def freq_vector_scale(ts, nv=12, fourier_factor=1): ''' Return the frequency vector based on scales for wavelet analysis Parameters ---------- ts : array time axis of the time series nv : int the parameter that controls the number of freq points Returns ------- freq : array the frequency vector See also -------- pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT recommendation. pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on the Welch's method. pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector ''' s0 = 2*np.median(np.diff(ts)) a0 = 2**(1/nv) noct = np.floor(np.log2(np.size(ts)))-1 # number of octave scale = s0*a0**(np.arange(noct*nv+1)) freq = 1/(scale[::-1]*fourier_factor) return freq
[docs]def freq_vector_log(ts, nfreq=None): ''' Return the frequency vector based on logspace Parameters ---------- ts : array time axis of the time series nv : int the parameter that controls the number of freq points Returns ------- freq : array the frequency vector See also -------- pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT recommendation. pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on the Welch's method. pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector ''' nt = np.size(ts) dt = np.median(np.diff(ts)) fs = 1 / dt if nfreq is None: nfreq = nt//10 + 1 fmin = 2/(np.max(ts)-np.min(ts)) fmax = fs/2 start = np.log2(fmin) stop = np.log2(fmax) freq = np.logspace(start, stop, nfreq, base=2) return freq
[docs]def make_freq_vector(ts, method='log', **kwargs): ''' Make frequency vector This function selects among five methods to obtain the frequency vector. Parameters ---------- ts : array Time axis of the time series method : string The method to use. Options are 'log' (default), 'nfft', 'lomb_scargle', 'welch', and 'scale' kwargs : dict, optional For Lomb_Scargle, additional parameters may be passed: - nf (int): number of frequency points - ofac (float): Oversampling rate that influences the resolution of the frequency axis, when equals to 1, it means no oversamling (should be >= 1). The default value 4 is usaually a good value. - hifac (float): fhi/fnyq (should be >= 1), where fhi is the highest frequency that can be analyzed by the Lomb-Scargle algorithm and fnyq is the Nyquist frequency. Returns ------- freq : array the frequency vector See also -------- pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT recommendation. pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on the Welch's method. pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace ''' if method == 'lomb_scargle': freq = freq_vector_lomb_scargle(ts,**kwargs) elif method == 'welch': freq = freq_vector_welch(ts) elif method == 'nfft': freq = freq_vector_nfft(ts) elif method == 'scale': freq = freq_vector_scale(ts, **kwargs) elif method == 'log': freq = freq_vector_log(ts, **kwargs) else: raise ValueError('This method is not supported') # freq = freq[1:] # discard the first element 0 return freq
[docs]def beta_estimation(psd, freq, fmin=None, fmax=None, logf_binning_step='max', verbose=False): ''' Estimate the power slope of a 1/f^beta process. Parameters ---------- psd : array the power spectral density freq : array the frequency vector fmin : float the min of frequency range for beta estimation fmax : float the max of frequency range for beta estimation verbose : bool if True, will print out debug information Returns ------- beta : float the estimated slope f_binned : array binned frequency vector psd_binned : array binned power spectral density Y_reg : array prediction based on linear regression ''' # drop the PSD at frequency zero if freq[0] == 0: psd = psd[1:] freq = freq[1:] if fmin is None or fmin == 0: fmin = np.min(freq) if fmax is None: fmax = np.max(freq) Results = collections.namedtuple('Results', ['beta', 'f_binned', 'psd_binned', 'Y_reg', 'std_err']) if np.max(freq) < fmax or np.min(freq) > fmin: if verbose: print(fmin, fmax) print(np.min(freq), np.max(freq)) print('WRONG') res = Results(beta=np.nan, f_binned=np.nan, psd_binned=np.nan, Y_reg=np.nan, std_err=np.nan) return res # frequency binning start fminindx = np.where(freq >= fmin)[0][0] fmaxindx = np.where(freq <= fmax)[0][-1] if fminindx >= fmaxindx: res = Results(beta=np.nan, f_binned=np.nan, psd_binned=np.nan, Y_reg=np.nan, std_err=np.nan) return res logf = np.log(freq) if logf_binning_step == 'max': logf_step = np.max(np.diff(logf)) elif logf_binning_step == 'first': logf_step = logf[fminindx+1] - logf[fminindx] else: raise ValueError('the option for logf_binning_step is unknown') logf_start = logf[fminindx] logf_end = logf[fmaxindx] logf_binedges = np.arange(logf_start, logf_end+logf_step, logf_step) n_intervals = np.size(logf_binedges)-1 logpsd_binned = np.empty(n_intervals) logf_binned = np.empty(n_intervals) logpsd = np.log(psd) for i in range(n_intervals): lb = logf_binedges[i] ub = logf_binedges[i+1] q = np.where((logf > lb) & (logf <= ub)) logpsd_binned[i] = np.nanmean(logpsd[q]) logf_binned[i] = (ub + lb) / 2 f_binned = np.exp(logf_binned) psd_binned = np.exp(logpsd_binned) # frequency binning end # linear regression below Y = np.log10(psd_binned) X = np.log10(f_binned) X_ex = sm.add_constant(X) # note below: 'drop' is used for missing, so NaNs will be removed, and we need to put it back in the end model = sm.OLS(Y, X_ex, missing='drop') results = model.fit() if np.size(results.params) < 2: beta = np.nan Y_reg = np.nan std_err = np.nan else: beta = -results.params[1] # the slope we want Y_reg_raw = 10**model.predict(results.params) # prediction based on linear regression # handeling potential NaNs in psd_binned Y_reg = [] i = 0 for psd in psd_binned: if np.isnan(psd): Y_reg.append(np.nan) else: Y_reg.append(Y_reg_raw[i]) i += 1 Y_reg = np.array(Y_reg) std_err = results.bse[1] res = Results(beta=beta, f_binned=f_binned, psd_binned=psd_binned, Y_reg=Y_reg, std_err=std_err) return res
[docs]def beta2HurstIndex(beta): ''' Translate psd slope to Hurst index Parameters ---------- beta : float the estimated slope of a power spectral density curve Returns ------- H : float Hurst index, should be in (0, 1) References ---------- Equation 2 in http://www.bearcave.com/misl/misl_tech/wavelets/hurst/ ''' H = (beta-1)/2 return H
[docs]def psd_ar(var_noise, freq, ar_params, f_sampling): ''' Return the theoretical power spectral density (PSD) of an autoregressive model Parameters ---------- var_noise : float the variance of the noise of the AR process freq : array vector of frequency ar_params : array autoregressive coefficients, not including zero-lag f_sampling : float sampling frequency Returns ------- psd : array power spectral density ''' p = np.size(ar_params) tmp = np.ndarray(shape=(p, np.size(freq)), dtype=complex) for k in range(p): tmp[k, :] = np.exp(-1j*2*np.pi*(k+1)*freq/f_sampling) psd = var_noise / np.absolute(1-np.sum(ar_params*tmp, axis=0))**2 return psd
[docs]def fBMsim(N=128, H=0.25): '''Simple method to generate fractional Brownian Motion Parameters ---------- N : int the length of the simulated time series H : float Hurst index, should be in (0, 1). The relationship between H and the scaling exponent beta is H = (beta-1) / 2 Returns ------- xfBm : array the simulated fractional Brownian Motion time series References ---------- 1. http://cours-physique.lps.ens.fr/index.php/TD11_Correlated_Noise_2011 2. https://www.wikiwand.com/en/Fractional_Brownian_motion @authors: jeg, fzhu ''' assert isinstance(N, int) and N >= 1 assert H > 0 and H < 1, "H should be in (0, 1)!" HH = 2 * H ns = N-1 # number of steps covariance = np.ones((ns, ns)) for i in range(ns): for j in range(i, ns): x = np.abs(i-j) covariance[i, j] = covariance[j, i] = (np.abs(x-1)**HH + (x+1)**HH - 2*x**HH) / 2. w, v = np.linalg.eig(covariance) A = np.zeros((ns, ns)) for i in range(ns): for j in range(i, ns): A[i, j] = A[j, i] = np.sum(np.sqrt(w) * v[i, :] * v[j, :]) xi = np.random.randn((ns)) eta = np.dot(A, xi) xfBm = np.zeros(N) xfBm[0] = 0 for i in range(1, N): xfBm[i] = xfBm[i-1] + eta[i-1] return xfBm
[docs]def psd_fBM(freq, ts, H): ''' Return the theoretical psd of a fBM Parameters ---------- freq : array vector of frequency ts : array the time axis of the time series H : float Hurst index, should be in (0, 1) Returns -------- psd : array power spectral density References ---------- Flandrin, P. On the spectrum of fractional Brownian motions. IEEE Transactions on Information Theory 35, 197–199 (1989). ''' nf = np.size(freq) psd = np.ndarray(shape=(nf)) T = np.max(ts) - np.min(ts) omega = 2 * np.pi * freq for k in range(nf): tmp = 2 * omega[k] * T psd[k] = (1 - 2**(1 - 2*H)*np.sin(tmp)/tmp) / np.abs(omega[k])**(1 + 2*H) return psd
[docs]def get_wwz_func(nproc, method): ''' Return the wwz function to use. Parameters ---------- nproc : int the number of processes for multiprocessing method : string 'Foster' - the original WWZ method; 'Kirchner' - the method Kirchner adapted from Foster; 'Kirchner_f2py' - the method Kirchner adapted from Foster with f2py 'Kirchner_numba' - Kirchner's algorithm with Numba support for acceleration (default) Returns ------- wwz_func : function the wwz function to use ''' assertPositiveInt(nproc) if method == 'Foster': if nproc == 1: wwz_func = wwz_basic else: wwz_func = wwz_nproc elif method == 'Kirchner': if nproc == 1: wwz_func = kirchner_basic else: wwz_func = kirchner_nproc elif method == 'Kirchner_f2py': wwz_func = kirchner_f2py elif method == 'Kirchner_numba': wwz_func = kirchner_numba else: raise ValueError('Wrong specific method name for WWZ. Should be one of {"Foster", "Kirchner", "Kirchner_f2py", "Kirchner_numba"}') return wwz_func
[docs]def prepare_wwz(ys, ts, freq=None, freq_method='log', freq_kwargs=None, tau=None, len_bd=0, bc_mode='reflect', reflect_type='odd', **kwargs): ''' Return the truncated time series with NaNs deleted and estimate frequency vector and tau Parameters ---------- ys : array a time series, NaNs will be deleted automatically ts : array the time points, if `ys` contains any NaNs, some of the time points will be deleted accordingly freq : array vector of frequency. If None, will be ganerated according to freq_method. may be set. freq_method : str when freq=None, freq will be ganerated according to freq_method freq_kwargs : str used when freq=None for certain methods tau : array The evenly-spaced time points, namely the time shift for wavelet analysis. If the boundaries of tau are not exactly on two of the time axis points, then tau will be adjusted to be so. If None, at most 50 tau points will be generated from the input time span. len_bd : int the number of the ghost grids want to create on each boundary bc_mode : string {'constant', 'edge', 'linear_ramp', 'maximum', 'mean', 'median', 'minimum', 'reflect' , 'symmetric', 'wrap'} For more details, see np.lib.pad() reflect_type : string {‘even’, ‘odd’}, optional Used in ‘reflect’, and ‘symmetric’. The ‘even’ style is the default with an unaltered reflection around the edge value. For the ‘odd’ style, the extented part of the array is created by subtracting the reflected values from two times the edge value. For more details, see np.lib.pad() Returns ------- ys_cut : array the truncated time series with NaNs deleted ts_cut : array the truncated time axis of the original time series with NaNs deleted freq : array vector of frequency tau : array the evenly-spaced time points, namely the time shift for wavelet analysis ''' ys, ts = clean_ts(ys, ts) if tau is None: ntau = np.min([np.size(ts), 50]) tau = np.linspace(np.min(ts), np.max(ts), ntau) elif np.isnan(tau).any(): warnings.warn("The input tau contains some NaNs." + "It will be regenerated using the boundarys of the time axis of the time series with NaNs deleted," + "with the length of the size of the input tau.") tau = np.linspace(np.min(ts), np.max(ts), np.size(tau)) elif np.min(tau) < np.min(ts) and np.max(tau) > np.max(ts): warnings.warn("tau should be within the time span of the time series." + "Note that sometimes if the leading points of the time series are NaNs," + "they will be deleted and cause np.min(tau) < np.min(ts)." + "A new tau with the same size of the input tau will be generated.") tau = np.linspace(np.min(ts), np.max(ts), np.size(tau)) elif np.min(tau) not in ts or np.max(tau) not in ts: warnings.warn("The boundaries of tau are not exactly on two of the time axis points," + "and it will be adjusted to be so.") tau_lb = np.min(ts[ts > np.min(tau)]) tau_ub = np.max(ts[ts < np.max(tau)]) tau = np.linspace(tau_lb, tau_ub, np.size(tau)) # boundary condition if len_bd > 0: dt = np.median(np.diff(ts)) dtau = np.median(np.diff(tau)) len_bd_tau = len_bd*dt//dtau if bc_mode in ['reflect', 'symmetric']: ys = np.lib.pad(ys, (len_bd, len_bd), bc_mode, reflect_type=reflect_type) else: ys = np.lib.pad(ys, (len_bd, len_bd), bc_mode) ts_left_bd = np.linspace(ts[0]-dt*len_bd, ts[0]-dt, len_bd) ts_right_bd = np.linspace(ts[-1]+dt, ts[-1]+dt*len_bd, len_bd) ts = np.concatenate((ts_left_bd, ts, ts_right_bd)) warnings.warn("The tau will be regenerated to fit the boundary condition.") tau_left_bd = np.linspace(tau[0]-dtau*len_bd_tau, tau[0]-dtau, len_bd_tau) tau_right_bd = np.linspace(tau[-1]+dtau, tau[-1]+dtau*len_bd_tau, len_bd_tau) tau = np.concatenate((tau_left_bd, tau, tau_right_bd)) # truncate the time series when the range of tau is smaller than that of the time series ts_cut = ts[(np.min(tau) <= ts) & (ts <= np.max(tau))] ys_cut = ys[(np.min(tau) <= ts) & (ts <= np.max(tau))] if freq is None: freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy() freq = make_freq_vector(ts_cut, method=freq_method, **freq_kwargs) # remove 0 in freq vector freq = freq[freq != 0] return ys_cut, ts_cut, freq, tau
[docs]def cross_wt(coeff1, coeff2): ''' Return the cross wavelet transform. Parameters ---------- coeff1 : array the first of two sets of wavelet transform coefficients **in the form of a1 + a2*1j** coeff2 : array the second of two sets of wavelet transform coefficients **in the form of a1 + a2*1j** freq : array vector of frequency tau : array' the evenly-spaced time points, namely the time shift for wavelet analysis Returns ------- xw_amplitude : array the cross wavelet amplitude xw_phase : array the cross wavelet phase References ---------- Grinsted, A., Moore, J. C. & Jevrejeva, S. Application of the cross wavelet transform and wavelet coherence to geophysical time series. Nonlin. Processes Geophys. 11, 561–566 (2004). ''' xwt = coeff1 * np.conj(coeff2) xw_amplitude = np.sqrt(xwt.real**2 + xwt.imag**2) xw_phase = np.arctan2(xwt.imag, xwt.real) return xwt, xw_amplitude, xw_phase
[docs]def wavelet_coherence(coeff1, coeff2, freq, tau, smooth_factor=0.25): ''' Return the cross wavelet coherence. Parameters ---------- coeff1 : array the first of two sets of wavelet transform coefficients **in the form of a1 + a2*1j** coeff2 : array the second of two sets of wavelet transform coefficients **in the form of a1 + a2*1j** freq : array vector of frequency tau : array' the evenly-spaced time points, namely the time shift for wavelet analysis Returns ------- xw_coherence : array the cross wavelet coherence References ---------- 1. Grinsted, A., Moore, J. C. & Jevrejeva, S. Application of the cross wavelet transform and wavelet coherence to geophysical time series. Nonlin. Processes Geophys. 11, 561–566 (2004). 2. Matlab code by Grinsted (https://github.com/grinsted/wavelet-coherence) 3. Python code by Sebastian Krieger (https://github.com/regeirk/pycwt) ''' def rect(length, normalize=False): """ Rectangular function adapted from https://github.com/regeirk/pycwt/blob/master/pycwt/helpers.py Args: length (int): length of the rectangular function normalize (bool): normalize or not Returns: rect (array): the (normalized) rectangular function """ rect = np.zeros(length) rect[0] = rect[-1] = 0.5 rect[1:-1] = 1 if normalize: rect /= rect.sum() return rect def smoothing(coeff, snorm, dj, smooth_factor=smooth_factor): """ Smoothing function adapted from https://github.com/regeirk/pycwt/blob/master/pycwt/helpers.py Args ---- coeff : array the wavelet coefficients get from wavelet transform **in the form of a1 + a2*1j** snorm : array normalized scales dj : float it satisfies the equation [ Sj = S0 * 2**(j*dj) ] Returns ------- rect : array the (normalized) rectangular function """ def fft_kwargs(signal, **kwargs): return {'n': np.int(2 ** np.ceil(np.log2(len(signal))))} W = coeff.transpose() m, n = np.shape(W) # Smooth in time k = 2 * np.pi * fft.fftfreq(fft_kwargs(W[0, :])['n']) k2 = k ** 2 # Notes by Smoothing by Gaussian window (absolute value of wavelet function) # using the convolution theorem: multiplication by Gaussian curve in # Fourier domain for each scale, outer product of scale and frequency F = np.exp(-smooth_factor * (snorm[:, np.newaxis] ** 2) * k2) # Outer product smooth = fft.ifft(F * fft.fft(W, axis=1, **fft_kwargs(W[0, :])), axis=1, # Along Fourier frequencies **fft_kwargs(W[0, :], overwrite_x=True)) T = smooth[:, :n] # Remove possibly padded region due to FFT if np.isreal(W).all(): T = T.real # Smooth in scale wsize = 0.6 / dj * 2 win = rect(np.int(np.round(wsize)), normalize=True) T = signal.convolve2d(T, win[:, np.newaxis], 'same') S = T.transpose() return S xwt = coeff1 * np.conj(coeff2) power1 = np.abs(coeff1)**2 power2 = np.abs(coeff2)**2 scales = 1/freq # `scales` here is the `Period` axis in the wavelet plot dt = np.median(np.diff(tau)) snorm = scales / dt # normalized scales # with WWZ method, we don't have a constant dj, so we will just take the average over the whole scale range N = np.size(scales) s0 = scales[-1] sN = scales[0] dj = np.log2(sN/s0) / N S12 = smoothing(xwt/scales, snorm, dj) S1 = smoothing(power1/scales, snorm, dj) S2 = smoothing(power2/scales, snorm, dj) xw_coherence = np.abs(S12)**2 / (S1*S2) wcs = S12 / (np.sqrt(S1)*np.sqrt(S2)) xw_phase = np.angle(wcs) return xw_coherence, xw_phase
[docs]def reconstruct_ts(coeff, freq, tau, t, len_bd=0): ''' Reconstruct the normalized time series from the wavelet coefficients. Parameters ---------- coeff : array the coefficients of the corresponding basis functions (a0, a1, a2) freq : array vector of frequency of the basis functions tau : array the evenly-spaced time points of the basis functions t : array the specified evenly-spaced time points of the reconstructed time series len_bd : int the number of the ghost grids want to creat on each boundary Returns ------- rec_ts : array the reconstructed normalized time series t : array the evenly-spaced time points of the reconstructed time series ''' omega = 2*np.pi*freq nf = np.size(freq) dt = np.median(np.diff(t)) if len_bd > 0: t_left_bd = np.linspace(t[0]-dt*len_bd, t[0]-dt, len_bd) t_right_bd = np.linspace(t[-1]+dt, t[-1]+dt*len_bd, len_bd) t = np.concatenate((t_left_bd, t, t_right_bd)) ntau = np.size(tau) a_0, a_1, a_2 = coeff rec_ts = np.zeros(np.size(t)) for k in range(nf): for j in range(ntau): if np.isnan(a_0[j, k]) or np.isnan(a_1[j, k]) or np.isnan(a_1[j, k]): continue else: dz = omega[k] * (t - tau[j]) phi_1 = np.cos(dz) phi_2 = np.sin(dz) rec_ts += (a_0[j, k] + a_1[j, k]*phi_1 + a_2[j, k]*phi_2) rec_ts = preprocess(rec_ts, t, detrend=False, gaussianize=False, standardize=False) return rec_ts, t
# ## Methods for Torrence and compo # # This is the main function, which has been rewritten to work with functionalities in Pyleoclim # def cwt(ys,ts,mother='morlet',param=None,freq=None,freq_method='scale', # freq_kwargs={},detrend=False, sg_kwargs={}, gaussianize=False, # standardize=False,pad=False,pad_kwargs={}): # ys=np.array(ys) # ts=np.array(ts) # ys, ts = clean_ts(ys, ts) #clean up time # #make sure that the time series is evenly-spaced # if is_evenly_spaced(ts) == True: # dt = np.mean(np.diff(ts)) # else: # raise ValueError('Time series must be evenly spaced in time') # # prepare the time series # pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, # gaussianize=gaussianize, standardize=standardize) # # Get the fourier factor # if mother.lower() == 'morlet': # if param is None: # param = 6. # fourier_factor = 4 * np.pi / (param + np.sqrt(2 + param**2)) # elif mother.lower() == 'paul': # if param is None: # param = 4. # fourier_factor = 4 * np.pi / (2 * param + 1) # elif mother.lower() == 'dog': # if param is None: # param = 2. # fourier_factor = 2 * np.pi * np.sqrt(2. / (2 * param + 1)) # else: # fourier_factor = 1 # #get the frequency/scale information # if freq is None: # freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy() # if freq_method == 'scale': # freq_kwargs.update({'fourier_factor':fourier_factor}) # freq = make_freq_vector(ts, method=freq_method, **freq_kwargs) # # Use scales # scale = np.sort(1/(freq*fourier_factor)) # #Normalize # #n_ys = pd_ys-np.mean(pd_ys) # #pad if wanted # if pad == True: # pad_kwargs = {} if pad_kwargs is None else pad_kwargs.copy() # yp,tp = ts_pad(pd_ys,ts,**pad_kwargs) # else: # yp=pd_ys # tp=ts # # Wave calculation # n = len(yp) # # construct wavenumber array used in transform [Eqn(5)] # kplus = np.arange(1, int(n / 2) + 1) # kplus = (kplus * 2 * np.pi / (n * dt)) # kminus = np.arange(1, int((n - 1) / 2) + 1) # kminus = np.sort((-kminus * 2 * np.pi / (n * dt))) # k = np.concatenate(([0.], kplus, kminus)) # # compute FFT of the (padded) time series # f = np.fft.fft(yp) # # define the wavelet array # wave = np.zeros(shape=(len(scale), n), dtype=complex) # # loop through all scales and compute transform # for a1 in range(0, len(scale)): # daughter, fourier_factor, coi, _ = \ # wave_bases(mother, k, scale[a1], param) # wave[a1, :] = np.fft.ifft(f * daughter) # wavelet transform[Eqn(4)] # #COI # coi = coi * dt * np.concatenate(( # np.insert(np.arange(int((len(ys) + 1) / 2) - 1), [0], [1E-5]), # np.insert(np.flipud(np.arange(0, int(len(ys) / 2) - 1)), [-1], [1E-5]))) # #Remove the padding # if pad == True: # idx = np.in1d(tp,ts) # wave = wave[:,idx] # res = {} # return res # def wave_bases(mother, k, scale, param): # ''' # Parameters # ---------- # mother : string, {} # DESCRIPTION. # k : TYPE # DESCRIPTION. # scale : TYPE # DESCRIPTION. # param : TYPE # DESCRIPTION. # Raises # ------ # KeyError # DESCRIPTION. # Returns # ------- # daughter : TYPE # DESCRIPTION. # fourier_factor : TYPE # DESCRIPTION. # coi : TYPE # DESCRIPTION. # dofmin : TYPE # DESCRIPTION. # ''' # n = len(k) # kplus = np.array(k > 0., dtype=float) # if mother == 'morlet': # ----------------------------------- Morlet # if param == -1: # param = 6. # k0 = np.copy(param) # # calc psi_0(s omega) from Table 1 # expnt = -(scale * k - k0) ** 2 / 2. * kplus # norm = np.sqrt(scale * k[1]) * (np.pi ** (-0.25)) * np.sqrt(n) # daughter = norm * np.exp(expnt) # daughter = daughter * kplus # Heaviside step function # # Scale-->Fourier [Sec.3h] # fourier_factor = (4 * np.pi) / (k0 + np.sqrt(2 + k0 ** 2)) # coi = fourier_factor / np.sqrt(2) # Cone-of-influence [Sec.3g] # dofmin = 2 # Degrees of freedom # elif mother == 'paul': # -------------------------------- Paul # if param == -1: # param = 4. # m = param # # calc psi_0(s omega) from Table 1 # expnt = -scale * k * kplus # norm_bottom = np.sqrt(m * np.prod(np.arange(1, (2 * m)))) # norm = np.sqrt(scale * k[1]) * (2 ** m / norm_bottom) * np.sqrt(n) # daughter = norm * ((scale * k) ** m) * np.exp(expnt) * kplus # fourier_factor = 4 * np.pi / (2 * m + 1) # coi = fourier_factor * np.sqrt(2) # dofmin = 2 # elif mother == 'dog': # -------------------------------- DOG # if param == -1: # param = 2. # m = param # # calc psi_0(s omega) from Table 1 # expnt = -(scale * k) ** 2 / 2.0 # norm = np.sqrt(scale * k[1] / gamma(m + 0.5)) * np.sqrt(n) # daughter = -norm * (1j ** m) * ((scale * k) ** m) * np.exp(expnt) # fourier_factor = 2 * np.pi * np.sqrt(2. / (2 * m + 1)) # coi = fourier_factor / np.sqrt(2) # dofmin = 1 # else: # raise KeyError('Mother must be one of "morlet", "paul", "dog"') # return daughter, fourier_factor, coi, dofmin # def chisquare_inv(P, V): # if (1 - P) < 1E-4: # print('P must be < 0.9999') # if P == 0.95 and V == 2: # this is a no-brainer # X = 5.9915 # return X # MINN = 0.01 # hopefully this is small enough # MAXX = 1 # actually starts at 10 (see while loop below) # X = 1 # TOLERANCE = 1E-4 # this should be accurate enough # while (X + TOLERANCE) >= MAXX: # should only need to loop thru once # MAXX = MAXX * 10. # # this calculates value for X, NORMALIZED by V # X = fminbound(chisquare_solve, MINN, MAXX, args=(P, V), xtol=TOLERANCE) # MINN = MAXX # X = X * V # put back in the goofy V factor # return X # def chisquare_solve(XGUESS, P, V): # PGUESS = gammainc(V / 2, V * XGUESS / 2) # incomplete Gamma function # PDIFF = np.abs(PGUESS - P) # error in calculated P # TOL = 1E-4 # if PGUESS >= 1 - TOL: # if P is very close to 1 (i.e. a bad guess) # PDIFF = XGUESS # then just assign some big number like XGUESS # return PDIFF