Source code for pyleoclim.utils.correlation

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Relevant functions for correlation analysis
"""

__all__ = [
    'corr_sig',
    'fdr',
]

import numpy as np
from scipy.stats import pearsonr
from scipy.stats.mstats import gmean
from scipy.stats import t as stu
from scipy.stats import gaussian_kde
import statsmodels.api as sm
from sklearn import preprocessing
from .tsmodel import ar1_fit_evenly


[docs]def corr_sig(y1, y2, nsim=1000, method='isospectral', alpha=0.05): """ Estimates the Pearson's correlation and associated significance between two non IID time series The significance of the correlation is assessed using one of the following methods: 1) 'ttest': T-test adjusted for effective sample size. This is a parametric test (data are Gaussian and identically distributed) with a rather ad-hoc adjustment. It is instantaneous but makes a lot of assumptions about the data, many of which may not be met. 2) 'isopersistent': AR(1) modeling of x and y. This is a parametric test as well (series follow an AR(1) model) but solves the issue by direct simulation. 3) 'isospectral': phase randomization of original inputs. (default) This is a non-parametric method, assuming only wide-sense stationarity For 2 and 3, computational requirements scale with nsim. When possible, nsim should be at least 1000. Parameters ---------- y1 : array vector of (real) numbers of same length as y2, no NaNs allowed y2 : array vector of (real) numbers of same length as y1, no NaNs allowed nsim : int the number of simulations [default: 1000] method : str; {'ttest','isopersistent','isospectral' (default)} method for significance testing alpha : float significance level for critical value estimation [default: 0.05] Returns ------- res : dict the result dictionary, containing - r : float correlation coefficient - p : float the p-value - signif : bool true if significant; false otherwise Note that signif = True if and only if p <= alpha. See also -------- pyleoclim.utils.correlation.corr_ttest : Estimates the significance of correlations between 2 time series using the classical T-test adjusted for effective sample size pyleoclim.utils.correlation.corr_isopersist : Computes correlation between two timeseries, and their significance using Ar(1) modeling pyleoclim.utils.correlation.corr_isospec : Estimates the significance of the correlation using phase randomization pyleoclim.utils.correlation.fdr : Determine significance based on the false discovery rate """ y1 = np.array(y1, dtype=float) y2 = np.array(y2, dtype=float) assert np.size(y1) == np.size(y2), 'The size of X and the size of Y should be the same!' if method == 'ttest': (r, signif, p) = corr_ttest(y1, y2, alpha=alpha) elif method == 'isopersistent': (r, signif, p) = corr_isopersist(y1, y2, alpha=alpha, nsim=nsim) elif method == 'isospectral': (r, signif, p) = corr_isospec(y1, y2, alpha=alpha, nsim=nsim) res={'r':r,'signif':signif,'p':p} return res
[docs]def fdr(pvals, qlevel=0.05, method='original', adj_method=None, adj_args={}): ''' Determine significance based on the false discovery rate The false discovery rate is a method of conceptualizing the rate of type I errors in null hypothesis testing when conducting multiple comparisons. Translated from fdr.R by Dr. Chris Paciorek Parameters ---------- pvals : list or array A vector of p-values on which to conduct the multiple testing. qlevel : float The proportion of false positives desired. method : {'original', 'general'} Method for performing the testing. - 'original' follows Benjamini & Hochberg (1995); - 'general' is much more conservative, requiring no assumptions on the p-values (see Benjamini & Yekutieli (2001)). 'original' is recommended, and if desired, using 'adj_method="mean"' to increase power. adj_method: {'mean', 'storey', 'two-stage'} Method for increasing the power of the procedure by estimating the proportion of alternative p-values. - 'mean', the modified Storey estimator in Ventura et al. (2004) - 'storey', the method of Storey (2002) - 'two-stage', the iterative approach of Benjamini et al. (2001) adj_args : dict Arguments for adj_method; see prop_alt() for description, but note that for "two-stage", qlevel and fdr_method are taken from the qlevel and method arguments for fdr() Returns ------- fdr_res : array or None A vector of the indices of the significant tests; None if no significant tests See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series References ---------- - fdr.R by Dr. Chris Paciorek: https://www.stat.berkeley.edu/~paciorek/research/code/code.html ''' n = len(pvals) a = 0 if adj_method is not None: if adj_method == 'two-stage': qlevel = qlevel / (1+qlevel) # see Benjamini et al. (2001) for proof that this controls the FDR at level qlevel adj_args['qlevel'] = qlevel adj_args['fdr_method'] = method print(f'Adjusting cutoff using two-stage method, with method: {adj_args["fdr_method"]}; qlevel: {adj_args["qlevel"]}') elif adj_method == 'mean': if adj_args == {}: # default arguments for "mean" method of Ventura et al. (2004) adj_args['edf_lower'] = 0.8 adj_args['num_steps'] = 20 print(f'Adjusting cutoff using mean method, with edf_lower: {adj_args["edf_lower"]}; num_steps: {adj_args["num_steps"]}') a = prop_alt(pvals, adj_method, adj_args) if a == 1: # all hypotheses are estimated to be alternatives fdr_res = np.arange(n) else: qlevel = qlevel / (1-a) # adjust for estimate of a; default is 0 fdr_res = fdr_master(pvals, qlevel, method) return fdr_res
#----------- # Utilities #----------- def corr_ttest(y1, y2, alpha=0.05): """ Estimates the significance of correlations between 2 time series using the classical T-test adjusted for effective sample size. The degrees of freedom are adjusted following n_eff=n(1-g)/(1+g) where g is the lag-1 autocorrelation. Parameters ---------- y1 : array vectors of (real) numbers with identical length, no NaNs allowed y2 : array vectors of (real) numbers with identical length, no NaNs allowed alpha : float significance level for critical value estimation [default: 0.05] Returns ------- r : float correlation between x and y signif : bool true (1) if significant; false (0) otherwise pval : float test p-value (the probability of the test statistic exceeding the observed one by chance alone) See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.corr_isopersist : Estimate Pearson's correlation and associated significance using AR(1) pyleoclim.utils.correlation.corr_isospec : Estimate Pearson's correlation and associated significance using phase randomization pyleoclim.utils.correlation.fdr : Determine significance based on the false discovery rate """ r = pearsonr(y1, y2)[0] g1 = ar1_fit_evenly(y1) g2 = ar1_fit_evenly(y2) N = np.size(y1) Ney1 = N * (1-g1) / (1+g1) Ney2 = N * (1-g2) / (1+g2) Ne = gmean([Ney1+Ney2]) assert Ne >= 10, 'Too few effective d.o.f. to apply this method!' df = Ne - 2 t = np.abs(r) * np.sqrt(df/(1-r**2)) pval = 2 * stu.cdf(-np.abs(t), df) signif = pval <= alpha return r, signif, pval def corr_isopersist(y1, y2, alpha=0.05, nsim=1000): ''' Computes the Pearson's correlation between two timeseries, and their significance using Ar(1) modeling. The significance is gauged via a non-parametric (Monte Carlo) simulation of correlations with nsim AR(1) processes with identical persistence properties as x and y ; the measure of which is the lag-1 autocorrelation (g). Parameters ---------- y1 : array vectors of (real) numbers with identical length, no NaNs allowed y2 : array vectors of (real) numbers with identical length, no NaNs allowed alpha : float significance level for critical value estimation [default: 0.05] nsim : int number of simulations [default: 1000] Returns ------- r : float correlation between x and y signif : bool true (1) if significant; false (0) otherwise pval : float test p-value (the probability of the test statstic exceeding the observed one by chance alone) Notes ----- The probability of obtaining a test statistic at least as extreme as the one actually observed, assuming that the null hypothesis is true. The test is 1 tailed on |r|: Ho = { |r| = 0 }, Ha = { |r| > 0 } The test is rejected (signif = 1) if pval <= alpha, otherwise signif=0; (Some Rights Reserved) Hepta Technologies, 2009 v1.0 USC, Aug 10 2012, based on corr_signif. See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.corr_ttest: Estimates Pearson's correlation and associated significance using a t-test pyleoclim.utils.correlation.corr_isospec : Estimates Pearson's correlation and associated significance using pyleoclim.utils.correlation.fdr : Determine significance based on the false discovery rate ''' r = pearsonr(y1, y2)[0] ra = np.abs(r) y1_red, g1 = isopersistent_rn(y1, nsim) y2_red, g2 = isopersistent_rn(y2, nsim) rs = np.zeros(nsim) for i in np.arange(nsim): rs[i] = pearsonr(y1_red[:, i], y2_red[:, i])[0] rsa = np.abs(rs) xi = np.linspace(0, 1.1*np.max([ra, np.max(rsa)]), 200) kde = gaussian_kde(rsa) prob = kde(xi).T diff = np.abs(ra - xi) # min_diff = np.min(diff) pos = np.argmin(diff) pval = np.trapz(prob[pos:], xi[pos:]) rcrit = np.percentile(rsa, 100*(1-alpha)) signif = ra >= rcrit return r, signif, pval def isopersistent_rn(X, p): ''' Generates p realization of a red noise [i.e. AR(1)] process with same persistence properties as X (Mean and variance are also preserved). Parameters ---------- X : array vector of (real) numbers as a time series, no NaNs allowed p : int number of simulations Returns ------- red : numpy array n rows by p columns matrix of an AR1 process, where n is the size of X g :float lag-1 autocorrelation coefficient See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.fdr : Determine significance based on the false discovery rate Notes ----- (Some Rights Reserved) Hepta Technologies, 2008 ''' n = np.size(X) sig = np.std(X, ddof=1) g = ar1_fit_evenly(X) # red = red_noise(N, M, g) red = sm_ar1_sim(n, p, g, sig) return red, g def sm_ar1_sim(n, p, g, sig): ''' Produce p realizations of an AR1 process of length n with lag-1 autocorrelation g using statsmodels Parameters ---------- n : int row dimensions p : int column dimensions g : float lag-1 autocorrelation coefficient sig : float the standard deviation of the original time series Returns ------- red : numpy matrix n rows by p columns matrix of an AR1 process See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.fdr : Determine significance based on the false discovery rate ''' # specify model parameters (statsmodel wants lag0 coefficents as unity) ar = np.r_[1, -g] # AR model parameter ma = np.r_[1, 0.0] # MA model parameters sig_n = sig*np.sqrt(1-g**2) # theoretical noise variance for red to achieve the same variance as X red = np.empty(shape=(n, p)) # declare array # simulate AR(1) model for each column for i in np.arange(p): red[:, i] = sm.tsa.arma_generate_sample(ar=ar, ma=ma, nsample=n, burnin=50, scale=sig_n) return red # def red_noise(N, M, g): # ''' Produce M realizations of an AR1 process of length N with lag-1 autocorrelation g # Parameters # ---------- # N : int # row dimensions # M : int # column dimensions # g : float # lag-1 autocorrelation coefficient # Returns # ------- # red : numpy array # N rows by M columns matrix of an AR1 process # Notes # ----- # (Some Rights Reserved) Hepta Technologies, 2008 # J.E.G., GaTech, Oct 20th 2008 # ''' # red = np.zeros(shape=(N, M)) # red[0, :] = np.random.randn(1, M) # for i in np.arange(1, N): # red[i, :] = g * red[i-1, :] + np.random.randn(1, M) # return red def corr_isospec(y1, y2, alpha=0.05, nsim=1000): ''' Estimates the significance of the correlation using phase randomization Estimates the significance of correlations between non IID time series by phase randomization of original inputs. This function creates 'nsim' random time series that have the same power spectrum as the original time series but random phases. Parameters ---------- y1 : array vectors of (real) numbers with identical length, no NaNs allowed y2 : array vectors of (real) numbers with identical length, no NaNs allowed alpha : float significance level for critical value estimation [default: 0.05] nsim : int number of simulations [default: 1000] Returns ------- r : float correlation between y1 and y2 signif : bool true (1) if significant; false (0) otherwise F : float Fraction of time series with higher correlation coefficents than observed (approximates the p-value). See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.corr_ttest : Estimates Pearson's correlation and associated significance using a t-test pyleoclim.utils.correlation.corr_isopersist : Estimates Pearson's correlation and associated significance using AR(1) simulations pyleoclim.utils.correlation.fdr : Determine significance based on the false discovery rate References --------- - Ebisuzaki, W, 1997: A method to estimate the statistical significance of a correlation when the data are serially correlated. J. of Climate, 10, 2147-2153. - Prichard, D., Theiler, J. Generating Surrogate Data for Time Series with Several Simultaneously Measured Variables (1994) Physical Review Letters, Vol 73, Number 7 (Some Rights Reserved) USC Climate Dynamics Lab, 2012. ''' r = pearsonr(y1, y2)[0] # generate phase-randomized samples using the Theiler & Prichard method Y1surr = phaseran(y1, nsim) Y2surr = phaseran(y2, nsim) # compute correlations Y1s = preprocessing.scale(Y1surr) Y2s = preprocessing.scale(Y2surr) n = np.size(y1) C = np.dot(np.transpose(Y1s), Y2s) / (n-1) rSim = np.diag(C) # compute fraction of values higher than observed F = np.sum(np.abs(rSim) >= np.abs(r)) / nsim # establish significance signif = F < alpha # significant or not? return r, signif, F def phaseran(recblk, nsurr): ''' Simultaneous phase randomization of a set of time series It creates blocks of surrogate data with the same second order properties as the original time series dataset by transforming the oriinal data into the frequency domain, randomizing the phases simultaneoulsy across the time series and converting the data back into the time domain. Written by Carlos Gias for MATLAB http://www.mathworks.nl/matlabcentral/fileexchange/32621-phase-randomization/content/phaseran.m Parameters ---------- recblk : numpy array 2D array , Row: time sample. Column: recording. An odd number of time samples (height) is expected. If that is not the case, recblock is reduced by 1 sample before the surrogate data is created. The class must be double and it must be nonsparse. nsurr : int is the number of image block surrogates that you want to generate. Returns ------- surrblk : numpy array 3D multidimensional array image block with the surrogate datasets along the third dimension See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.fdf : Determine significance based on the false discovery rate References ---------- - Prichard, D., Theiler, J. Generating Surrogate Data for Time Series with Several Simultaneously Measured Variables (1994) Physical Review Letters, Vol 73, Number 7 - Carlos Gias (2020). Phase randomization, MATLAB Central File Exchange ''' # Get parameters nfrms = recblk.shape[0] if nfrms % 2 == 0: nfrms = nfrms-1 recblk = recblk[0:nfrms] len_ser = int((nfrms-1)/2) interv1 = np.arange(1, len_ser+1) interv2 = np.arange(len_ser+1, nfrms) # Fourier transform of the original dataset fft_recblk = np.fft.fft(recblk) surrblk = np.zeros((nfrms, nsurr)) # for k in tqdm(np.arange(nsurr)): for k in np.arange(nsurr): ph_rnd = np.random.rand(len_ser) # Create the random phases for all the time series ph_interv1 = np.exp(2*np.pi*1j*ph_rnd) ph_interv2 = np.conj(np.flipud(ph_interv1)) # Randomize all the time series simultaneously fft_recblk_surr = np.copy(fft_recblk) fft_recblk_surr[interv1] = fft_recblk[interv1] * ph_interv1 fft_recblk_surr[interv2] = fft_recblk[interv2] * ph_interv2 # Inverse transform surrblk[:, k] = np.real(np.fft.ifft(fft_recblk_surr)) return surrblk ''' The FDR procedures translated from fdr.R by Dr. Chris Paciorek (https://www.stat.berkeley.edu/~paciorek/research/code/code.html) ''' def fdr_basic(pvals,qlevel=0.05): ''' The basic FDR of Benjamini & Hochberg (1995). Parameters ---------- pvals : list or array A vector of p-values on which to conduct the multiple testing. qlevel : float The proportion of false positives desired. Returns ------- fdr_res : array or None A vector of the indices of the significant tests; None if no significant tests See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.fdf : Determine significance based on the false discovery rate References ---------- Benjamini, Yoav; Hochberg, Yosef (1995). "Controlling the false discovery rate: a practical and powerful approach to multiple testing". Journal of the Royal Statistical Society, Series B. 57 (1): 289–300. MR 1325392 ''' n = len(pvals) sorted_pvals = np.sort(pvals) sort_index = np.argsort(pvals) indices = np.arange(1, n+1)*(sorted_pvals <= qlevel*np.arange(1, n+1)/n) num_reject = np.max(indices) if num_reject: indices = np.arange(num_reject) fdr_res = np.sort(sort_index[indices]) else: fdr_res = None return fdr_res def fdr_master(pvals, qlevel=0.05, method='original'): ''' Perform various versions of the FDR procedure Parameters ---------- pvals : list or array A vector of p-values on which to conduct the multiple testing. qlevel : float The proportion of false positives desired. method : {'original', 'general'} Method for performing the testing. - 'original' follows Benjamini & Hochberg (1995); - 'general' is much more conservative, requiring no assumptions on the p-values (see Benjamini & Yekutieli (2001)). We recommend using 'original', and if desired, using 'adj_method="mean"' to increase power. Returns ------- fdr_res : array or None A vector of the indices of the significant tests; None if no significant tests See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.fdf : Determine significance based on the false discovery rate References ---------- - Benjamini, Yoav; Hochberg, Yosef (1995). "Controlling the false discovery rate: a practical and powerful approach to multiple testing". Journal of the Royal Statistical Society, Series B. 57 (1): 289–300. MR 1325392 - Benjamini, Yoav; Yekutieli, Daniel (2001). "The control of the false discovery rate in multiple testing under dependency". Annals of Statistics. 29 (4): 1165–1188. doi:10.1214/aos/1013699998 ''' if method == 'general': n = len(pvals) qlevel = qlevel / np.sum(1/np.arange(1, n+1)) fdr_res = fdr_basic(pvals, qlevel) return fdr_res def storey(edf_quantile, pvals): ''' The basic Storey (2002) estimator of a, the proportion of alternative hypotheses. Parameters ---------- edf_quantile : float The quantile of the empirical distribution function at which to estimate a. pvals : list or array A vector of p-values on which to estimate a Returns ------- a : int estimate of a, the number of alternative hypotheses See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.fdf : Determine significance based on the false discovery rate References ---------- - Storey, J.D., 2002, A direct approach to False Discovery Rates. Journal of the Royal Statistical Society, Series B, 64, 3, 479-498 ''' if edf_quantile >= 1 or edf_quantile <= 0: raise ValueError(f'Wrong edf_quantile: {edf_quantile}; must be within (0, 1)!') pvals = np.array(pvals) a = (np.mean(pvals<=edf_quantile) - edf_quantile) / (1 - edf_quantile) a = np.max(a, 0) # set to 0 if a is negative return a def prop_alt(pvals, adj_method='mean', adj_args={'edf_lower': 0.8, 'num_steps': 20}): ''' Calculate an estimate of a, the proportion of alternative hypotheses, using one of several methods Parameters ---------- pvals : list or array A vector of p-values on which to estimate a adj_method: str; {'mean', 'storey', 'two-stage'} Method for increasing the power of the procedure by estimating the proportion of alternative p-values. - 'mean', the modified Storey estimator that we suggest in Ventura et al. (2004) - 'storey', the method of Storey (2002) - 'two-stage', the iterative approach of Benjamini et al. (2001) adj_args : dict - for "mean", specify "edf_lower", the smallest quantile at which to estimate a, and "num_steps", the number of quantiles to use the approach uses the average of the Storey (2002) estimator for the num_steps quantiles starting at "edf_lower" and finishing just less than 1 - for "storey", specify "edf_quantile", the quantile at which to calculate the estimator - for "two-stage", the method uses a standard FDR approach to estimate which p-values are significant this number is the estimate of a; therefore the method requires specification of qlevel, the proportion of false positives and "fdr_method" ('original' or 'general'), the FDR method to be used. We do not recommend 'general' as this is very conservative and will underestimate a. Returns ------- a : int estimate of a, the number of alternative hypotheses See also -------- pyleoclim.utils.correlation.corr_sig : Estimates the Pearson's correlation and associated significance between two non IID time series pyleoclim.utils.correlation.fdf : Determine significance based on the false discovery rate References ---------- - Storey, J.D. (2002). A direct approach to False Discovery Rates. Journal of the Royal Statistical Society, Series B, 64, 3, 479-498 - Benjamini, Yoav; Yekutieli, Daniel (2001). "The control of the false discovery rate in multiple testing under dependency". Annals of Statistics. 29 (4): 1165–1188. doi:10.1214/aos/1013699998 - Ventura, V., Paciorek, C., Risbey, J.S. (2004). Controlling the proportion of falsely rejected hypotheses when conducting multiple tests with climatological data. Journal of climate, 17, 4343-4356 ''' n = len(pvals) if adj_method == 'two-stage': fdr_res = fdr_master(pvals, adj_method['qlevel'], adj_args['fdr_method']) a = len(fdr_res)/n return a elif adj_method == 'storey': if 'edf_quantile' not in adj_args: raise ValueError('`edf_quantile` must be specified in `adj_args`!') a = storey(adj_args['edf_quantile'], pvals) return a elif adj_method == 'mean': if adj_args['edf_lower']>=1 or adj_args['edf_lower']<=0: raise ValueError(f'Wrong edf_lower: {adj_args["edf_lower"]}; must be within (0, 1)!') if adj_args['num_steps']<1 or type(adj_args['num_steps']) is not int: raise ValueError(f'Wrong num_steps: {adj_args["num_steps"]}; must be an integer >= 1') stepsize = (1 - adj_args['edf_lower']) / adj_args['num_steps'] edf_quantiles = np.linspace(adj_args['edf_lower'], adj_args['edf_lower']+stepsize*(adj_args['num_steps']-1), adj_args['num_steps']) a_vec = [storey(edf_q, pvals) for edf_q in edf_quantiles] a = np.mean(a_vec) return a else: raise ValueError('Wrong method: {method}!')