Source code for pyleoclim.utils.causality

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities for Liang and Granger causality analysis
"""

__all__ = [
    'liang_causality',
    'granger_causality'
]

import numpy as np
from statsmodels.tsa.stattools import grangercausalitytests
from tqdm import tqdm
from .tsmodel import ar1_fit_evenly, sm_ar1_sim
from .tsutils import phaseran
from scipy.stats.mstats import mquantiles

#-------
#Main functions
#--------
[docs]def granger_causality(y1, y2, maxlag=1,addconst=True,verbose=True): '''Granger causality tests Four tests for the Granger non-causality of 2 time series. All four tests give similar results. params_ftest and ssr_ftest are equivalent based on F test which is identical to lmtest:grangertest in R. Wrapper for the functions described in statsmodels (https://www.statsmodels.org/stable/generated/statsmodels.tsa.stattools.grangercausalitytests.html) Parameters ---------- y1, y2: array vectors of (real) numbers with identical length, no NaNs allowed maxlag : int or int iterable, optional If an integer, computes the test for all lags up to maxlag. If an iterable, computes the tests only for the lags in maxlag. addconst : bool, optional Include a constant in the model. verbose : bool, optional Print results Returns ------- dict All test results, dictionary keys are the number of lags. For each lag the values are a tuple, with the first element a dictionary with test statistic, pvalues, degrees of freedom, the second element are the OLS estimation results for the restricted model, the unrestricted model and the restriction (contrast) matrix for the parameter f_test. Notes ----- The null hypothesis for Granger causality tests is that y2, does NOT Granger cause y1. Granger causality means that past values of y2 have a statistically significant effect on the current value of y1, taking past values of y1 into account as regressors. We reject the null hypothesis that y2 does not Granger cause y1 if the p-values are below a desired threshold (e.g. 0.05). The null hypothesis for all four test is that the coefficients corresponding to past values of the second time series are zero. ‘params_ftest’, ‘ssr_ftest’ are based on the F distribution ‘ssr_chi2test’, ‘lrtest’ are based on the chi-square distribution See also -------- pyleoclim.utils.causality.liang_causality : information flow estimated using the Liang algorithm pyleoclim.utils.causality.signif_isopersist : significance test with AR(1) with same persistence pyleoclim.utils.causality.signif_isospec : significance test with surrogates with randomized phases References ---------- Granger, C. W. J. (1969). Investigating causal relations by econometric models and cross-spectral methods. Econometrica, 37(3), 424-438. Granger, C. W. J. (1980). Testing for causality: A personal viewpoont. Journal of Economic Dynamics and Control, 2, 329-352. Granger, C. W. J. (1988). Some recent development in a concept of causality. Journal of Econometrics, 39(1-2), 199-211. ''' if len(y1)!=len(y2): raise ValueError('Timeseries must be of same length') x=np.array([y1,y2]).T res = grangercausalitytests(x,maxlag=maxlag,addconst=addconst,verbose=verbose) return res
[docs]def liang_causality(y1, y2, npt=1, signif_test='isospec', nsim=1000, qs=[0.005, 0.025, 0.05, 0.95, 0.975, 0.995]): '''Liang-Kleeman information flow Estimate the Liang information transfer from series y2 to series y1 with significance estimates using either an AR(1) tests with series with the same persistence or surrogates with randomized phases. Parameters ---------- y1, y2 : array vectors of (real) numbers with identical length, no NaNs allowed npt : int >=1 time advance in performing Euler forward differencing, e.g., 1, 2. Unless the series are generated with a highly chaotic deterministic system, npt=1 should be used signif_test : str; {'isopersist', 'isospec'} the method for significance test see signif_isospec and signif_isopersist for details. nsim : int the number of AR(1) surrogates for significance test qs : list the quantiles for significance test Returns ------- res : dict A dictionary of results including: - T21 : float information flow from y2 to y1 (Note: not y1 -> y2!) - tau21 : float the standardized information flow from y2 to y1 - Z : float the total information flow from y2 to y1 - dH1_star : float dH*/dt (Liang, 2016) - dH1_noise : float - signif_qs : the quantiles for significance test - T21_noise : list the quantiles of the information flow from noise2 to noise1 for significance testing - tau21_noise : list the quantiles of the standardized information flow from noise2 to noise1 for significance testing See also -------- pyleoclim.utils.causality.liang : information flow estimated using the Liang algorithm pyleoclim.utils.causality.granger_causality : information flow estimated using the Granger algorithm pyleoclim.utils.causality.signif_isopersist : significance test with AR(1) with same persistence pyleoclim.utils.causality.causality.signif_isospec : significance test with surrogates with randomized phases References ---------- Liang, X.S. (2013) The Liang-Kleeman Information Flow: Theory and Applications. Entropy, 15, 327-360, doi:10.3390/e15010327 Liang, X.S. (2014) Unraveling the cause-effect relation between timeseries. Physical review, E 90, 052150 Liang, X.S. (2015) Normalizing the causality between time series. Physical review, E 92, 022126 Liang, X.S. (2016) Information flow and causality as rigorous notions ab initio. Physical review, E 94, 052201 ''' dt=1 nm = np.size(y1) grad1 = (y1[0+npt:] - y1[0:-npt]) / (npt) grad2 = (y2[0+npt:] - y2[0:-npt]) / (npt) y1 = y1[:-npt] y2 = y2[:-npt] N = nm - npt C = np.cov(y1, y2) detC = np.linalg.det(C) dC = np.ndarray((2, 2)) dC[0, 0] = np.sum((y1-np.mean(y1))*(grad1-np.mean(grad1))) dC[0, 1] = np.sum((y1-np.mean(y1))*(grad2-np.mean(grad2))) dC[1, 0] = np.sum((y2-np.mean(y2))*(grad1-np.mean(grad1))) dC[1, 1] = np.sum((y2-np.mean(y2))*(grad2-np.mean(grad2))) dC /= N-1 a11 = C[1, 1]*dC[0, 0] - C[0, 1]*dC[1, 0] a12 = -C[0, 1]*dC[0, 0] + C[0, 0]*dC[1, 0] a11 /= detC a12 /= detC f1 = np.mean(grad1) - a11*np.mean(y1) - a12*np.mean(y2) R1 = grad1 - (f1 + a11*y1 + a12*y2) Q1 = np.sum(R1*R1) b1 = np.sqrt(Q1*dt/N) NI = np.ndarray((4, 4)) NI[0, 0] = N*dt/b1**2 NI[1, 1] = dt/b1**2*np.sum(y1*y1) NI[2, 2] = dt/b1**2*np.sum(y2*y2) NI[3, 3] = 3*dt/b1**4*np.sum(R1*R1) - N/b1**2 NI[0, 1] = dt/b1**2*np.sum(y1) NI[0, 2] = dt/b1**2*np.sum(y2) NI[0, 3] = 2*dt/b1**3*np.sum(R1) NI[1, 2] = dt/b1**2*np.sum(y1*y2) NI[1, 3] = 2*dt/b1**3*np.sum(R1*y1) NI[2, 3] = 2*dt/b1**3*np.sum(R1*y2) NI[1, 0] = NI[0, 1] NI[2, 0] = NI[0, 2] NI[2, 1] = NI[1, 2] NI[3, 0] = NI[0, 3] NI[3, 1] = NI[1, 3] NI[3, 2] = NI[2, 3] invNI = np.linalg.pinv(NI) var_a12 = invNI[2, 2] T21 = C[0, 1]/C[0, 0] * (-C[1, 0]*dC[0, 0] + C[0, 0]*dC[1, 0]) / detC var_T21 = (C[0, 1]/C[0, 0])**2 * var_a12 dH1_star= a11 dH1_noise = b1**2 / (2*C[0, 0]) Z = np.abs(T21) + np.abs(dH1_star) + np.abs(dH1_noise) tau21 = T21 / Z dH1_star = dH1_star / Z dH1_noise = dH1_noise / Z signif_test_func = { 'isopersist': signif_isopersist, 'isospec': signif_isospec, } signif_dict = signif_test_func[signif_test](y1, y2, method='liang', nsim=nsim, qs=qs, npt=npt) T21_noise_qs = signif_dict['T21_noise_qs'] tau21_noise_qs = signif_dict['tau21_noise_qs'] res = { 'T21': T21, 'tau21': tau21, 'Z': Z, 'dH1_star': dH1_star, 'dH1_noise': dH1_noise, 'signif_qs' : qs, 'T21_noise' : T21_noise_qs, 'tau21_noise' : tau21_noise_qs } return res
[docs]def liang(y1, y2, npt=1): ''' Estimate the Liang information transfer from series y2 to series y1 Parameters ---------- y1, y2 : array Vectors of (real) numbers with identical length, no NaNs allowed npt : int >=1 Time advance in performing Euler forward differencing, e.g., 1, 2. Unless the series are generated with a highly chaotic deterministic system, npt=1 should be used Returns ------- res : dict A dictionary of results including: - T21 : float information flow from y2 to y1 (Note: not y1 -> y2!) - tau21 : float the standardized information flow from y2 to y1 - Z : float the total information flow from y2 to y1 - dH1_star : float dH*/dt (Liang, 2016) - dH1_noise : float See also -------- pyleoclim.utils.causality.liang_causality : information flow estimated using the Liang algorithm pyleoclim.utils.causality.granger_causality : information flow estimated using the Granger algorithm pyleoclim.utils.causality.signif_isopersist : significance test with AR(1) with same persistence pyleoclim.utils.causality.signif_isospec : significance test with surrogates with randomized phases References ---------- Liang, X.S. (2013) The Liang-Kleeman Information Flow: Theory and Applications. Entropy, 15, 327-360, doi:10.3390/e15010327 Liang, X.S. (2014) Unraveling the cause-effect relation between timeseries. Physical review, E 90, 052150 Liang, X.S. (2015) Normalizing the causality between time series. Physical review, E 92, 022126 Liang, X.S. (2016) Information flow and causality as rigorous notions ab initio. Physical review, E 94, 052201 ''' dt=1 nm = np.size(y1) grad1 = (y1[0+npt:] - y1[0:-npt]) / (npt) grad2 = (y2[0+npt:] - y2[0:-npt]) / (npt) y1 = y1[:-npt] y2 = y2[:-npt] N = nm - npt C = np.cov(y1, y2) detC = np.linalg.det(C) dC = np.ndarray((2, 2)) dC[0, 0] = np.sum((y1-np.mean(y1))*(grad1-np.mean(grad1))) dC[0, 1] = np.sum((y1-np.mean(y1))*(grad2-np.mean(grad2))) dC[1, 0] = np.sum((y2-np.mean(y2))*(grad1-np.mean(grad1))) dC[1, 1] = np.sum((y2-np.mean(y2))*(grad2-np.mean(grad2))) dC /= N-1 a11 = C[1, 1]*dC[0, 0] - C[0, 1]*dC[1, 0] a12 = -C[0, 1]*dC[0, 0] + C[0, 0]*dC[1, 0] a11 /= detC a12 /= detC f1 = np.mean(grad1) - a11*np.mean(y1) - a12*np.mean(y2) R1 = grad1 - (f1 + a11*y1 + a12*y2) Q1 = np.sum(R1*R1) b1 = np.sqrt(Q1*dt/N) NI = np.ndarray((4, 4)) NI[0, 0] = N*dt/b1**2 NI[1, 1] = dt/b1**2*np.sum(y1*y1) NI[2, 2] = dt/b1**2*np.sum(y2*y2) NI[3, 3] = 3*dt/b1**4*np.sum(R1*R1) - N/b1**2 NI[0, 1] = dt/b1**2*np.sum(y1) NI[0, 2] = dt/b1**2*np.sum(y2) NI[0, 3] = 2*dt/b1**3*np.sum(R1) NI[1, 2] = dt/b1**2*np.sum(y1*y2) NI[1, 3] = 2*dt/b1**3*np.sum(R1*y1) NI[2, 3] = 2*dt/b1**3*np.sum(R1*y2) NI[1, 0] = NI[0, 1] NI[2, 0] = NI[0, 2] NI[2, 1] = NI[1, 2] NI[3, 0] = NI[0, 3] NI[3, 1] = NI[1, 3] NI[3, 2] = NI[2, 3] invNI = np.linalg.pinv(NI) var_a12 = invNI[2, 2] T21 = C[0, 1]/C[0, 0] * (-C[1, 0]*dC[0, 0] + C[0, 0]*dC[1, 0]) / detC var_T21 = (C[0, 1]/C[0, 0])**2 * var_a12 dH1_star= a11 dH1_noise = b1**2 / (2*C[0, 0]) Z = np.abs(T21) + np.abs(dH1_star) + np.abs(dH1_noise) tau21 = T21 / Z dH1_star = dH1_star / Z dH1_noise = dH1_noise / Z res = { 'T21': T21, 'tau21': tau21, 'Z': Z, 'dH1_star': dH1_star, 'dH1_noise': dH1_noise, } return res
[docs]def signif_isopersist(y1, y2, method, nsim=1000, qs=[0.005, 0.025, 0.05, 0.95, 0.975, 0.995], **kwargs): ''' significance test with AR(1) with same persistence Parameters ---------- y1, y2 : array vectors of (real) numbers with identical length, no NaNs allowed method : str; {'liang'} estimates for the Liang method nsim : int the number of AR(1) surrogates for significance test qs : list the quantiles for significance test Returns ------- res_dict : dict A dictionary with the following information: - T21_noise_qs : list the quantiles of the information flow from noise2 to noise1 for significance testing - tau21_noise_qs : list the quantiles of the standardized information flow from noise2 to noise1 for significance testing See also -------- pyleoclim.utils.causality.liang_causality : information flow estimated using the Liang algorithm pyleoclim.utils.causality.granger_causality : information flow estimated using the Granger algorithm pyleoclim.utils.causality.signif_isospec : significance test with surrogates with randomized phases ''' g1 = ar1_fit_evenly(y1) g2 = ar1_fit_evenly(y2) sig1 = np.std(y1) sig2 = np.std(y2) n = np.size(y1) noise1 = sm_ar1_sim(n, nsim, g1, sig1) noise2 = sm_ar1_sim(n, nsim, g2, sig2) if method == 'liang': npt = kwargs['npt'] if 'npt' in kwargs else 1 T21_noise = [] tau21_noise = [] for i in tqdm(range(nsim), desc='Calculating causality between surrogates'): res_noise = liang(noise1[:, i], noise2[:, i], npt=npt) tau21_noise.append(res_noise['tau21']) T21_noise.append(res_noise['T21']) tau21_noise = np.array(tau21_noise) T21_noise = np.array(T21_noise) tau21_noise_qs = mquantiles(tau21_noise, qs) T21_noise_qs = mquantiles(T21_noise, qs) res_dict = { 'tau21_noise_qs': tau21_noise_qs, 'T21_noise_qs': T21_noise_qs, } #TODO add granger method else: raise KeyError(f'{method} is not a valid method') return res_dict
[docs]def signif_isospec(y1, y2, method, nsim=1000, qs=[0.005, 0.025, 0.05, 0.95, 0.975, 0.995], **kwargs): ''' significance test with surrogates with randomized phases Parameters ---------- y1, y2 : array vectors of (real) numbers with identical length, no NaNs allowed method : str; {'liang'} estimates for the Liang method nsim : int the number of surrogates for significance test qs : list the quantiles for significance test kwargs : dict keyword arguments for the causality method (e.g. npt for Liang-Kleeman) Returns ------- res_dict : dict A dictionary with the following information: - T21_noise_qs : list the quantiles of the information flow from noise2 to noise1 for significance testing - tau21_noise_qs : list the quantiles of the standardized information flow from noise2 to noise1 for significance testing See also -------- pyleoclim.utils.causality.liang_causality : information flow estimated using the Liang algorithm pyleoclim.utils.causality.granger_causality : information flow estimated using the Granger algorithm pyleoclim.utils.causality.signif_isopersist : significance test with AR(1) with same persistence ''' noise1 = phaseran(y1, nsim) noise2 = phaseran(y2, nsim) if method == 'liang': npt = kwargs['npt'] if 'npt' in kwargs else 1 T21_noise = [] tau21_noise = [] for i in tqdm(range(nsim), desc='Calculating causality between surrogates'): res_noise = liang(noise1[:, i], noise2[:, i], npt=npt) tau21_noise.append(res_noise['tau21']) T21_noise.append(res_noise['T21']) tau21_noise = np.array(tau21_noise) T21_noise = np.array(T21_noise) tau21_noise_qs = mquantiles(tau21_noise, qs) T21_noise_qs = mquantiles(T21_noise, qs) res_dict = { 'tau21_noise_qs': tau21_noise_qs, 'T21_noise_qs': T21_noise_qs, } #TODO Recode with Surrogate class else: raise KeyError(f'{method} is not a valid method') return res_dict