#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities for wavelet analysis, including Weighted Wavelet Z-transform (WWZ)
and Continuous Wavelet Transform (CWT) à la Torrence & Compo 1998.
Includes multiple options for WWZ (Fortran via f2py, numba, multiprocessing)
and enables cross-wavelet transform with either WWZ or CWT.
Designed for NumPy arrays, either evenly spaced or not (method-dependent)
"""
__all__ = [
'cwt',
'cwt_coherence',
'wwz',
'wwz_coherence',
'angle_stats',
'angle_sig'
]
import numpy as np
from scipy import signal
from pathos.multiprocessing import ProcessingPool as Pool
import numba as nb
from numba.core.errors import NumbaPerformanceWarning
import warnings
import collections
import scipy.fftpack as fft
from scipy import optimize
from scipy.optimize import fminbound
from scipy.special import gamma, gammainc
from .tsutils import preprocess
from .tsbase import (
clean_ts,
is_evenly_spaced)
from .tsmodel import (ar1_fit, ar1_sim)
warnings.filterwarnings("ignore", category=NumbaPerformanceWarning)
#---------------
#Wrapper functions
#---------------
#----------------
#Main Functions
#----------------
class AliasFilter(object):
'''Performs anti-alias filtering on a power spectral density (PSD).
Experimental: use at your own risk.
@author: fzhu
'''
def alias_filter(self, freq, pwr, fs, fc, f_limit, avgs):
''' The anti-aliasing filter
Parameters
----------
freq : array
vector of frequencies in power spectrum
pwr : array
vector of spectral power corresponding to frequencies "freq"
fs : float
sampling frequency
fc : float
corner frequency for 1/f^2 steepening of power spectrum
f_limit : float
lower frequency limit for estimating misfit of model-plus-alias spectrum vs. measured power
avgs : int
flag for whether the spectrum is derived from instantaneous point measurements (avgs != 1)
OR from measurements averaged over each sampling interval (avgs == 1)
Returns
-------
alpha : float
best-fit exponent of power-law model
filtered_pwr : array
vector of alias-filtered spectral power
model_pwr : array
vector of modeled spectral power
aliased_pwr : array
vector of modeled spectral power, plus aliases
References
----------
Kirchner, J. W. Aliasing in 1/f(alpha) noise spectra: origins, consequences, and remedies.
Phys Rev E Stat Nonlin Soft Matter Phys 71, 66110 (2005).
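Examples
--------
A minimal synthetic sketch (assumes `pyleoclim` is installed; the 1/f^2 test
spectrum and all parameter values below are illustrative, not from the source).
.. jupyter-execute::
import numpy as np
from pyleoclim.utils.wavelet import AliasFilter
fs = 1.0                               # sampling frequency (illustrative)
freq = np.linspace(0.01, 0.5, 50)      # frequency vector
pwr = freq**(-2.0)                     # synthetic 1/f^2 power spectrum
af = AliasFilter()
alpha, filtered_pwr, model_pwr, aliased_pwr = af.alias_filter(
freq, pwr, fs, fc=1e3*fs, f_limit=freq.min(), avgs=2)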
'''
log_pwr = np.log(pwr)
freq_mask = (freq > f_limit)*1 # convert True & False to 1 & 0
alpha_upper_bound = 5
if avgs == 1:
alpha_lower_bound = -2.9 # if measurements are time-averaged
else:
alpha_lower_bound = -0.9 # if measurements are point samples
alpha = optimize.fminbound(self.misfit, alpha_lower_bound, alpha_upper_bound,
args=(fs, fc, freq, log_pwr, freq_mask, avgs), xtol=1e-4)
model_pwr, aliased_pwr, RMSE = self.alias(alpha, fs, fc, freq, log_pwr, freq_mask, avgs)
filtered_pwr = pwr * model_pwr / aliased_pwr
return alpha, filtered_pwr, model_pwr, aliased_pwr
def misfit(self, alpha, fs, fc, freq, log_pwr, freq_mask, avgs):
model, aliased_pwr, RMSE = self.alias(alpha, fs, fc, freq, log_pwr, freq_mask, avgs)
return RMSE
def alias(self, alpha, fs, fc, freq, log_pwr, freq_mask, avgs):
model_pwr = self.model(alpha, fs, fc, freq, avgs)
aliased_pwr = np.copy(model_pwr)
if avgs == 1:
aliased_pwr = aliased_pwr * np.sinc(freq/fs) ** 2
for k in range(1, 11):
alias_minus = self.model(alpha, fs, fc, k*fs-freq, avgs)
if avgs == 1:
alias_minus = alias_minus * np.sinc((k*fs-freq)/fs) ** 2
aliased_pwr = aliased_pwr + alias_minus
alias_plus = self.model(alpha, fs, fc, k*fs+freq, avgs) # notice the + in (k*fs+freq)
if avgs == 1:
alias_plus = alias_plus * np.sinc((k*fs+freq)/fs) ** 2
aliased_pwr = aliased_pwr + alias_plus
if avgs == 1:
beta = alpha + 3
const = 1 / (2*np.pi**2*beta/fs)
else:
beta = alpha + 1
const = 1 / (beta*fs)
zo_minus = (11*fs-freq)**(-beta)
dz_minus = zo_minus / 20
for j in range(1, 21):
aliased_pwr = aliased_pwr + const / ((j*dz_minus)**(2/beta) + 1/fc**2)*dz_minus
zo_plus = (11*fs+freq)**(-beta)
dz_plus = zo_plus / 20
for j in range(1, 21):
aliased_pwr = aliased_pwr + const / ((j*dz_plus)**(2/beta) + 1/fc**2)*dz_plus
log_aliased = np.log(aliased_pwr)
prefactor = np.sum((log_pwr - log_aliased) * freq_mask) / np.sum(freq_mask)
log_aliased = log_aliased + prefactor
aliased_pwr = aliased_pwr * np.exp(prefactor)
model_pwr = model_pwr * np.exp(prefactor)
RMSE = np.sqrt(np.sum((log_aliased-log_pwr)*(log_aliased-log_pwr)*freq_mask)) / np.sum(freq_mask)
return model_pwr, aliased_pwr, RMSE
def model(self, alpha, fs, fc, freq, avgs):
spectr = freq**(-alpha) / (1 + (freq/fc)**2)
return spectr
# def cwt(ys,ts,scales,wavelet='morl',sampling_period=1.0,method='conv',axis=-1):
# '''Continuous wavelet transform for evenly spaced data
# pywavelet documentation: https://pywavelets.readthedocs.io/en/latest/ref/cwt.html
# Parameters
# ----------
# ys : array
# signal
# ts : array
# time
# scales : array (float)
# different wavelet scales to use
# wavelet : str
# types of wavelet options in function documentation link. The default is 'morl' for a morlet wavelet.
# sampling_period : float, optional
# sampling period for frequencies output. The default is 1.0.
# method : str, optional
# cwt computation method: 'conv', 'fft', or 'auto'. The default is 'conv'.
# axis : int, optional
# axis over which to compute cwt. The default is -1, the last axis.
# Returns
# -------
# res : dictionary
# 'freq' - array(float)
# frequencies
# 'time' - array(float)
# 'amplitude' - array(float)
# 'coi' - array(float)
# cone of influence
# '''
# coeff,freq=pywt.cwt(data=ys,scales=scales,wavelet=wavelet,sampling_period=sampling_period,method=method,axis=axis)
# amplitude=abs(coeff).T
# if wavelet=='morl' or wavelet[:4]=='cmor':
# coi=make_coi(tau=ts,Neff=6)
# else:
# coi=make_coi(tau=ts)
# Results = collections.namedtuple('Results', ['amplitude','coi', 'freq', 'time', 'coeff'])
# res = Results(amplitude=amplitude, coi=coi, freq=freq, time=ts, coeff=coeff)
# return res
def assertPositiveInt(*args):
''' Assert that the arguments are all positive integers (>= 1)
Parameters
----------
args
one or more values expected to be positive integers
'''
for arg in args:
assert isinstance(arg, int) and arg >= 1
def wwz_basic(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff_threshold=3, nproc=1, detrend=False, sg_kwargs=None,
gaussianize=False, standardize=False):
''' Return the weighted wavelet amplitude (WWA).
The weighted wavelet Z-transform (WWZ) is based on Morlet wavelet estimation, using
least-squares minimization to suppress the energy leakage caused by data gaps.
WWZ does not rely on interpolation or detrending, and is appropriate for unevenly-spaced datasets.
This function implements the original algorithm of Foster (1996); the variant of Kirchner & Neal (2013),
in which basis rotations mitigate the numerical instability that arises in pathological cases,
is available through the `kirchner_*` functions.
The WWZ method has one adjustable parameter, a decay constant `c` that balances the time and frequency
resolutions of the analysis; the default value 1/(8π^2) is justified in Witt & Schumann (2005).
No multiprocessing is applied by default.
Parameters
----------
ys : array
a time series
ts : array
time axis of the time series
freq : array
vector of frequency
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
c : float
the decay constant that determines the frequency resolution of the analysis; the smaller the value, the higher the frequency resolution.
The default value 1/(8*np.pi**2) works well for most wavelet analysis cases
Neff_threshold : int
the threshold of the number of effective degrees of freedom
nproc : int
dummy argument kept for interface consistency with the multiprocessing variants; must be 1
detrend : string or bool
False (default) - no detrending is applied, i.e. the original time series is assumed to have no trend;
Types of detrending:
- "linear" : the result of a linear least-squares fit to y is subtracted from y.
- "constant" : only the mean of data is subtracted.
- "savitzky-golay" : y is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y.
- "emd" : Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series
sg_kwargs : dict
The parameters for the Savitzky-Golay filter. See pyleoclim.utils.filter.savitzky_golay for details.
gaussianize : bool
If True, gaussianizes the timeseries
standardize : bool
If True, standardizes the timeseries
Returns
-------
wwa : array
the weighted wavelet amplitude
phase : array
the weighted wavelet phase
Neffs : array
the matrix of effective number of points in the time-scale coordinates
coeff : array
the wavelet transform coefficients (a0, a1, a2)
References
----------
- Foster, G. Wavelets for period analysis of unevenly sampled time series. The Astronomical Journal 112, 1709 (1996).
- Witt, A. & Schumann, A. Y. Holocene climate variability on millennial scales recorded in Greenland ice cores. Nonlinear Processes in Geophysics 12, 345–352 (2005).
- Kirchner, J. W. and Neal, C. (2013). Universal fractal scaling in stream chemistry and its implications for solute transport and water quality trend detection. Proc Natl Acad Sci USA 110:12213–12218.
See also
--------
pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Foster. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing
pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package.
pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler.
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.tsutils.preprocess : pre-processes a time series using Gaussianization and detrending.
pyleoclim.utils.tsutils.detrend : detrending functionalities using 4 possible methods
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
pyleoclim.utils.tsutils.standardize: Centers and normalizes a given time series.
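Examples
--------
A minimal sketch on an irregularly sampled sine wave (assumes `pyleoclim` is
installed; the series and the frequency/tau grids below are illustrative).
.. jupyter-execute::
import numpy as np
from pyleoclim.utils.wavelet import wwz_basic
rng = np.random.default_rng(0)
ts = np.sort(rng.uniform(0, 100, 200))    # unevenly-spaced time axis
ys = np.sin(2*np.pi*ts/20) + 0.1*rng.standard_normal(200)
freq = np.linspace(1/50, 1/4, 20)         # frequencies to analyze
tau = np.linspace(ts.min(), ts.max(), 50) # evenly-spaced time shifts
wwa, phase, Neffs, coeff = wwz_basic(ys, ts, freq, tau)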
'''
assert nproc == 1, "wwz_basic() only supports nproc=1"
assertPositiveInt(Neff_threshold)
nt = np.size(tau)
nf = np.size(freq)
pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize)
omega = make_omega(ts, freq)
Neffs = np.ndarray(shape=(nt, nf))
ywave_1 = np.ndarray(shape=(nt, nf))
ywave_2 = np.ndarray(shape=(nt, nf))
ywave_3 = np.ndarray(shape=(nt, nf))
S = np.zeros(shape=(3, 3))
for k in range(nf):
for j in range(nt):
dz = omega[k] * (ts - tau[j])
weights = np.exp(-c*dz**2)
sum_w = np.sum(weights)
Neffs[j, k] = sum_w**2 / np.sum(weights**2) # local number of effective dof
if Neffs[j, k] <= Neff_threshold:
ywave_1[j, k] = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff_threshold
ywave_2[j, k] = np.nan
ywave_3[j, k] = np.nan
else:
phi2 = np.cos(dz)
phi3 = np.sin(dz)
S[0, 0] = 1
S[1, 1] = np.sum(weights*phi2*phi2) / sum_w
S[2, 2] = np.sum(weights*phi3*phi3) / sum_w
S[1, 0] = S[0, 1] = np.sum(weights*phi2) / sum_w
S[2, 0] = S[0, 2] = np.sum(weights*phi3) / sum_w
S[2, 1] = S[1, 2] = np.sum(weights*phi2*phi3) / sum_w
S_inv = np.linalg.pinv(S)
weighted_phi1 = np.sum(weights*pd_ys) / sum_w
weighted_phi2 = np.sum(weights*phi2*pd_ys) / sum_w
weighted_phi3 = np.sum(weights*phi3*pd_ys) / sum_w
ywave_1[j, k] = S_inv[0, 0]*weighted_phi1 + S_inv[0, 1]*weighted_phi2 + S_inv[0, 2]*weighted_phi3
ywave_2[j, k] = S_inv[1, 0]*weighted_phi1 + S_inv[1, 1]*weighted_phi2 + S_inv[1, 2]*weighted_phi3
ywave_3[j, k] = S_inv[2, 0]*weighted_phi1 + S_inv[2, 1]*weighted_phi2 + S_inv[2, 2]*weighted_phi3
wwa = np.sqrt(ywave_2**2 + ywave_3**2)
phase = np.arctan2(ywave_3, ywave_2)
# coeff = ywave_2 + ywave_3*1j
coeff = (ywave_1, ywave_2, ywave_3)
return wwa, phase, Neffs, coeff
def wwz_nproc(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff_threshold=3, nproc=8, detrend=False, sg_kwargs=None,
gaussianize=False, standardize=False):
''' Return the weighted wavelet amplitude (WWA).
Original method from Foster (1996). Supports multiprocessing.
Parameters
----------
ys : array
a time series
ts : array
time axis of the time series
freq : array
vector of frequency
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
c : float
the decay constant that determines the frequency resolution of the analysis; the smaller the value, the higher the frequency resolution.
The default value 1/(8*np.pi**2) works well for most wavelet analysis cases
Neff_threshold : int
the threshold of the number of effective degrees of freedom [default = 3]
nproc : int
the number of processes for multiprocessing [default = 8]
detrend : string
False - the original time series is assumed to have no trend;
Types of detrending:
- "linear" : the result of a linear least-squares fit to y is subtracted from y.
- "constant" : only the mean of data is subtracted.
- "savitzky-golay" : y is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y.
- "emd" : Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series
sg_kwargs : dict
The parameters for the Savitzky-Golay filter. See pyleoclim.utils.filter.savitzky_golay for details.
gaussianize : bool
If True, gaussianizes the timeseries
standardize : bool
If True, standardizes the timeseries
Returns
-------
wwa : array
the weighted wavelet amplitude
phase : array
the weighted wavelet phase
Neffs : array
the matrix of effective number of points in the time-scale coordinates
coeff : array
the wavelet transform coefficients (a0, a1, a2)
See also
--------
pyleoclim.utils.wavelet.wwz_basic : weighted wavelet amplitude using the original method from Foster. No multiprocessing
pyleoclim.utils.wavelet.kirchner_basic : weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing
pyleoclim.utils.wavelet.kirchner_nproc : weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_numba : weighted wavelet amplitude (WWA) modified by Kirchner using Numba package.
pyleoclim.utils.wavelet.kirchner_f2py : weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler.
pyleoclim.utils.tsutils.preprocess : pre-processes a time series using Gaussianization and detrending.
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
'''
assert nproc >= 2, "wwz_nproc() requires nproc >= 2; for a serial run, use wwz_basic() instead"
assertPositiveInt(Neff_threshold)
nt = np.size(tau)
nf = np.size(freq)
pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize)
omega = make_omega(ts, freq)
Neffs = np.ndarray(shape=(nt, nf))
ywave_1 = np.ndarray(shape=(nt, nf))
ywave_2 = np.ndarray(shape=(nt, nf))
ywave_3 = np.ndarray(shape=(nt, nf))
def wwa_1g(tau, omega):
dz = omega * (ts - tau)
weights = np.exp(-c*dz**2)
sum_w = np.sum(weights)
Neff_loc = sum_w**2 / np.sum(weights**2)
S = np.zeros(shape=(3, 3))
if Neff_loc <= Neff_threshold:
ywave_2_1g = np.nan
ywave_3_1g = np.nan
else:
phi2 = np.cos(dz)
phi3 = np.sin(dz)
S[0, 0] = 1
S[1, 1] = np.sum(weights*phi2*phi2) / sum_w
S[2, 2] = np.sum(weights*phi3*phi3) / sum_w
S[1, 0] = S[0, 1] = np.sum(weights*phi2) / sum_w
S[2, 0] = S[0, 2] = np.sum(weights*phi3) / sum_w
S[2, 1] = S[1, 2] = np.sum(weights*phi2*phi3) / sum_w
S_inv = np.linalg.pinv(S)
weighted_phi1 = np.sum(weights*pd_ys) / sum_w
weighted_phi2 = np.sum(weights*phi2*pd_ys) / sum_w
weighted_phi3 = np.sum(weights*phi3*pd_ys) / sum_w
ywave_1_1g = S_inv[0, 0]*weighted_phi1 + S_inv[0, 1]*weighted_phi2 + S_inv[0, 2]*weighted_phi3
ywave_2_1g = S_inv[1, 0]*weighted_phi1 + S_inv[1, 1]*weighted_phi2 + S_inv[1, 2]*weighted_phi3
ywave_3_1g = S_inv[2, 0]*weighted_phi1 + S_inv[2, 1]*weighted_phi2 + S_inv[2, 2]*weighted_phi3
return Neff_loc, ywave_1_1g, ywave_2_1g, ywave_3_1g
tf_mesh = np.meshgrid(tau, omega)
list_of_grids = list(zip(*(grid.flat for grid in tf_mesh)))
tau_grids, omega_grids = zip(*list_of_grids)
with Pool(nproc) as pool:
res = pool.map(wwa_1g, tau_grids, omega_grids)
res_array = np.asarray(res)
Neffs = res_array[:, 0].reshape((np.size(omega), np.size(tau))).T
ywave_1 = res_array[:, 1].reshape((np.size(omega), np.size(tau))).T
ywave_2 = res_array[:, 2].reshape((np.size(omega), np.size(tau))).T
ywave_3 = res_array[:, 3].reshape((np.size(omega), np.size(tau))).T
wwa = np.sqrt(ywave_2**2 + ywave_3**2)
phase = np.arctan2(ywave_3, ywave_2)
# coeff = ywave_2 + ywave_3*1j
coeff = (ywave_1, ywave_2, ywave_3)
return wwa, phase, Neffs, coeff
def kirchner_basic(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff_threshold=3, nproc=1, detrend=False, sg_kwargs=None,
gaussianize=False, standardize=False):
''' Return the weighted wavelet amplitude (WWA) modified by Kirchner.
Method modified by Kirchner. No multiprocessing.
Parameters
----------
ys : array
a time series
ts : array
time axis of the time series
freq : array
vector of frequency
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
c : float
the decay constant that determines the frequency resolution of the analysis; the smaller the value, the higher the frequency resolution.
The default value 1/(8*np.pi**2) works well for most wavelet analysis cases
Neff_threshold : int
the threshold of the number of effective degrees of freedom
nproc : int
dummy argument kept for interface consistency with the multiprocessing variants; must be 1
detrend : string
None - the original time series is assumed to have no trend;
Types of detrending:
- "linear" : the result of a linear least-squares fit to y is subtracted from y.
- "constant" : only the mean of data is subtracted.
- "savitzky-golay" : y is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y.
- "emd" : Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series
sg_kwargs : dict
The parameters for the Savitzky-Golay filter. See pyleoclim.utils.filter.savitzky_golay for details.
gaussianize : bool
If True, gaussianizes the timeseries
standardize : bool
If True, standardizes the timeseries
Returns
-------
wwa : array
the weighted wavelet amplitude
phase : array
the weighted wavelet phase
Neffs : array
the matrix of effective number of points in the time-scale coordinates
coeff : array
the wavelet transform coefficients (a0, a1, a2)
References
----------
- Foster, G. Wavelets for period analysis of unevenly sampled time series. The Astronomical Journal 112, 1709 (1996).
- Witt, A. & Schumann, A. Y. Holocene climate variability on millennial scales recorded in Greenland ice cores. Nonlinear Processes in Geophysics 12, 345–352 (2005).
See also
--------
pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Foster. No multiprocessing
pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Foster. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package.
pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler.
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.tsutils.preprocess : pre-processes a time series using Gaussianization and detrending.
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
'''
assert nproc == 1, "kirchner_basic() only supports nproc=1"
assertPositiveInt(Neff_threshold)
nt = np.size(tau)
nts = np.size(ts)
nf = np.size(freq)
pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize)
omega = make_omega(ts, freq)
Neffs = np.ndarray(shape=(nt, nf))
a0 = np.ndarray(shape=(nt, nf))
a1 = np.ndarray(shape=(nt, nf))
a2 = np.ndarray(shape=(nt, nf))
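# For each (tau, omega) pair, the trigonometric basis is rotated by `time_shift`
# (Kirchner & Neal 2013, Eqs. S5-S7) so that the weighted sine and cosine terms
# decouple, avoiding the ill-conditioned 3x3 matrix inversion of Foster (1996).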
for k in range(nf):
for j in range(nt):
dz = omega[k] * (ts - tau[j])
weights = np.exp(-c*dz**2)
sum_w = np.sum(weights)
Neffs[j, k] = sum_w**2 / np.sum(weights**2) # local number of effective dof
if Neffs[j, k] <= Neff_threshold:
a0[j, k] = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff_threshold
a1[j, k] = np.nan
a2[j, k] = np.nan
else:
def w_prod(xs, ys):
return np.sum(weights*xs*ys) / sum_w
sin_basis = np.sin(omega[k]*ts)
cos_basis = np.cos(omega[k]*ts)
one_v = np.ones(nts)
sin_one = w_prod(sin_basis, one_v)
cos_one = w_prod(cos_basis, one_v)
sin_cos = w_prod(sin_basis, cos_basis)
sin_sin = w_prod(sin_basis, sin_basis)
cos_cos = w_prod(cos_basis, cos_basis)
numerator = 2 * (sin_cos - sin_one * cos_one)
denominator = (cos_cos - cos_one**2) - (sin_sin - sin_one**2)
time_shift = np.arctan2(numerator, denominator) / (2*omega[k]) # Eq. (S5)
sin_shift = np.sin(omega[k]*(ts - time_shift))
cos_shift = np.cos(omega[k]*(ts - time_shift))
sin_tau_center = np.sin(omega[k]*(time_shift - tau[j]))
cos_tau_center = np.cos(omega[k]*(time_shift - tau[j]))
ys_cos_shift = w_prod(pd_ys, cos_shift)
ys_sin_shift = w_prod(pd_ys, sin_shift)
ys_one = w_prod(pd_ys, one_v)
cos_shift_one = w_prod(cos_shift, one_v)
sin_shift_one = w_prod(sin_shift, one_v)
A = 2*(ys_cos_shift-ys_one*cos_shift_one)
B = 2*(ys_sin_shift-ys_one*sin_shift_one)
a0[j, k] = ys_one
a1[j, k] = cos_tau_center*A - sin_tau_center*B # Eq. (S6)
a2[j, k] = sin_tau_center*A + cos_tau_center*B # Eq. (S7)
wwa = np.sqrt(a1**2 + a2**2)
phase = np.arctan2(a2, a1)
# coeff = a1 + a2*1j
coeff = (a0, a1, a2)
return wwa, phase, Neffs, coeff
def kirchner_nproc(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff_threshold=3, nproc=8, detrend=False, sg_kwargs=None,
gaussianize=False, standardize=False):
''' Return the weighted wavelet amplitude (WWA) modified by Kirchner.
Method modified by Kirchner. Supports multiprocessing.
Parameters
----------
ys : array
a time series
ts : array
time axis of the time series
freq : array
vector of frequency
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
c : float
the decay constant that determines the frequency resolution of the analysis; the smaller the value, the higher the frequency resolution.
The default value 1/(8*np.pi**2) works well for most wavelet analysis cases
Neff_threshold : int
the threshold of the number of effective degrees of freedom
nproc : int
the number of processes for multiprocessing
detrend : string
Types of detrending:
- "linear" : the result of a linear least-squares fit to y is subtracted from y.
- "constant" : only the mean of data is subtracted.
- "savitzky-golay" : y is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y.
- "emd" : Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series
sg_kwargs : dict
The parameters for the Savitzky-Golay filter. See pyleoclim.utils.filter.savitzky_golay for details.
gaussianize : bool
If True, gaussianizes the timeseries
standardize : bool
If True, standardizes the timeseries
Returns
-------
wwa : array
the weighted wavelet amplitude
phase : array
the weighted wavelet phase
Neffs : array
the matrix of effective number of points in the time-scale coordinates
coeff : array
the wavelet transform coefficients (a0, a1, a2)
See also
--------
pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Foster. No multiprocessing
pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Foster. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing
pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package.
pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler.
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.tsutils.preprocess : pre-processes a time series using Gaussianization and detrending.
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
'''
assert nproc >= 2, "kirchner_nproc() requires nproc >= 2; for a serial run, use kirchner_basic() instead"
assertPositiveInt(Neff_threshold)
nt = np.size(tau)
nts = np.size(ts)
nf = np.size(freq)
pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize)
omega = make_omega(ts, freq)
Neffs = np.ndarray(shape=(nt, nf))
a0 = np.ndarray(shape=(nt, nf))
a1 = np.ndarray(shape=(nt, nf))
a2 = np.ndarray(shape=(nt, nf))
def wwa_1g(tau, omega):
dz = omega * (ts - tau)
weights = np.exp(-c*dz**2)
sum_w = np.sum(weights)
Neff_loc = sum_w**2 / np.sum(weights**2)
if Neff_loc <= Neff_threshold:
a0_1g = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff_threshold
a1_1g = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff_threshold
a2_1g = np.nan
else:
def w_prod(xs, ys):
return np.sum(weights*xs*ys) / sum_w
sin_basis = np.sin(omega*ts)
cos_basis = np.cos(omega*ts)
one_v = np.ones(nts)
sin_one = w_prod(sin_basis, one_v)
cos_one = w_prod(cos_basis, one_v)
sin_cos = w_prod(sin_basis, cos_basis)
sin_sin = w_prod(sin_basis, sin_basis)
cos_cos = w_prod(cos_basis, cos_basis)
numerator = 2*(sin_cos - sin_one*cos_one)
denominator = (cos_cos - cos_one**2) - (sin_sin - sin_one**2)
time_shift = np.arctan2(numerator, denominator) / (2*omega) # Eq. (S5)
sin_shift = np.sin(omega*(ts - time_shift))
cos_shift = np.cos(omega*(ts - time_shift))
sin_tau_center = np.sin(omega*(time_shift - tau))
cos_tau_center = np.cos(omega*(time_shift - tau))
ys_cos_shift = w_prod(pd_ys, cos_shift)
ys_sin_shift = w_prod(pd_ys, sin_shift)
ys_one = w_prod(pd_ys, one_v)
cos_shift_one = w_prod(cos_shift, one_v)
sin_shift_one = w_prod(sin_shift, one_v)
A = 2*(ys_cos_shift - ys_one*cos_shift_one)
B = 2*(ys_sin_shift - ys_one*sin_shift_one)
a0_1g = ys_one
a1_1g = cos_tau_center*A - sin_tau_center*B # Eq. (S6)
a2_1g = sin_tau_center*A + cos_tau_center*B # Eq. (S7)
return Neff_loc, a0_1g, a1_1g, a2_1g
tf_mesh = np.meshgrid(tau, omega)
list_of_grids = list(zip(*(grid.flat for grid in tf_mesh)))
tau_grids, omega_grids = zip(*list_of_grids)
with Pool(nproc) as pool:
res = pool.map(wwa_1g, tau_grids, omega_grids)
res_array = np.asarray(res)
Neffs = res_array[:, 0].reshape((np.size(omega), np.size(tau))).T
a0 = res_array[:, 1].reshape((np.size(omega), np.size(tau))).T
a1 = res_array[:, 2].reshape((np.size(omega), np.size(tau))).T
a2 = res_array[:, 3].reshape((np.size(omega), np.size(tau))).T
wwa = np.sqrt(a1**2 + a2**2)
phase = np.arctan2(a2, a1)
# coeff = a1 + a2*1j
coeff = (a0, a1, a2)
return wwa, phase, Neffs, coeff
def kirchner_numba(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff_threshold=3, detrend=False, sg_kwargs=None,
gaussianize=False, standardize=False, nproc=1):
''' Return the weighted wavelet amplitude (WWA) modified by Kirchner.
Using numba.
Parameters
----------
ys : array
a time series
ts : array
time axis of the time series
freq : array
vector of frequency
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
c : float
the decay constant that determines the frequency resolution of the analysis; the smaller the value, the higher the frequency resolution.
The default value 1/(8*np.pi**2) works well for most wavelet analysis cases
Neff_threshold : int
the threshold of the number of effective degrees of freedom
nproc : int
dummy argument kept for interface consistency; parallelism is handled internally by Numba
detrend : string
None - the original time series is assumed to have no trend;
Types of detrending:
- "linear" : the result of a linear least-squares fit to y is subtracted from y.
- "constant" : only the mean of data is subtracted.
- "savitzky-golay" : y is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y.
- "emd" : Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series
sg_kwargs : dict
The parameters for the Savitzky-Golay filter. See pyleoclim.utils.filter.savitzky_golay for details.
gaussianize : bool
If True, gaussianizes the timeseries
standardize : bool
If True, standardizes the timeseries
Returns
-------
wwa : array
the weighted wavelet amplitude
phase : array
the weighted wavelet phase
Neffs : array
the matrix of effective number of points in the time-scale coordinates
coeff : array
the wavelet transform coefficients (a0, a1, a2)
References
----------
Foster, G. Wavelets for period analysis of unevenly sampled time series. The Astronomical Journal 112, 1709 (1996).
Witt, A. & Schumann, A. Y. Holocene climate variability on millennial scales recorded in Greenland ice cores.
Nonlinear Processes in Geophysics 12, 345–352 (2005).
See also
--------
pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Foster. No multiprocessing
pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Foster. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing
pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler.
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.tsutils.preprocess : pre-processes a time series using Gaussianization and detrending.
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
'''
assertPositiveInt(Neff_threshold)
nt = np.size(tau)
nts = np.size(ts)
nf = np.size(freq)
pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize)
omega = make_omega(ts, freq)
Neffs = np.ndarray(shape=(nt, nf))
a0 = np.ndarray(shape=(nt, nf))
a1 = np.ndarray(shape=(nt, nf))
a2 = np.ndarray(shape=(nt, nf))
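# JIT-compile the double loop below; with parallel=True, nb.prange distributes
# iterations over frequencies and time shifts across CPU threads, and fastmath
# relaxes strict IEEE semantics for additional speed.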
@nb.jit(nopython=True, parallel=True, fastmath=True)
def loop_over(nf, nt, Neffs, a0, a1, a2):
def wwa_1g(tau, omega):
dz = omega * (ts - tau)
weights = np.exp(-c*dz**2)
sum_w = np.sum(weights)
Neff_loc = sum_w**2 / np.sum(weights**2)
if Neff_loc <= Neff_threshold:
a0_1g = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff_threshold
a1_1g = np.nan # the coefficients cannot be estimated reliably when Neff_loc <= Neff_threshold
a2_1g = np.nan
else:
def w_prod(xs, ys):
return np.sum(weights*xs*ys) / sum_w
sin_basis = np.sin(omega*ts)
cos_basis = np.cos(omega*ts)
one_v = np.ones(nts)
sin_one = w_prod(sin_basis, one_v)
cos_one = w_prod(cos_basis, one_v)
sin_cos = w_prod(sin_basis, cos_basis)
sin_sin = w_prod(sin_basis, sin_basis)
cos_cos = w_prod(cos_basis, cos_basis)
numerator = 2*(sin_cos - sin_one*cos_one)
denominator = (cos_cos - cos_one**2) - (sin_sin - sin_one**2)
time_shift = np.arctan2(numerator, denominator) / (2*omega) # Eq. (S5)
sin_shift = np.sin(omega*(ts - time_shift))
cos_shift = np.cos(omega*(ts - time_shift))
sin_tau_center = np.sin(omega*(time_shift - tau))
cos_tau_center = np.cos(omega*(time_shift - tau))
ys_cos_shift = w_prod(pd_ys, cos_shift)
ys_sin_shift = w_prod(pd_ys, sin_shift)
ys_one = w_prod(pd_ys, one_v)
cos_shift_one = w_prod(cos_shift, one_v)
sin_shift_one = w_prod(sin_shift, one_v)
A = 2*(ys_cos_shift - ys_one*cos_shift_one)
B = 2*(ys_sin_shift - ys_one*sin_shift_one)
a0_1g = ys_one
a1_1g = cos_tau_center*A - sin_tau_center*B # Eq. (S6)
a2_1g = sin_tau_center*A + cos_tau_center*B # Eq. (S7)
return Neff_loc, a0_1g, a1_1g, a2_1g
for k in nb.prange(nf):
for j in nb.prange(nt):
Neffs[j, k], a0[j, k], a1[j, k], a2[j, k] = wwa_1g(tau[j], omega[k])
return Neffs, a0, a1, a2
Neffs, a0, a1, a2 = loop_over(nf, nt, Neffs, a0, a1, a2)
wwa = np.sqrt(a1**2 + a2**2)
phase = np.arctan2(a2, a1)
# coeff = a1 + a2*1j
coeff = (a0, a1, a2)
return wwa, phase, Neffs, coeff
def kirchner_f2py(ys, ts, freq, tau, c=1/(8*np.pi**2), Neff_threshold=3, nproc=8, detrend=False, sg_kwargs=None,
gaussianize=False, standardize=False):
''' Returns the weighted wavelet amplitude (WWA) modified by Kirchner.
Fastest method. Calls Fortran libraries.
Parameters
----------
ys : array
a time series
ts : array
time axis of the time series
freq : array
vector of frequency
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
c : float
the decay constant that determines the frequency resolution of the analysis; the smaller the value, the higher the frequency resolution.
The default value 1/(8*np.pi**2) works well for most wavelet analysis cases
Neff_threshold : int
the threshold of the number of effective degrees of freedom
nproc : int
the number of processes for multiprocessing, passed through to the compiled Fortran routine
detrend : string
None - the original time series is assumed to have no trend;
Types of detrending:
- "linear" : the result of a linear least-squares fit to y is subtracted from y.
- "constant" : only the mean of data is subtracted.
- "savitzky-golay" : y is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y.
- "emd" : Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series
sg_kwargs : dict
The parameters for the Savitzky-Golay filter. See pyleoclim.utils.filter.savitzky_golay for details.
gaussianize : bool
If True, gaussianizes the timeseries
standardize : bool
If True, standardizes the timeseries
Returns
-------
wwa : array
the weighted wavelet amplitude
phase : array
the weighted wavelet phase
Neffs : array
the matrix of effective number of points in the time-scale coordinates
coeff : array
the wavelet transform coefficients (a0, a1, a2)
See also
--------
pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Foster. No multiprocessing
pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Foster. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing
pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package.
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.tsutils.preprocess : pre-processes a time series using Gaussianization and detrending.
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
'''
from . import f2py_wwz as f2py
assertPositiveInt(Neff_threshold, nproc)
nt = np.size(tau)
nts = np.size(ts)
nf = np.size(freq)
pd_ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs, gaussianize=gaussianize, standardize=standardize)
omega = make_omega(ts, freq)
Neffs, a0, a1, a2 = f2py.f2py_wwz.wwa(tau, omega, c, Neff_threshold, ts, pd_ys, nproc, nts, nt, nf)
undef = -99999.
a0[a0 == undef] = np.nan
a1[a1 == undef] = np.nan
a2[a2 == undef] = np.nan
wwa = np.sqrt(a1**2 + a2**2)
phase = np.arctan2(a2, a1)
# coeff = a1 + a2*1j
coeff = (a0, a1, a2)
return wwa, phase, Neffs, coeff
def make_coi(tau, Neff_threshold=3):
''' Return the cone of influence.
Parameters
----------
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
Neff_threshold : int
the threshold of the number of effective samples (default: 3)
Returns
-------
coi : array
cone of influence
References
----------
wave_signif() in http://paos.colorado.edu/research/wavelets/wave_python/waveletFunctions.py
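Examples
--------
A short sketch (assumes `pyleoclim` is installed; the tau grid is illustrative).
.. jupyter-execute::
import numpy as np
from pyleoclim.utils.wavelet import make_coi
tau = np.linspace(0, 100, 11)  # evenly-spaced time shifts (illustrative)
coi = make_coi(tau)            # rises from the edges toward the center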
'''
assert isinstance(Neff_threshold, int) and Neff_threshold >= 1
nt = np.size(tau)
fourier_factor = 4*np.pi / (Neff_threshold+np.sqrt(2+Neff_threshold**2))
coi_const = fourier_factor / np.sqrt(2)
dt = np.median(np.diff(tau))
nt_half = (nt+1)//2 - 1
A = np.append(0.00001, np.arange(nt_half)+1)
B = A[::-1]
if nt % 2 == 0:
C = np.append(A, B)
else:
C = np.append(A, B[1:])
coi = coi_const * dt * C
return coi
def make_omega(ts, freq):
''' Return the angular frequency based on the time axis and given frequency vector
Parameters
----------
ts : array
time axis of the time series
freq : array
vector of frequency
Returns
-------
omega : array
the angular frequency vector
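Examples
--------
A quick sketch of the Nyquist masking (assumes `pyleoclim` is installed; values are illustrative).
.. jupyter-execute::
import numpy as np
from pyleoclim.utils.wavelet import make_omega
ts = np.arange(0, 10, 1.0)  # dt = 1 -> f_Nyquist = 0.5
omega = make_omega(ts, np.array([0.1, 0.4, 0.6]))
# the 0.6 entry exceeds f_Nyquist, so its angular frequency is returned as NaN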
'''
# frequencies above f_Nyquist are set to NaN, so the corresponding wwa values will be NaN as well
f_Nyquist = 0.5 / np.median(np.diff(ts))
freq_with_nan = np.copy(freq)
freq_with_nan[freq > f_Nyquist] = np.nan
omega = 2*np.pi*freq_with_nan
return omega
def wwa2psd(wwa, ts, Neffs, freq=None, Neff_threshold=3, anti_alias=False, avgs=2):
""" Return the power spectral density (PSD) using the weighted wavelet amplitude (WWA).
Parameters
----------
wwa : array
the weighted wavelet amplitude.
ts : array
the time points, should be pre-truncated so that the span is exactly what is used for wwz
Neffs : array
the matrix of effective number of points in the time-scale coordinates obtained from wwz
freq : array
vector of frequency from wwz
Neff_threshold : int
the threshold of the number of effective samples
anti_alias : bool
whether to apply anti-alias filter
avgs : int
flag for whether the spectrum is derived from instantaneous point measurements (avgs != 1) OR from measurements averaged over each sampling interval (avgs == 1)
Returns
-------
psd : array
power spectral density
References
----------
Kirchner's C code for weighted psd calculation (see https://www.pnas.org/doi/full/10.1073/pnas.1304328110#supplementary-materials)
"""
af = AliasFilter()
# weighted psd calculation start
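# each (time, scale) bin is weighted by its excess effective sample size
# (Neffs - Neff_threshold, floored at zero below), so poorly constrained bins
# contribute little to the time-averaged spectrum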
power = wwa**2 * 0.5 * (np.max(ts)-np.min(ts))/np.size(ts) * Neffs
Neff_diff = Neffs - Neff_threshold
Neff_diff[Neff_diff < 0] = 0
sum_power = np.nansum(power * Neff_diff, axis=0)
sum_eff = np.nansum(Neff_diff, axis=0)
psd = sum_power / sum_eff
# weighted psd calculation end
if anti_alias:
assert freq is not None, "freq is required for alias filter!"
dt = np.median(np.diff(ts))
f_sampling = 1/dt
psd_copy = psd[1:]
freq_copy = freq[1:]
alpha, filtered_pwr, model_pwr, aliased_pwr = af.alias_filter(
freq_copy, psd_copy, f_sampling, f_sampling*1e3, np.min(freq), avgs)
psd[1:] = np.copy(filtered_pwr)
return psd
def wwz(ys, ts, tau=None, ntau=None, freq=None, freq_method='log',
freq_kwargs={}, c=1/(8*np.pi**2), Neff_threshold=3, Neff_coi=3,
nproc=8, detrend=False, sg_kwargs=None, method='Kirchner_numba',
gaussianize=False, standardize=True, len_bd=0,
bc_mode='reflect', reflect_type='odd'):
''' Weighted wavelet Z transform (WWZ) for unevenly-spaced data
Parameters
----------
ys : array
a time series, NaNs will be deleted automatically
ts : array
the time points, if `ys` contains any NaNs, some of the time points will be deleted accordingly
tau : array
the evenly-spaced time vector for the analysis, namely the time shift for wavelet analysis
freq : array
vector of frequency
freq_method : str
Method used to generate the frequency vector if `freq` is not set directly. The following options are available:
- 'log' (default)
- 'lomb_scargle'
- 'welch'
- 'scale'
- 'nfft'
See :func:`pyleoclim.utils.wavelet.make_freq_vector()` for details
freq_kwargs : dict
optional parameters passed to the frequency vector generator, used when `freq` is None
c : float
the decay constant that determines the frequency resolution of the analysis; the smaller the value, the higher the frequency resolution.
The default value 1/(8*np.pi**2) works well for most wavelet analysis cases
Neff_threshold : int
threshold for the effective number of points
Neff_coi : int
threshold for the effective number of points used when computing the cone of influence (default: 3)
nproc : int
the number of processes for multiprocessing
detrend : string, {None, 'linear', 'constant', 'savitzky-golay', 'emd'}
Available methods for detrending, including
- None: the original time series is assumed to have no trend;
- 'linear': a linear least-squares fit to `ys` is subtracted;
- 'constant': the mean of `ys` is subtracted;
- 'savitzky-golay': ys is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y;
- 'emd': empirical mode decomposition; the last mode is assumed to be the trend and is removed from the series
sg_kwargs : dict
The parameters for the Savitzky-Golay filter. See :func:`pyleoclim.utils.filter.savitzky_golay()` for details.
method : string, {'Foster', 'Kirchner', 'Kirchner_f2py', 'Kirchner_numba'}
Available specific implementation of WWZ, including
- 'Foster': the original WWZ method;
- 'Kirchner': the method Kirchner adapted from Foster;
- 'Kirchner_f2py': the method Kirchner adapted from Foster, implemented with f2py for acceleration;
- 'Kirchner_numba': the method Kirchner adapted from Foster, implemented with Numba for acceleration (default);
len_bd : int
the number of ghost grid points to create on each boundary
bc_mode : string, {'constant', 'edge', 'linear_ramp', 'maximum', 'mean', 'median', 'minimum', 'reflect' , 'symmetric', 'wrap'}
For more details, see np.lib.pad()
reflect_type : string, optional, {'even', 'odd'}
Used in 'reflect' and 'symmetric'. The 'even' style is the default, with an unaltered reflection around the edge value.
For the 'odd' style, the extended part of the array is created by subtracting the reflected values from two times the edge value.
For more details, see np.lib.pad()
Returns
-------
res : namedtuple
a namedtuple that includes the items below
amplitude : array
the weighted wavelet amplitude
phase : array
the weighted wavelet phase
coi : array
cone of influence
freq : array
vector of frequency
time : array
the evenly-spaced time points, namely the time shift for wavelet analysis
Neffs : array
the matrix of effective number of points in the time-scale coordinates
coeff : array
the wavelet transform coefficients (a0, a1, a2)
scale : array
the scale (period) axis of the scalogram, defined as 1/freq
See also
--------
pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Foster. No multiprocessing
pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Foster. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing
pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package.
pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler.
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
pyleoclim.utils.tsutils.preprocess: pre-processes a time series using Gaussianization and detrending.
Examples
--------
We perform an idealized test below, using a sine wave with a period of 50 yr as the input signal.
Wavelet analysis should then return an energy band around a 50 yr period in the scalogram.
.. jupyter-execute::
from pyleoclim import utils
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter, FormatStrFormatter
import numpy as np
# Create a signal
time = np.arange(2001)
f = 1/50 # the period is then 1/f = 50
signal = np.cos(2*np.pi*f*time)
# Wavelet Analysis
res = utils.wwz(signal, time)
# Visualization
fig, ax = plt.subplots()
contourf_args = {'cmap': 'magma', 'origin': 'lower', 'levels': 11}
cbar_args = {'drawedges': False, 'orientation': 'vertical', 'fraction': 0.15, 'pad': 0.05}
cont = ax.contourf(res.time, 1/res.freq, res.amplitude.T, **contourf_args)
ax.plot(res.time, res.coi, 'k--') # plot the cone of influence
ax.set_yscale('log')
ax.set_yticks([2, 5, 10, 20, 50, 100, 200, 500, 1000])
ax.set_ylim([2, 1000])
ax.yaxis.set_major_formatter(ScalarFormatter())
ax.yaxis.set_major_formatter(FormatStrFormatter('%g'))
ax.set_xlabel('Time (yr)')
ax.set_ylabel('Period (yrs)')
cb = plt.colorbar(cont, **cbar_args)
'''
#assert isinstance(nMC, int) and nMC >= 0, "nMC should be larger than or equal to 0."
if standardize:
warnings.warn('Standardizing the timeseries')
ys_cut, ts_cut, freq, tau = prepare_wwz(
ys, ts, freq=freq, freq_method=freq_method, freq_kwargs=freq_kwargs,
tau=tau, len_bd=len_bd,
bc_mode=bc_mode, reflect_type=reflect_type
)
wwz_func = get_wwz_func(nproc, method)
wwa, phase, Neffs, coeff = wwz_func(ys_cut, ts_cut, freq, tau, Neff_threshold=Neff_threshold, c=c, nproc=nproc,
detrend=detrend, sg_kwargs=sg_kwargs,
gaussianize=gaussianize, standardize=standardize)
# calculate the cone of influence
coi = make_coi(tau, Neff_threshold=Neff_coi)
# define `scale` as the `Period` axis for the scalogram
scale = 1/freq
# export
Results = collections.namedtuple('Results', ['amplitude', 'phase', 'coi', 'freq', 'time', 'Neffs', 'coeff', 'scale'])
res = Results(amplitude=wwa, phase=phase, coi=coi, freq=freq, time=tau, Neffs=Neffs, coeff=coeff, scale = scale)
return res
def wwz_coherence(y1, t1, y2, t2, smooth_factor=0.25,
tau=None, freq=None, freq_method='log', freq_kwargs=None,
c=1/(8*np.pi**2), Neff_threshold=3, nproc=8, detrend=False, sg_kwargs=None,
verbose=False, method='Kirchner_numba',
gaussianize=False, standardize=True):
''' Returns the wavelet coherence of two time series (WWZ method).
Parameters
----------
y1 : array
first of the two time series
t1 : array
time axis of the first time series
y2 : array
second of the two time series
t2 : array
time axis of the second time series
tau : array
the evenly-spaced time points
freq : array
vector of frequency
c : float
the decay constant that determines the frequency resolution of the analysis; the smaller the value, the higher the frequency resolution.
The default value 1/(8*np.pi**2) works well for most wavelet analysis cases
Neff_threshold : int
threshold for the effective number of points
nproc : int
the number of processes for multiprocessing
detrend : string
- None: the original time series is assumed to have no trend;
- 'linear': a linear least-squares fit to `ys` is subtracted;
- 'constant': the mean of `ys` is subtracted;
- 'savitzky-golay': ys is filtered using the Savitzky-Golay filter and the resulting filtered series is subtracted from y;
- 'emd': empirical mode decomposition; the last mode is assumed to be the trend and is removed from the series
sg_kwargs : dict
The parameters for the Savitzky-Golay filter. See pyleoclim.utils.filter.savitzky_golay for details.
gaussianize : bool
If True, gaussianizes the timeseries
standardize : bool
If True, standardizes the timeseries
method : string
- 'Foster': the original WWZ method;
- 'Kirchner': the method Kirchner adapted from Foster;
- 'Kirchner_f2py': the method Kirchner adapted from Foster with f2py
- 'Kirchner_numba': Kirchner's algorithm with Numba support for acceleration (default)
verbose : bool
If True, print warning messages
smooth_factor : float
smoothing factor for the WTC (default: 0.25)
Returns
-------
res : namedtuple
contains the cross-wavelet coherence (xw_coherence), cross-wavelet amplitude (xw_amplitude),
cross-wavelet phase (xw_phase), cross-wavelet transform (xwt), vector of frequency (freq),
evenly-spaced time points (time), cone of influence (coi), and scale axis (scale)
References
----------
Grinsted, A., J. C. Moore, and S. Jevrejeva (2004), Application of the cross
wavelet transform and wavelet coherence to geophysical time series,
Nonlinear Processes in Geophysics, 11, 561–566.
See also
--------
pyleoclim.utils.wavelet.wwz_basic : Returns the weighted wavelet amplitude using the original method from Foster. No multiprocessing
pyleoclim.utils.wavelet.wwz_nproc : Returns the weighted wavelet amplitude using the original method from Foster. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_basic : Return the weighted wavelet amplitude (WWA) modified by Kirchner. No multiprocessing
pyleoclim.utils.wavelet.kirchner_nproc : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Supports multiprocessing
pyleoclim.utils.wavelet.kirchner_numba : Return the weighted wavelet amplitude (WWA) modified by Kirchner using Numba package.
pyleoclim.utils.wavelet.kirchner_f2py : Returns the weighted wavelet amplitude (WWA) modified by Kirchner. Uses Fortran. Fastest method but requires a compiler.
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
pyleoclim.utils.tsutils.preprocess: pre-processes a time series using Gaussianization and detrending.
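Examples
--------
A minimal sketch on two phase-shifted sinusoids (assumes `pyleoclim` is
installed; the signals below are illustrative, not from the source).
.. jupyter-execute::
import numpy as np
from pyleoclim.utils.wavelet import wwz_coherence
rng = np.random.default_rng(1)
t = np.arange(500.0)
y1 = np.sin(2*np.pi*t/50)
y2 = np.sin(2*np.pi*t/50 + np.pi/4) + 0.5*rng.standard_normal(500)
res = wwz_coherence(y1, t, y2, t)
# res.xw_coherence should peak near a period of 50 time units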
'''
if standardize:
warnings.warn('Standardizing the timeseries')
if tau is None:
lb1, ub1 = np.min(t1), np.max(t1)
lb2, ub2 = np.min(t2), np.max(t2)
lb = np.max([lb1, lb2])
ub = np.min([ub1, ub2])
inside = t1[(t1>=lb) & (t1<=ub)]
tau = np.linspace(lb, ub, np.size(inside)//10)
print(f'Setting tau={tau[:3]}...{tau[-3:]}, ntau={np.size(tau)}')
if freq is None:
freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy()
freq = make_freq_vector(t1, method=freq_method, **freq_kwargs)
print(f'Setting freq={freq[:3]}...{freq[-3:]}, nf={np.size(freq)}')
y1_cut, t1_cut, freq1, tau1 = prepare_wwz(y1, t1, freq=freq, tau=tau)
y2_cut, t2_cut, freq2, tau2 = prepare_wwz(y2, t2, freq=freq, tau=tau)
if np.any(tau1 != tau2):
if verbose: print('inconsistent `tau`, recalculating...')
tau_min = np.min([np.min(tau1), np.min(tau2)])
tau_max = np.max([np.max(tau1), np.max(tau2)])
ntau = np.max([np.size(tau1), np.size(tau2)])
tau = np.linspace(tau_min, tau_max, ntau)
else:
tau = tau1
if np.any(freq1 != freq2):
if verbose: print('inconsistent `freq`, recalculating...')
freq_min = np.min([np.min(freq1), np.min(freq2)])
freq_max = np.max([np.max(freq1), np.max(freq2)])
nfreq = np.max([np.size(freq1), np.size(freq2)])
freq = np.linspace(freq_min, freq_max, nfreq)
else:
freq = freq1
if freq[0] == 0:
freq = freq[1:] # delete 0 frequency if present
res_wwz1 = wwz(y1_cut, t1_cut, tau=tau, freq=freq, c=c, Neff_threshold=Neff_threshold,
nproc=nproc, detrend=detrend, sg_kwargs=sg_kwargs,
gaussianize=gaussianize, standardize=standardize, method=method)
res_wwz2 = wwz(y2_cut, t2_cut, tau=tau, freq=freq, c=c, Neff_threshold=Neff_threshold,
nproc=nproc, detrend=detrend, sg_kwargs=sg_kwargs,
gaussianize=gaussianize, standardize=standardize, method=method)
wt_coeff1 = res_wwz1.coeff[1] - res_wwz1.coeff[2]*1j
wt_coeff2 = res_wwz2.coeff[1] - res_wwz2.coeff[2]*1j
scale = 1/freq # `scales` here is the `Period` axis in the wavelet plot
xw_coherence, xw_phase = wtc(wt_coeff1, wt_coeff2, scale, tau,
smooth_factor=smooth_factor)
xw_product, xw_amplitude, _ = xwt(wt_coeff1, wt_coeff2)
# export output
coi = make_coi(tau, Neff_threshold=Neff_threshold)
Results = collections.namedtuple('Results', ['xw_coherence', 'xw_amplitude',
'xw_phase', 'xwt', 'freq', 'time',
'coi', 'scale'])
res = Results(xw_coherence=xw_coherence, xw_amplitude=xw_amplitude,
xw_phase=xw_phase, xwt=xw_product, scale = scale,
freq=freq, time=tau, coi=coi)
return res
def freq_vector_lomb_scargle(ts, dt=None, nf=None, ofac=4, hifac=1):
''' Return the frequency vector based on the REDFIT recommendation.
Parameters
----------
ts : array
time axis of the time series
dt : float
The resolution of the data. If None, uses the median resolution. Defaults to None.
nf : int
Number of frequency points.
If None, calculated as the difference between the highest and lowest frequencies (set by hifac and ofac) divided by resolution. Defaults to None
ofac : float
Oversampling rate that sets the resolution of the frequency axis;
a value of 1 means no oversampling (should be >= 1).
The default value of 4 is usually a good choice.
hifac : float
fhi/fnyq (should be <= 1), where fhi is the highest frequency that
can be analyzed by the Lomb-Scargle algorithm and fnyq is the Nyquist frequency.
Returns
-------
freq : array
the frequency vector
References
----------
Trauth, M. H. MATLAB® Recipes for Earth Sciences. (Springer, 2015). pp 181.
See also
--------
pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on the Welch's method.
pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT
pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales
pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace
pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector
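Examples
--------
A short sketch (assumes `pyleoclim` is installed; the time axis is illustrative).
.. jupyter-execute::
import numpy as np
from pyleoclim.utils.wavelet import freq_vector_lomb_scargle
ts = np.arange(0, 100, 2.0)          # dt = 2 -> fnyq = 0.25
freq = freq_vector_lomb_scargle(ts)  # spans fnyq/(nt*ofac) .. hifac*fnyq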
'''
assert ofac >= 1 and hifac <= 1, "`ofac` should be >= 1, and `hifac` should be <= 1"
if dt is None:
dt = np.median(np.diff(ts))
flo = (1/(2*dt)) / (np.size(ts)*ofac)
fhi = hifac / (2*dt)
if nf is None:
df = flo
nf = int((fhi - flo) / df + 1)
freq = np.linspace(flo, fhi, nf)
return freq
def freq_vector_welch(ts):
''' Return the frequency vector based on Welch's method.
Parameters
----------
ts : array
time axis of the time series
Returns
-------
freq : array
the frequency vector
References
----------
https://github.com/scipy/scipy/blob/v0.14.0/scipy/signal/Spectral.py
See also
--------
pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT
recommendation.
pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT
pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales
pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace
pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector
'''
nt = np.size(ts)
dt = np.median(np.diff(ts))
fs = 1 / dt
if nt % 2 == 0:
n_freq = nt//2 + 1
else:
n_freq = (nt+1) // 2
freq = np.arange(n_freq) * fs / nt
return freq
def freq_vector_nfft(ts):
''' Return the frequency vector based on NFFT
Parameters
----------
ts : array
time axis of the time series
Returns
-------
freq : array
the frequency vector
See also
--------
pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT
recommendation.
pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on the Welch's method.
pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales
pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace
pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector
'''
nt = np.size(ts)
dt = np.median(np.diff(ts))
fs = 1 / dt
n_freq = nt//2 + 1
freq = np.linspace(0, fs/2, n_freq)
return freq
def freq_vector_scale(ts, dj=0.25, s0=None, j1=None, mother='MORLET', param=None):
''' Return the frequency vector based on scales for wavelet analysis.
This function is adapted from Torrence and Compo [1998]
Parameters
----------
ts: numpy.array
The time axis for the timeseries
dj : float, optional
The spacing between discrete scales. The default is 0.25. A smaller number gives better scale resolution but is slower to compute.
s0 : float, optional
the smallest scale of the wavelet. The default is None, which corresponds to 2*dt.
j1 : float, optional
the number of scales minus one. Scales range from s0 up to s0*2**(j1*dj),
giving a total of (j1+1) scales. The default is None, which corresponds to log2(n*dt/s0)/dj.
mother : string, optional
the mother wavelet function. The default is 'MORLET'. Options are: 'MORLET', 'PAUL', or 'DOG'
param : float, optional
the mother wavelet parameter. The default is None since it varies for each mother
- For 'MORLET' this is k0 (wavenumber), default is 6.
- For 'PAUL' this is m (order), default is 4.
- For 'DOG' this is m (m-th derivative), default is 2.
Returns
-------
freq : array
the frequency vector
See also
--------
pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT
recommendation.
pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on Welch's method.
pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT
pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace
pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector
References
----------
Torrence, C. and G. P. Compo, 1998: A Practical Guide to Wavelet Analysis. Bull. Amer. Meteor. Soc., 79, 61-78.
Python routines available at http://paos.colorado.edu/research/wavelets/
'''
if mother.upper() not in ['MORLET','DOG','PAUL']:
raise ValueError('The mother wavelet should be either "MORLET","PAUL", or "DOG"')
dt = np.diff(ts).mean()
n1 = len(ts)
if s0 is None:
s0 = 2 * dt
if j1 is None:
j1 = np.fix((np.log(n1 * dt / s0) / np.log(2)) / dj)
# construct SCALE array & empty PERIOD & WAVE arrays
if mother.upper() == 'MORLET':
if param is None:
param = 6.
fourier_factor = 4 * np.pi / (param + np.sqrt(2 + param**2))
elif mother.upper() == 'PAUL':
if param is None:
param = 4.
fourier_factor = 4 * np.pi / (2 * param + 1)
elif mother.upper() == 'DOG':
if param is None:
param = 2.
fourier_factor = 2 * np.pi * np.sqrt(2. / (2 * param + 1))
else:
fourier_factor = np.nan
j = np.arange(0, j1 + 1)
scale = s0 * 2. ** (j * dj)
freq = 1. / (fourier_factor * scale)
return freq
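# Usage sketch (hypothetical axis, illustrative only): with the default Morlet
# (param = 6), fourier_factor = 4*pi/(6 + sqrt(38)) ~ 1.033, so scales and Fourier
# periods nearly coincide; the returned frequencies descend as the scales grow by
# factors of 2**dj.
# t_even = np.arange(500) # dt = 1, so s0 defaults to 2
# freq = freq_vector_scale(t_even, dj=0.25, mother='MORLET') # freq[0] ~ 1/(1.033*2)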
def freq_vector_log(ts, fmin=None, fmax=None, nf=None):
''' Return the frequency vector based on logspace
Parameters
----------
ts : array
time axis of the time series
fmin : float
minimum frequency. If None is provided (default), inferred by the method.
fmax : float
maximum frequency. If None is provided (default), inferred by the method.
nf : int
number of freq points. If None is provided, defaults to N//10 + 1.
Returns
-------
freq : array
the frequency vector
See also
--------
pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT
recommendation.
pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on Welch's method.
pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT
pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales
pyleoclim.utils.wavelet.make_freq_vector : Make frequency vector
'''
nt = np.size(ts)
dt = np.median(np.diff(ts))
fs = 1 / dt
if nf is None:
nf = nt//10 + 1
if fmin is None:
fmin = 2/(np.max(ts)-np.min(ts))
if fmax is None:
fmax = fs/2
start = np.log2(fmin)
stop = np.log2(fmax)
freq = np.logspace(start, stop, nf, base=2)
return freq
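# Worked example: for a hypothetical 100-point axis with dt = 1, the defaults give
# nf = 100//10 + 1 = 11 frequencies, log2-spaced between fmin = 2/99 (two cycles
# over the record length) and fmax = 0.5 (the Nyquist frequency).
# t = np.arange(100)
# freq = freq_vector_log(t)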
def make_freq_vector(ts, method='log', **kwargs):
''' Make frequency vector
This function selects among five methods to obtain the frequency
vector.
Parameters
----------
ts : array
Time axis of the time series
method : string
The method to use. Options are 'log' (default), 'nfft', 'lomb_scargle', 'welch', and 'scale'
kwargs : dict, optional
- fmin : float
minimum frequency. If None is provided (default), inferred by the method.
- fmax : float
maximum frequency. If None is provided (default), inferred by the method.
- nf (int): number of frequency points
For 'lomb_scargle', additional parameters may be passed:
- ofac (float): oversampling rate that influences the resolution of the frequency axis;
a value of 1 means no oversampling (should be >= 1). The default of 4 is usually a good value.
- hifac (float): fhi/fnyq (should be <= 1), where fhi is the highest frequency that
can be analyzed by the Lomb-Scargle algorithm and fnyq is the Nyquist frequency.
Returns
-------
freq : array
the frequency vector
See also
--------
pyleoclim.utils.wavelet.freq_vector_lomb_scargle : Return the frequency vector based on the REDFIT
recommendation.
pyleoclim.utils.wavelet.freq_vector_welch : Return the frequency vector based on Welch's method.
pyleoclim.utils.wavelet.freq_vector_nfft : Return the frequency vector based on NFFT
pyleoclim.utils.wavelet.freq_vector_scale : Return the frequency vector based on scales
pyleoclim.utils.wavelet.freq_vector_log : Return the frequency vector based on logspace
'''
if method == 'lomb_scargle':
freq = freq_vector_lomb_scargle(ts,**kwargs)
elif method == 'welch':
freq = freq_vector_welch(ts)
elif method == 'nfft':
freq = freq_vector_nfft(ts)
elif method == 'scale':
freq = freq_vector_scale(ts, **kwargs)
elif method == 'log':
freq = freq_vector_log(ts, **kwargs)
else:
raise ValueError('This method is not supported')
# freq = freq[1:] # discard the first element 0
return freq
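# Usage sketch (hypothetical axis, illustrative only): keyword arguments are
# forwarded to the selected method.
# t = np.arange(1000)
# freq_ls = make_freq_vector(t, method='lomb_scargle', ofac=4, hifac=1)
# freq_sc = make_freq_vector(t, method='scale', dj=0.25, mother='MORLET')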
def get_wwz_func(nproc, method):
''' Return the wwz function to use.
Parameters
----------
nproc : int
the number of processes for multiprocessing
method : string
'Foster' - the original WWZ method;
'Kirchner' - the method Kirchner adapted from Foster;
'Kirchner_f2py' - the method Kirchner adapted from Foster with f2py;
'Kirchner_numba' - Kirchner's algorithm with Numba support for acceleration (default)
Returns
-------
wwz_func : function
the wwz function to use
'''
assertPositiveInt(nproc)
if method == 'Foster':
if nproc == 1:
wwz_func = wwz_basic
else:
wwz_func = wwz_nproc
elif method == 'Kirchner':
if nproc == 1:
wwz_func = kirchner_basic
else:
wwz_func = kirchner_nproc
elif method == 'Kirchner_f2py':
wwz_func = kirchner_f2py
elif method == 'Kirchner_numba':
wwz_func = kirchner_numba
else:
raise ValueError('Wrong specific method name for WWZ. Should be one of {"Foster", "Kirchner", "Kirchner_f2py", "Kirchner_numba"}')
return wwz_func
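# Usage sketch: select the numba-accelerated kernel (the default noted above) with
# a single process; the pure-Python Foster and Kirchner kernels switch to their
# multiprocessing variants whenever nproc > 1.
# wwz_func = get_wwz_func(nproc=1, method='Kirchner_numba')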
def prepare_wwz(ys, ts, freq=None, freq_method='log', freq_kwargs=None, tau=None, len_bd=0, bc_mode='reflect', reflect_type='odd', **kwargs):
''' Return the truncated time series with NaNs deleted and estimate frequency vector and tau
Parameters
----------
ys : array
a time series, NaNs will be deleted automatically
ts : array
the time points, if `ys` contains any NaNs, some of the time points will be deleted accordingly
freq : array
vector of frequency. If None, will be generated according to freq_method.
freq_method : str
when freq=None, freq will be generated according to freq_method
freq_kwargs : dict
optional arguments, used when freq=None, for certain methods
tau : array
The evenly-spaced time points, namely the time shift for wavelet analysis.
If the boundaries of tau are not exactly on two of the time axis points, then tau will be adjusted to be so.
If None, at most 50 tau points will be generated from the input time span.
len_bd : int
the number of the ghost grid points desired on each boundary
bc_mode : string
{'constant', 'edge', 'linear_ramp', 'maximum', 'mean', 'median', 'minimum', 'reflect' , 'symmetric', 'wrap'}
For more details, see np.lib.pad()
reflect_type : string
{‘even’, ‘odd’}, optional
Used in ‘reflect’, and ‘symmetric’. The ‘even’ style is the default with an unaltered reflection around the edge value.
For the ‘odd’ style, the extented part of the array is created by subtracting the reflected values from two times the edge value.
For more details, see np.lib.pad()
Returns
-------
ys_cut : array
the truncated time series with NaNs deleted
ts_cut : array
the truncated time axis of the original time series with NaNs deleted
freq : array
vector of frequency
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
'''
ys, ts = clean_ts(ys, ts)
if tau is None:
ntau = np.min([np.size(ts), 50])
tau = np.linspace(np.min(ts), np.max(ts), ntau)
elif np.isnan(tau).any():
warnings.warn("The input tau contains some NaNs." +
"It will be regenerated using the boundarys of the time axis of the time series with NaNs deleted," +
"with the length of the size of the input tau.")
tau = np.linspace(np.min(ts), np.max(ts), np.size(tau))
elif np.min(tau) < np.min(ts) and np.max(tau) > np.max(ts):
warnings.warn("tau should be within the time span of the time series." +
"Note that sometimes if the leading points of the time series are NaNs," +
"they will be deleted and cause np.min(tau) < np.min(ts)." +
"A new tau with the same size of the input tau will be generated.")
tau = np.linspace(np.min(ts), np.max(ts), np.size(tau))
elif np.min(tau) not in ts or np.max(tau) not in ts:
warnings.warn("The boundaries of tau are not exactly on two of the time axis points," +
"and it will be adjusted to be so.")
tau_lb = np.min(ts[ts > np.min(tau)])
tau_ub = np.max(ts[ts < np.max(tau)])
tau = np.linspace(tau_lb, tau_ub, np.size(tau))
# boundary condition
if len_bd > 0:
dt = np.median(np.diff(ts))
dtau = np.median(np.diff(tau))
len_bd_tau = int(len_bd*dt//dtau)  # np.linspace below requires an integer count
if bc_mode in ['reflect', 'symmetric']:
ys = np.lib.pad(ys, (len_bd, len_bd), bc_mode, reflect_type=reflect_type)
else:
ys = np.lib.pad(ys, (len_bd, len_bd), bc_mode)
ts_left_bd = np.linspace(ts[0]-dt*len_bd, ts[0]-dt, len_bd)
ts_right_bd = np.linspace(ts[-1]+dt, ts[-1]+dt*len_bd, len_bd)
ts = np.concatenate((ts_left_bd, ts, ts_right_bd))
warnings.warn("The tau will be regenerated to fit the boundary condition.")
tau_left_bd = np.linspace(tau[0]-dtau*len_bd_tau, tau[0]-dtau, len_bd_tau)
tau_right_bd = np.linspace(tau[-1]+dtau, tau[-1]+dtau*len_bd_tau, len_bd_tau)
tau = np.concatenate((tau_left_bd, tau, tau_right_bd))
# truncate the time series when the range of tau is smaller than that of the time series
ts_cut = ts[(np.min(tau) <= ts) & (ts <= np.max(tau))]
ys_cut = ys[(np.min(tau) <= ts) & (ts <= np.max(tau))]
if freq is None:
freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy()
freq = make_freq_vector(ts_cut, method=freq_method, **freq_kwargs)
# remove 0 in freq vector
freq = freq[freq != 0]
return ys_cut, ts_cut, freq, tau
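# Usage sketch (hypothetical data, illustrative only): clean a gappy series and
# obtain matching freq and tau vectors for wwz().
# rng = np.random.default_rng(0)
# t_raw = np.sort(rng.uniform(0, 100, size=120))
# y_raw = np.sin(2*np.pi*t_raw/20)
# y_raw[::7] = np.nan # inject missing values; clean_ts will drop them
# ys_cut, ts_cut, freq, tau = prepare_wwz(y_raw, t_raw, freq_method='log')
# # tau holds at most 50 evenly-spaced points spanning the cleaned time axis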
def xwt(coeff1, coeff2):
''' Return the cross wavelet transform.
Parameters
----------
coeff1 : array
the first of two sets of wavelet transform coefficients **in the form of a1 + a2*1j**
coeff2 : array
the second of two sets of wavelet transform coefficients **in the form of a1 + a2*1j**
Returns
-------
xw_t : array (complex)
the cross wavelet transform
xw_amplitude : array
the cross wavelet amplitude
xw_phase : array
the cross wavelet phase
References
----------
Grinsted, A., Moore, J. C. & Jevrejeva, S. Application of the cross wavelet transform and
wavelet coherence to geophysical time series. Nonlin. Processes Geophys. 11, 561–566 (2004).
'''
xw_t = coeff1 * np.conj(coeff2)
xw_amplitude = np.sqrt(xw_t.real**2 + xw_t.imag**2)
xw_phase = np.arctan2(xw_t.imag, xw_t.real)
return xw_t, xw_amplitude, xw_phase
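# Usage sketch: coeff_a and coeff_b are hypothetical complex coefficient arrays of
# identical shape (e.g. the coeff fields of two cwt() results on a common grid).
# xw_t, xw_amp, xw_phase = xwt(coeff_a, coeff_b)
# # equivalently, xw_amp == np.abs(xw_t) and xw_phase == np.angle(xw_t), elementwise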
def wtc(coeff1, coeff2, scales, tau, smooth_factor=0.25):
''' Return the wavelet transform coherency (WTC).
Parameters
----------
coeff1 : array
the first of two sets of wavelet transform coefficients **in the form of a1 + a2*1j**
coeff2 : array
the second of two sets of wavelet transform coefficients **in the form of a1 + a2*1j**
scales : array
vector of scales (period for WWZ; more complicated dependence for CWT)
tau : array
the evenly-spaced time points, namely the time shift for wavelet analysis
smooth_factor : float
smoothing factor for the coherence estimate (default: 0.25)
Returns
-------
xw_coherence : array
the cross wavelet coherence
References
----------
1. Grinsted, A., Moore, J. C. & Jevrejeva, S. Application of the cross wavelet transform and
wavelet coherence to geophysical time series. Nonlin. Processes Geophys. 11, 561–566 (2004).
2. Matlab code by Grinsted (https://github.com/grinsted/wavelet-coherence)
3. Python code by Sebastian Krieger (https://github.com/regeirk/pycwt)
'''
def rect(length, normalize=False):
""" Rectangular function adapted from https://github.com/regeirk/pycwt/blob/master/pycwt/helpers.py
Args:
length (int): length of the rectangular function
normalize (bool): normalize or not
Returns:
rect (array): the (normalized) rectangular function
"""
rect = np.zeros(length)
rect[0] = rect[-1] = 0.5
rect[1:-1] = 1
if normalize:
rect /= rect.sum()
return rect
def smoothing(coeff, snorm, dj, smooth_factor=smooth_factor):
""" Smoothing function adapted from https://github.com/regeirk/pycwt/blob/master/pycwt/helpers.py
Args
----
coeff : array
the wavelet coefficients get from wavelet transform **in the form of a1 + a2*1j**
snorm : array
normalized scales
dj : float
the scale spacing, satisfying Sj = S0 * 2**(j*dj)
smooth_factor : float
scaling factor for the width of the Gaussian time-smoothing window (default: 0.25)
Returns
-------
S : array
the smoothed wavelet coefficients
"""
def fft_kwargs(signal, **kwargs):
return {'n': int(2 ** np.ceil(np.log2(len(signal))))}
W = coeff.transpose()
m, n = np.shape(W)
# Smooth in time
k = 2 * np.pi * fft.fftfreq(fft_kwargs(W[0, :])['n'])
k2 = k ** 2
# Smoothing by Gaussian window (absolute value of wavelet function)
# using the convolution theorem: multiplication by Gaussian curve in
# Fourier domain for each scale, outer product of scale and frequency
F = np.exp(-smooth_factor * (snorm[:, np.newaxis] ** 2) * k2) # Outer product
smooth = fft.ifft(F * fft.fft(W, axis=1, **fft_kwargs(W[0, :])),
axis=1, # Along Fourier frequencies
**fft_kwargs(W[0, :], overwrite_x=True))
T = smooth[:, :n] # Remove possibly padded region due to FFT
if np.isreal(W).all():
T = T.real
# Smooth in scale
wsize = 0.6 / dj * 2
win = rect(int(np.round(wsize)), normalize=True)
T = signal.convolve2d(T, win[:, np.newaxis], 'same')
S = T.transpose()
return S
xwt = coeff1 * np.conj(coeff2)
power1 = np.abs(coeff1)**2
power2 = np.abs(coeff2)**2
dt = np.median(np.diff(tau))
snorm = scales / dt # normalized scales
# with WWZ method, we don't have a constant dj, so we will just take the average over the whole scale range
N = np.size(scales)
s0 = scales[-1]
sN = scales[0]
dj = np.log2(sN/s0) / N
S12 = smoothing(xwt/scales, snorm, dj)
S1 = smoothing(power1/scales, snorm, dj)
S2 = smoothing(power2/scales, snorm, dj)
xw_coherence = np.abs(S12)**2 / (S1*S2)
wcs = S12 / (np.sqrt(S1)*np.sqrt(S2))
xw_phase = np.angle(wcs)
return xw_coherence, xw_phase
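# Usage sketch: res1 and res2 are hypothetical cwt() outputs on co-registered
# series; the coefficients are transposed so that scale is the second axis, as
# done in cwt_coherence() below.
# coh, phase = wtc(res1.coeff.T, res2.coeff.T, res1.scale, res1.time)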
# def reconstruct_ts(coeff, freq, tau, t, len_bd=0):
# ''' Reconstruct the normalized time series from the wavelet coefficients.
# Parameters
# ----------
# coeff : array
# the coefficients of the corresponding basis functions (a0, a1, a2)
# freq : array
# vector of frequency of the basis functions
# tau : array
# the evenly-spaced time points of the basis functions
# t : array
# the specified evenly-spaced time points of the reconstructed time series
# len_bd : int
# the number of ghost grid points to create on each boundary
# Returns
# -------
# rec_ts : array
# the reconstructed normalized time series
# t : array
# the evenly-spaced time points of the reconstructed time series
# '''
# omega = 2*np.pi*freq
# nf = np.size(freq)
# dt = np.median(np.diff(t))
# if len_bd > 0:
# t_left_bd = np.linspace(t[0]-dt*len_bd, t[0]-dt, len_bd)
# t_right_bd = np.linspace(t[-1]+dt, t[-1]+dt*len_bd, len_bd)
# t = np.concatenate((t_left_bd, t, t_right_bd))
# ntau = np.size(tau)
# a_0, a_1, a_2 = coeff
# rec_ts = np.zeros(np.size(t))
# for k in range(nf):
# for j in range(ntau):
# if np.isnan(a_0[j, k]) or np.isnan(a_1[j, k]) or np.isnan(a_2[j, k]):
# continue
# else:
# dz = omega[k] * (t - tau[j])
# phi_1 = np.cos(dz)
# phi_2 = np.sin(dz)
# rec_ts += (a_0[j, k] + a_1[j, k]*phi_1 + a_2[j, k]*phi_2)
# rec_ts = preprocess(rec_ts, t, detrend=False, gaussianize=False, standardize=False)
# return rec_ts, t
############ Methods for Torrence and Compo ############
def cwt(ys, ts, freq=None, freq_method='log', freq_kwargs={}, scale=None, detrend=False, sg_kwargs={},
gaussianize=False, standardize=True, pad=False, mother='MORLET', param=None):
'''
Wrapper function to implement Torrence and Compo continuous wavelet transform
Parameters
----------
ys : numpy.array
the time series.
ts : numpy.array
the time axis.
freq : numpy.array, optional
The frequency vector. The default is None, which will prompt the use of one of the underlying frequency-vector functions.
freq_method : string, optional
The method by which to obtain the frequency vector. The default is 'log'.
Options are 'log' (default), 'nfft', 'lomb_scargle', 'welch', and 'scale'
freq_kwargs : dict, optional
Optional parameters for the choice of the frequency vector. See make_freq_vector and additional methods for details. The default is {}.
scale : numpy.array
Optional scale vector in place of a frequency vector. Default is None. If scale is not None, frequency method and attached arguments will be ignored.
detrend : bool, string, {'linear', 'constant', 'savitzky-golay', 'emd'}
Whether to detrend and with which option. The default is False.
sg_kwargs : dict, optional
Additional parameters for the Savitzky-Golay method. The default is {}.
gaussianize : bool, optional
Whether to gaussianize. The default is False.
standardize : bool, optional
Whether to standardize. The default is True.
pad : bool, optional
Whether or not to pad the timeseries with zeroes to bring N up to the next higher power of 2.
This prevents wraparound from the end of the time series to the beginning, and also speeds up the FFTs used to do the wavelet transform.
This will not eliminate all edge effects. The default is False.
mother : string, optional
the mother wavelet function. The default is 'MORLET'. Options are: 'MORLET', 'PAUL', or 'DOG'
param : float, optional
the mother wavelet parameter. The default is None since it varies for each mother
- For 'MORLET' this is k0 (wavenumber), default is 6.
- For 'PAUL' this is m (order), default is 4.
- For 'DOG' this is m (m-th derivative), default is 2.
Returns
-------
res : dict
Dictionary containing:
- amplitude: the wavelet amplitude
- coi: cone of influence
- freq: frequency vector
- coeff: the wavelet coefficients
- scale: the scale vector
- time: the time vector
- mother: the mother wavelet
- param : the wavelet parameter
See also
--------
pyleoclim.utils.wavelet.make_freq_vector : make the frequency vector with various methods
pyleoclim.utils.wavelet.tc_wavelet: the underlying wavelet function by Torrence and Compo
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
pyleoclim.utils.tsutils.preprocess: pre-processes a time series using Gaussianization and detrending.
References
----------
Torrence, C. and G. P. Compo, 1998: A Practical Guide to Wavelet Analysis. Bull. Amer. Meteor. Soc., 79, 61-78.
Python routines available at http://paos.colorado.edu/research/wavelets/
'''
if standardize:
warnings.warn('Standardizing the timeseries')
ts = np.array(ts)
ys = np.array(ys)
ys, ts = clean_ts(ys,ts)
if len(ts) != len(ys):
raise ValueError('Time and value axis should be the same length')
if not is_evenly_spaced(ts):
raise ValueError('Time vector should be evenly spaced for this method. Interpolate or use WWZ.')
if mother.upper() not in ['MORLET','DOG','PAUL']:
raise ValueError('The mother wavelet should be either "MORLET","PAUL", or "DOG"')
#preprocessing
# remove NaNs
dt = np.diff(ts).mean()
ys = preprocess(ys, ts, detrend=detrend, sg_kwargs=sg_kwargs,
gaussianize=gaussianize, standardize=standardize) #TC seems to require standardization
# fourier factor determination
if mother.upper() == 'MORLET':
if param is None:
param = 6.
fourier_factor = 4 * np.pi / (param + np.sqrt(2 + param**2))
elif mother.upper() == 'PAUL':
if param is None:
param = 4.
fourier_factor = 4 * np.pi / (2 * param + 1)
elif mother.upper() == 'DOG':
if param is None:
param = 2.
fourier_factor = 2 * np.pi * np.sqrt(2. / (2 * param + 1))
else:
fourier_factor = np.nan
#get the scale
if scale is None:
if freq is None:
if freq_method == 'scale':
freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy()
freq_kwargs.update({'mother':mother,'param':param})
freq = make_freq_vector(ts,method=freq_method,**freq_kwargs)
scale = 1. / (fourier_factor * freq)
#calculate wavelet
wave, coi = tc_wavelet(ys, dt, scale, mother, param, pad)
amplitude=np.abs(wave)
Results = collections.namedtuple('Results', ['amplitude', 'coi', 'freq', 'time', 'scale', 'coeff', 'mother','param','gaussianize','standardize'])
res = Results(amplitude=amplitude.T, coi=coi, freq=freq, time=ts, scale=scale, coeff=wave, mother=mother,param=param, gaussianize=gaussianize,standardize=standardize)
return res
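# Usage sketch (synthetic data, illustrative only): CWT of a pure 50-unit cycle.
# t = np.arange(1000)
# y = np.sin(2*np.pi*t/50)
# res = cwt(y, t) # defaults: Morlet wavelet, log frequency vector, standardized
# # res.amplitude has shape (len(t), len(res.freq)), with peak power near freq = 0.02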
def cwt_coherence(y1, t1, y2, t2, freq=None, freq_method='log', freq_kwargs={},
scale=None, detrend=False, sg_kwargs={}, pad=False,
standardize=True, gaussianize=False, tau=None, Neff_threshold=3,
mother='MORLET', param=None, smooth_factor=0.25):
''' Returns the wavelet transform coherency of two time series using the CWT.
Parameters
----------
y1 : array
first of two time series
y2 : array
second of the two time series
t1 : array
time axis of first time series
t2 : array
time axis of the second time series (should be = t1)
tau : array
evenly-spaced time points at which to evaluate coherence
Defaults to None, which uses t1
freq : array
vector of frequency
freq_method : string, optional
The method by which to obtain the frequency vector. The default is 'log'.
Options are 'log' (default), 'nfft', 'lomb_scargle', 'welch', and 'scale'
freq_kwargs : dict, optional
Optional parameters for the choice of the frequency vector. See make_freq_vector and additional methods for details. The default is {}.
scale : numpy.array
Optional scale vector in place of a frequency vector. Default is None. If scale is not None, frequency method and attached arguments will be ignored.
detrend : bool, string, {'linear', 'constant', 'savitzky-golay', 'emd'}
Whether to detrend and with which option. The default is False.
sg_kwargs : dict, optional
Additional parameters for the Savitzky-Golay method. The default is {}.
gaussianize : bool, optional
Whether to gaussianize. The default is False.
standardize : bool, optional
Whether to standardize. The default is True.
pad : bool, optional
Whether or not to pad the timeseries with zeroes to increase N to the next higher power of 2.
This prevents wraparound from the end of the time series to the beginning, and also speeds up the FFT used to do the wavelet transform.
This will not eliminate all edge effects. The default is False.
mother : string, optional
the mother wavelet function. The default is 'MORLET'. Options are: 'MORLET', 'PAUL', or 'DOG'
param : float, optional
the mother wavelet parameter. The default is None since it varies for each mother
- For 'MORLET' this is k0 (wavenumber), default is 6.
- For 'PAUL' this is m (order), default is 4.
- For 'DOG' this is m (m-th derivative), default is 2.
smooth_factor : float
smoothing factor for the WTC (default: 0.25)
Neff_threshold : int
threshold for the effective number of points (3 by default, see make_coi())
Returns
-------
res : dict
contains the cross-wavelet coherence (xw_coherence), cross-wavelet amplitude (xw_amplitude),
cross-wavelet phase (xw_phase), cross-wavelet transform (xw_t), vector of frequency (freq),
scale vector (scale), evenly-spaced time points (time), and cone of influence (coi).
References
----------
Grinsted, A., J. C. Moore, and S. Jevrejeva (2004), Application of the cross
wavelet transform and wavelet coherence to geophysical time series,
Nonlinear Processes in Geophysics, 11, 561–566.
See also
--------
pyleoclim.utils.wavelet.cwt : Continuous Wavelet Transform (Torrence & Compo 1998)
pyleoclim.utils.filter.savitzky_golay : Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
pyleoclim.utils.wavelet.make_freq_vector : Makes frequency vector
pyleoclim.utils.tsutils.gaussianize: Quantile maps a 1D array to a Gaussian distribution
pyleoclim.utils.tsutils.detrend : detrending functionalities
pyleoclim.utils.tsutils.preprocess: pre-processes a time series using Gaussianization and detrending.
'''
assert np.array_equal(t1, t2) and len(y1) == len(y2), "t1 and t2 should be the same. Suggest using common_time()"
if standardize:
warnings.warn('Standardizing the timeseries')
if tau is None:
tau = t1
if freq is None:
freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy()
freq = make_freq_vector(t1, method=freq_method, **freq_kwargs)
print(f'Setting freq={freq[:3]}...{freq[-3:]}, nfreq={np.size(freq)}')
if freq[0] == 0:
freq = freq[1:] # delete 0 frequency if present
# Compute CWT for both series
cwt1 = cwt(y1,t1,freq=freq,freq_method=freq_method,freq_kwargs=freq_kwargs,
scale = scale, detrend=detrend, sg_kwargs=sg_kwargs,
gaussianize=gaussianize, standardize=standardize, pad=pad,
mother=mother,param=param)
cwt2 = cwt(y2,t2,freq=freq,freq_method=freq_method,freq_kwargs=freq_kwargs,
scale = scale, detrend=detrend, sg_kwargs=sg_kwargs,
gaussianize=gaussianize, standardize=standardize, pad=pad,
mother=mother,param=param)
wt_coeff1 = cwt1.coeff.T # transpose so that scale is second axis, as for wwz
wt_coeff2 = cwt2.coeff.T
scale = cwt1.scale
# compute XWT and CWT
xw_coherence, xw_phase = wtc(wt_coeff1, wt_coeff2, scale, tau, smooth_factor=smooth_factor)
xw_t, xw_amplitude, _ = xwt(wt_coeff1, wt_coeff2)
# evaluate cone of influence
coi = make_coi(tau, Neff_threshold=Neff_threshold)
Results = collections.namedtuple('Results', ['xw_coherence', 'xw_amplitude', 'xw_phase', 'xw_t',
'freq', 'scale', 'time', 'coi'])
res = Results(xw_coherence=xw_coherence, xw_amplitude=xw_amplitude,
xw_phase=xw_phase, xw_t=xw_t, freq=freq, time=tau,
coi=coi, scale = scale)
return res
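# Usage sketch (synthetic data, illustrative only): two co-registered series with
# a fixed pi/4 phase lag at a 50-unit period.
# rng = np.random.default_rng(2)
# t = np.arange(500)
# y1 = np.sin(2*np.pi*t/50)
# y2 = np.sin(2*np.pi*t/50 + np.pi/4) + 0.5*rng.standard_normal(500)
# res = cwt_coherence(y1, t, y2, t)
# # res.xw_phase in the 50-unit band should hover near pi/4 inside the cone of influence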
def tc_wavelet(Y, dt, scale, mother, param, pad=False):
'''
WAVELET 1D Wavelet transform. Adapted from Torrence and Compo to fit existing Pyleoclim functionalities
Computes the wavelet transform of the vector Y (length N),
with sampling interval DT.
By default, the Morlet wavelet (k0=6) is used.
The wavelet basis is normalized to have total energy=1 at all scales.
Parameters
----------
Y : numpy.array
the time series of length N.
dt : float
the sampling interval
scale : numpy.array
the vector of wavelet scales
mother : string, optional
the mother wavelet function. The default is 'MORLET'. Options are: 'MORLET', 'PAUL', or 'DOG'
param : float, optional
the mother wavelet parameter. The default is None since it varies for each mother
- For 'MORLET' this is k0 (wavenumber), default is 6.
- For 'PAUL' this is m (order), default is 4.
- For 'DOG' this is m (m-th derivative), default is 2.
pad : {True,False}, optional
Whether or not to pad the timeseries with zeroes to bring N up to the next higher power of 2.
This prevents wraparound from the end of the time series to the beginning, and also speeds up the FFTs used to do the wavelet transform.
This will not eliminate all edge effects. The default is False.
Returns
-------
wave : numpy.array
The wavelet coefficients
coi : numpy.array
The cone of influence. Periods greater than this are subject to edge effects.
See also
--------
pyleoclim.utils.wavelet.tc_wave_bases: 1D wavelet functions Morlet, Paul or Dog
References
----------
Torrence, C. and G. P. Compo, 1998: A Practical Guide to Wavelet Analysis. Bull. Amer. Meteor. Soc., 79, 61-78.
Python routines available at http://paos.colorado.edu/research/wavelets/
'''
n1 = len(Y)
# construct time series to analyze, pad if necessary
x = Y - np.mean(Y)
if pad:
# power of 2 nearest to N
base2 = np.fix(np.log(n1) / np.log(2) + 0.4999)
nzeroes = int(2 ** (base2 + 1) - n1)
x = np.concatenate((x, np.zeros(nzeroes)))
n = len(x)
# construct wavenumber array used in transform [Eqn(5)]
kplus = np.arange(1, int(n / 2) + 1)
kplus = (kplus * 2 * np.pi / (n * dt))
kminus = np.arange(1, int((n - 1) / 2) + 1)
kminus = np.sort((-kminus * 2 * np.pi / (n * dt)))
k = np.concatenate(([0.], kplus, kminus))
# compute FFT of the (padded) time series
f = np.fft.fft(x) # [Eqn(3)]
# define the wavelet array
wave = np.zeros(shape=(len(scale), n), dtype=complex)
# loop through all scales and compute transform
for a1 in range(0, len(scale)):
daughter, fourier_factor, coi, _ = \
tc_wave_bases(mother, k, scale[a1], param)
wave[a1, :] = np.fft.ifft(f * daughter) # wavelet transform[Eqn(4)]
# COI [Sec.3g]
coi = coi * dt * np.concatenate((
np.insert(np.arange(int((n1 + 1) / 2) - 1), [0], [1E-5]),
np.insert(np.flipud(np.arange(0, int(n1 / 2) - 1)), [-1], [1E-5])))
wave = wave[:, :n1] # get rid of padding before returning
return wave, coi
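# Padding arithmetic: for n1 = 1000, base2 = fix(log2(1000) + 0.4999) = 10, so the
# padded length is 2**(10+1) = 2048, i.e. 1048 zeroes are appended before the FFT
# and stripped from the returned coefficients.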
def tc_wave_bases(mother, k, scale, param):
'''
WAVE_BASES 1D Wavelet functions Morlet, Paul, or DOG
Parameters
----------
mother : string
equal to 'MORLET' or 'PAUL' or 'DOG'
k : numpy.array
the Fourier frequencies at which to calculate the wavelet
scale : float
The wavelet scale
param : float
the nondimensional parameter for the wavelet function
Returns
-------
daughter : numpy.array
a vector, the wavelet function
fourier_factor : float
the ratio of Fourier period to scale
coi : float
the cone-of-influence size at the scale
dofmin : float
degrees of freedom for each point in the wavelet power
(either 2 for Morlet and Paul, or 1 for the DOG)
References
----------
Torrence, C. and G. P. Compo, 1998: A Practical Guide to Wavelet Analysis. Bull. Amer. Meteor. Soc., 79, 61-78.
Python routines available at http://paos.colorado.edu/research/wavelets/
'''
n = len(k)
kplus = np.array(k > 0., dtype=float)
if mother == 'MORLET': # ----------------------------------- Morlet
if param == -1:
param = 6.
k0 = np.copy(param)
# calc psi_0(s omega) from Table 1
expnt = -(scale * k - k0) ** 2 / 2. * kplus
norm = np.sqrt(scale * k[1]) * (np.pi ** (-0.25)) * np.sqrt(n)
daughter = norm * np.exp(expnt)
daughter = daughter * kplus # Heaviside step function
# Scale-->Fourier [Sec.3h]
fourier_factor = (4 * np.pi) / (k0 + np.sqrt(2 + k0 ** 2))
coi = fourier_factor / np.sqrt(2) # Cone-of-influence [Sec.3g]
dofmin = 2 # Degrees of freedom
elif mother == 'PAUL': # -------------------------------- Paul
if param == -1:
param = 4.
m = param
# calc psi_0(s omega) from Table 1
expnt = -scale * k * kplus
norm_bottom = np.sqrt(m * np.prod(np.arange(1, (2 * m))))
norm = np.sqrt(scale * k[1]) * (2 ** m / norm_bottom) * np.sqrt(n)
daughter = norm * ((scale * k) ** m) * np.exp(expnt) * kplus
fourier_factor = 4 * np.pi / (2 * m + 1)
coi = fourier_factor * np.sqrt(2)
dofmin = 2
elif mother == 'DOG': # -------------------------------- DOG
if param == -1:
param = 2.
m = param
# calc psi_0(s omega) from Table 1
expnt = -(scale * k) ** 2 / 2.0
norm = np.sqrt(scale * k[1] / gamma(m + 0.5)) * np.sqrt(n)
daughter = -norm * (1j ** m) * ((scale * k) ** m) * np.exp(expnt)
fourier_factor = 2 * np.pi * np.sqrt(2. / (2 * m + 1))
coi = fourier_factor / np.sqrt(2)
dofmin = 1
else:
raise ValueError('Mother must be one of MORLET, PAUL, DOG')
return daughter, fourier_factor, coi, dofmin
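# Note: for the default Morlet (k0 = 6), fourier_factor ~ 1.033 and
# coi = fourier_factor/sqrt(2) ~ 0.73, i.e. the e-folding time of edge effects
# is roughly 0.73 times the scale.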
def tc_wave_signif(ys, ts, scale, mother, param, sigtest='chi-square', qs=[0.95],
dof=None, gws=None):
'''
Asymptotic significance testing.
Parameters
----------
ys : numpy.array
Values for the timeseries
ts : numpy.array
time vector.
scale : numpy.array
vector of scale
mother : str
Type of mother wavelet
param : int
mother wavelet parameter
sigtest : {'chi-square','time-average','scale-average'}, optional
Type of significance test to perform. The default is 'chi-square'.
- chi-square: a regular chi-square test, Eq. (18) from Torrence and Compo
- time-average: DOF should be set to NA, the number of local wavelet spectra that were averaged together.
For the Global Wavelet Spectrum, this would be NA=N, where N is the number of points in your time series. Eq. (23) in Torrence and Compo
- scale-average: In this case, DOF should be set to a two-element vector [S1,S2], which gives the scale range that was averaged together.
e.g. if one scale-averaged scales between 2 and 8, then DOF=[2,8].
qs : list, optional
Significance level. The default is [0.95].
dof : None, optional
Degrees of freedom for the significance test. The default is None, which will automatically assign:
- chi-square: DOF = 2 (or 1 for MOTHER='DOG')
- time-average: DOF = NA, the number of times averaged together.
- scale-average: DOF = [S1,S2], the range of scales averaged.
gws : np.array, optional
Global wavelet spectrum. a vector of the same length as scale. If input then this is used as the theoretical background spectrum, rather than white or red noise. The default is None.
Returns
-------
signif_level : numpy.array
Array of values for significance level
References
----------
Torrence, C. and G. P. Compo, 1998: A Practical Guide to Wavelet Analysis. Bull. Amer. Meteor. Soc., 79, 61-78.
Python routines available at http://paos.colorado.edu/research/wavelets/
See also
--------
pyleoclim.utils.wavelet.chisquare_inv : inverse of chi-square CDF
pyleoclim.utils.wavelet.chisquare_solve : return the difference between calculated percentile and true P
'''
if mother.upper() not in ['MORLET','DOG','PAUL']:
raise ValueError('The mother wavelet should be either "MORLET","PAUL", or "DOG"')
if sigtest not in ['chi-square','time-average','scale-average']:
raise ValueError("The type of significance test should be either 'chi-square','time-average','scale-average'")
J1 = len(scale) - 1
dj = np.log2(scale[1] / scale[0])
variance = np.std(ys,ddof=1) ** 2
# get the appropriate parameters [see Table(2)]
if mother.upper() == 'MORLET': # ---------------------------------- Morlet
empir = ([2., -1, -1, -1])
if param == 6:
empir[1:] = ([0.776, 2.32, 0.60])
k0 = param
# Scale-->Fourier [Sec.3h]
fourier_factor = (4 * np.pi) / (k0 + np.sqrt(2 + k0 ** 2))
elif mother.upper() == 'PAUL':
empir = ([2, -1, -1, -1])
if param == 4:
empir[1:] = ([1.132, 1.17, 1.5])
m = param
fourier_factor = (4 * np.pi) / (2 * m + 1)
elif mother.upper() == 'DOG': # ------------------------------------- DOG
empir = ([1., -1, -1, -1])
if param == 2:
empir[1:] = ([3.541, 1.43, 1.4])
elif param == 6:
empir[1:] = ([1.966, 1.37, 0.97])
m = param
fourier_factor = 2 * np.pi * np.sqrt(2. / (2 * m + 1))
period = scale * fourier_factor
dofmin = empir[0] # Degrees of freedom with no smoothing
Cdelta = empir[1] # reconstruction factor
gamma_fac = empir[2] # time-decorrelation factor
dj0 = empir[3] # scale-decorrelation factor
dt = float(np.mean(np.diff(ts)))
freq = dt / period # normalized frequency
if gws is not None: # use global-wavelet as background spectrum
fft_theor = gws
else:
# [Eqn(16)]
lag1 = ar1_fit(ys,ts)
fft_theor = (1 - lag1 ** 2) / \
(1 - 2 * lag1 * np.cos(freq * 2 * np.pi) + lag1 ** 2)
fft_theor = variance * fft_theor # include time-series variance
signif = fft_theor
if dof is None:
dof = dofmin
signif_level = []
for siglvl in qs:
if sigtest == 'chi-square': # no smoothing, DOF=dofmin [Sec.4]
dof = dofmin
chisquare = chisquare_inv(siglvl, dof) / dof
signif = fft_theor * chisquare # [Eqn(18)]
#expand
#signif = signif[:, np.newaxis].dot(np.ones(len(ys))[np.newaxis, :])
elif sigtest == 'time-average': # time-averaged significance
if len(np.atleast_1d(dof)) == 1:
dof = np.zeros(J1 + 1) + dof  # one DOF value per scale
dof[dof < 1] = 1
# [Eqn(23)]
dof = dofmin * np.sqrt(1 + (dof * dt / gamma_fac / scale) ** 2)
dof[dof < dofmin] = dofmin # minimum DOF is dofmin
for a1 in range(0, J1 + 1):
chisquare = chisquare_inv(siglvl, dof[a1]) / dof[a1]
signif[a1] = fft_theor[a1] * chisquare
elif sigtest == 'scale-average': # scale-averaged significance
if len(dof) != 2:
raise ValueError('DOF must be set to [S1,S2],'
' the range of scale-averages')
if Cdelta == -1:
raise ValueError('Cdelta & dj0 not defined'
' for ' + mother + ' with param = ' + str(param))
s1 = dof[0]
s2 = dof[1]
avg = np.logical_and(scale >= 2, scale < 8) # scales between S1 & S2
navg = np.sum(np.array(np.logical_and(scale >= 2, scale < 8),
dtype=int))
if navg == 0:
raise ValueError('No valid scales between ' + str(s1) + ' and ' + str(s2))
Savg = 1. / np.sum(1. / scale[avg]) # [Eqn(25)]
Smid = np.exp((np.log(s1) + np.log(s2)) / 2.) # power-of-two midpoint
dof = (dofmin * navg * Savg / Smid) * \
np.sqrt(1 + (navg * dj / dj0) ** 2) # [Eqn(28)]
fft_theor = Savg * np.sum(fft_theor[avg] / scale[avg]) # [Eqn(27)]
chisquare = chisquare_inv(siglvl, dof) / dof
signif = (dj * dt / Cdelta / Savg) * fft_theor * chisquare # [Eqn(26)]
signif_level.append(np.sqrt(signif.T))
return signif_level
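# Usage sketch (hypothetical inputs, illustrative only): 95% chi-square significance
# levels against a fitted AR(1) background, with `scales` taken from a cwt() result.
# signif = tc_wave_signif(y, t, scales, mother='MORLET', param=6)
# # returns a list with one array of len(scales) per entry of qs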
def chisquare_inv(P, V):
'''
Returns the inverse of chi-square CDF with V degrees of freedom at fraction P
Parameters
----------
P : float
fraction
V : float
degrees of freedom
Returns
-------
X : float
Inverse chi-square
References
----------
Torrence, C. and G. P. Compo, 1998: A Practical Guide to Wavelet Analysis. Bull. Amer. Meteor. Soc., 79, 61-78.
Python routines available at http://paos.colorado.edu/research/wavelets/
'''
if (1 - P) < 1E-4:
raise ValueError('P must be < 0.9999')
if P == 0.95 and V == 2: # this is a no-brainer
X = 5.9915
return X
MINN = 0.01 # hopefully this is small enough
MAXX = 1 # actually starts at 10 (see while loop below)
X = 1
TOLERANCE = 1E-4 # this should be accurate enough
while (X + TOLERANCE) >= MAXX: # should only need to loop thru once
MAXX = MAXX * 10.
# this calculates value for X, NORMALIZED by V
X = fminbound(chisquare_solve, MINN, MAXX, args=(P, V), xtol=TOLERANCE)
MINN = MAXX
X = X * V # put back in the goofy V factor
return X
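# Worked check: chisquare_inv(0.95, 2) returns 5.9915 via the hard-coded shortcut,
# matching the 95th percentile of a chi-square distribution with 2 degrees of freedom.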
def chisquare_solve(XGUESS, P, V):
'''
Given XGUESS, a percentile P, and degrees-of-freedom V, return the difference between calculated percentile and P.
Parameters
----------
XGUESS : float
sig level
P : float
percentile
V : float
degrees of freedom
Returns
-------
PDIFF : float
difference between calculated percentile and P
References
----------
Torrence, C. and G. P. Compo, 1998: A Practical Guide to Wavelet Analysis. Bull. Amer. Meteor. Soc., 79, 61-78.
Python routines available at http://paos.colorado.edu/research/wavelets/
'''
PGUESS = gammainc(V / 2, V * XGUESS / 2) # incomplete Gamma function
PDIFF = np.abs(PGUESS - P) # error in calculated P
TOL = 1E-4
if PGUESS >= 1 - TOL: # if P is very close to 1 (i.e. a bad guess)
PDIFF = XGUESS # then just assign some big number like XGUESS
return PDIFF
def angle_stats(theta):
''' Statistics of a phase angle
Parameters
----------
theta : numpy.array
array of phase angles
Returns
-------
mean_theta : float
mean angle
kappa : float
an estimate of the Von Mises distribution's kappa parameter
sigma : float
circular standard deviation
References
----------
1. Huber, R., Dutra, L. V., & da Costa Freitas, C. (2001, July).
SAR interferogram phase filtering based on the Von Mises distribution.
In IGARSS 2001. Scanning the Present and Resolving the Future.
Proceedings. IEEE 2001 International Geoscience and Remote Sensing Symposium
(Cat. No. 01CH37217) (Vol. 6, pp. 2816-2818). IEEE.
See also
--------
pyleoclim.utils.wavelet.angle_sig: significance of phase angle statistics
'''
n = len(theta)
S = np.sin(theta).sum()
C = np.cos(theta).sum()
mean_theta = np.arctan2(S,C)
R = np.sqrt(S**2+C**2)/n
# estimate kappa from von Mises distribution
if (R<.53):
kappa = 2*R+R**3+5*R**5/6
elif (R<.85):
kappa = -0.4+1.39*R+0.43/(1-R)
else:
kappa = 1/(R**3-4*R**2+3*R)
sigma = np.sqrt(-2*np.log(R)) # circular standard deviation
return mean_theta, kappa, sigma
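# Usage sketch (synthetic phases, illustrative only): tightly clustered angles.
# rng = np.random.default_rng(3)
# theta = 0.5 + 0.1*rng.standard_normal(500) # phases concentrated near 0.5 rad
# mean_theta, kappa, sigma = angle_stats(theta)
# # expect mean_theta ~ 0.5, with large kappa (high concentration) and small sigma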
def angle_sig(theta, nMC=1000, level=0.05):
''' Calculates the mean angle and assesses the significance of its statistics.
In general, a consistent phase relationship will have low circular standard deviation
(sigma <= sigma_lo) and high concentration (kappa >= kappa_hi).
Parameters
----------
theta : numpy.array
array of phase angles
nMC : int
number of Monte Carlo simulations to assess angle confidence interval
if None, the simulation is not performed.
level : float
significance level against which to gauge sigma and kappa. default: 0.05
Returns
-------
angle_mean : float
mean angle
sigma : float
circular standard deviation
kappa: float
an estimate of the Von Mises distribution's kappa parameter.
kappa is a measure of concentration (a reciprocal measure of dispersion,
so 1/kappa is analogous to the variance).
sigma_lo : float
alpha-level quantile for sigma
kappa_hi : float
(1-alpha)-level quantile for kappa
References
----------
1. Grinsted, A., J. C. Moore, and S. Jevrejeva (2004), Application of the cross
wavelet transform and wavelet coherence to geophysical time series,
Nonlinear Processes in Geophysics, 11, 561–566.
2. Huber, R., Dutra, L. V., & da Costa Freitas, C. (2001, July).
SAR interferogram phase filtering based on the Von Mises distribution.
In IGARSS 2001. Scanning the Present and Resolving the Future.
Proceedings. IEEE 2001 International Geoscience and Remote Sensing Symposium
(Cat. No. 01CH37217) (Vol. 6, pp. 2816-2818). IEEE.
See also
--------
pyleoclim.utils.wavelet.angle_stats: phase angle statistics
'''
theta = np.unwrap(theta,discont=2*np.pi)
meantheta, kappa, sigma = angle_stats(theta)
if nMC is not None:
noise = ar1_sim(theta - meantheta, p=nMC) # generate noise matrix
sigmaMC = np.empty((nMC))
kappaMC = np.empty((nMC))
for i in range(nMC):
_, kappaMC[i], sigmaMC[i] = angle_stats(meantheta+noise[:,i])
sigma_lo = np.quantile(sigmaMC, level) # obtain sigma threshold
kappa_hi = np.quantile(kappaMC, 1-level) # obtain kappa threshold
else:
sigma_lo = kappa_hi = None
Results = collections.namedtuple('Results', ['mean_angle', 'kappa', 'sigma', 'kappa_hi', 'sigma_lo'])
res = Results(mean_angle=meantheta, kappa=kappa, sigma=sigma, kappa_hi=kappa_hi, sigma_lo = sigma_lo)
return res
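# Usage sketch (hypothetical phases, illustrative only): assess whether a phase
# relationship is consistent, e.g. theta taken from xwt()'s xw_phase in a band.
# res = angle_sig(theta, nMC=1000, level=0.05)
# # consistent phasing: res.sigma <= res.sigma_lo and res.kappa >= res.kappa_hi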