Source code for pyleoclim.core.ui

''' The application interface for the users

@author: fengzhu

Created on Jan 31, 2020
'''
from ..utils import tsutils, plotting, mapping, lipdutils, tsmodel
from ..utils import wavelet as waveutils
from ..utils import spectral as specutils
from ..utils import correlation as corrutils
from ..utils import causality as causalutils
from ..utils import decomposition
from ..utils import filter as filterutils

#from textwrap import dedent

import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tabulate import tabulate
from collections import namedtuple
from copy import deepcopy

from matplotlib.ticker import ScalarFormatter, FormatStrFormatter, MaxNLocator
import matplotlib.transforms as transforms
from matplotlib import cm
from matplotlib import gridspec
import matplotlib as mpl
#from matplotlib.colors import BoundaryNorm, Normalize

import cartopy.crs as ccrs
import cartopy.feature as cfeature


from tqdm import tqdm
from scipy.stats.mstats import mquantiles
from scipy import stats, signal
import warnings
import os

import lipd as lpd

def pval_format(p, threshold=0.01, style='exp'):
    ''' Print p-value with proper format when p is close to 0
    '''
    if p < threshold:
        if p == 0:
            if style == 'float':
                s = '< 0.0001'
            elif style == 'exp':
                s = '< 1e-4'
            else:
                raise ValueError('Wrong style.')
        else:
            n = int(np.ceil(np.log10(p)))
            if style == 'float':
                s = f'< {10**n}'
            elif style == 'exp':
                s = f'< 1e{n}'
            else:
                raise ValueError('Wrong style.')
    else:
        s = f'{p:.2f}'

    return s

def dict2namedtuple(d):
    ''' Convert a dictionary to a namedtuple
    '''
    tupletype = namedtuple('tupletype', sorted(d))
    return tupletype(**d)

def infer_period_unit_from_time_unit(time_unit):
    ''' infer a period unit based on the given time unit

    '''
    if time_unit is None:
        period_unit = None
    else:
        unit_group = lipdutils.timeUnitsCheck(time_unit)
        if unit_group != 'unknown':
            if unit_group == 'kage_units':
                period_unit = 'kyrs'
            else:
                period_unit = 'yrs'
        else:
            if time_unit[-1] == 's':
                period_unit = time_unit
            else:
                period_unit = f'{time_unit}s'

    return period_unit

[docs]def gen_ts(model, t=None, nt=1000, **kwargs): ''' Generate pyleoclim.Series with timeseries models Parameters ---------- model : str, {'colored_noise', 'colored_noise_2regimes', 'ar1'} the timeseries model to use - colored_noise : colored noise with one scaling slope - colored_noise_2regimes : colored noise with two regimes of two different scaling slopes - ar1 : AR(1) series t : array the time axis nt : number of time points only works if 't' is None, and it will use an evenly-spaced vector with nt points kwargs : dict the keyward arguments for the specified timeseries model Returns ------- ts : `pyleoclim.Series` See also -------- pyleoclim.utils.tsmodel.colored_noise : generate a colored noise timeseries pyleoclim.utils.tsmodel.colored_noise_2regimes : generate a colored noise timeseries with two regimes pyleoclim.utils.tsmodel.gen_ar1_evenly : generate an AR(1) series Examples -------- - AR(1) series .. ipython:: python :okwarning: import pyleoclim as pyleo # default length nt=1000; default persistence parameter g=0.5 ts = pyleo.gen_ts(model='ar1') g = pyleo.utils.tsmodel.ar1_fit(ts.value) @savefig gen_ar1_t0.png fig, ax = ts.plot(label=f'g={g:.2f}') pyleo.closefig(fig) # use 'nt' to modify the data length ts = pyleo.gen_ts(model='ar1', nt=100) g = pyleo.utils.tsmodel.ar1_fit(ts.value) @savefig gen_ar1_t1.png fig, ax = ts.plot(label=f'g={g:.2f}') pyleo.closefig(fig) # use 'settings' to modify the persistence parameter 'g' ts = pyleo.gen_ts(model='ar1', g=0.9) g = pyleo.utils.tsmodel.ar1_fit(ts.value) @savefig gen_ar1_t2.png fig, ax = ts.plot(label=f'g={g:.2f}') pyleo.closefig(fig) - Colored noise with 1 regime .. ipython:: python :okwarning: # default scaling slope 'alpha' is 1 ts = pyleo.gen_ts(model='colored_noise') psd = ts.spectral() # estimate the scaling slope beta_info = psd.beta_est(fmin=1/50, fmax=1/2) print(beta_info['beta']) # visualize @savefig gen_cn_t0.png fig, ax = psd.plot() pyleo.closefig(fig) # modify 'alpha' with 'settings' ts = pyleo.gen_ts(model='colored_noise', alpha=2) psd = ts.spectral() # estimate the scaling slope beta_info = psd.beta_est(fmin=1/50, fmax=1/2) print(beta_info['beta']) # visualize @savefig gen_cn_t1.png fig, ax = psd.plot() pyleo.closefig(fig) - Colored noise with 2 regimes .. ipython:: python :okwarning # default scaling slopes 'alpha1' is 0.5 and 'alpha2' is 2, with break at 1/20 ts = pyleo.gen_ts(model='colored_noise_2regimes') psd = ts.spectral() # estimate the scaling slope beta_info_lf = psd.beta_est(fmin=1/50, fmax=1/20) beta_info_hf = psd.beta_est(fmin=1/20, fmax=1/2) print(beta_info_lf['beta']) print(beta_info_hf['beta']) # visualize @savefig gen_cn2_t0.png fig, ax = psd.plot() pyleo.closefig(fig) # modify the scaling slopes and scaling break with 'settings' ts = pyleo.gen_ts(model='colored_noise_2regimes', alpha1=2, alpha2=1, f_break=1/10) psd = ts.spectral() # estimate the scaling slope beta_info_lf = psd.beta_est(fmin=1/50, fmax=1/10) beta_info_hf = psd.beta_est(fmin=1/10, fmax=1/2) print(beta_info_lf['beta']) print(beta_info_hf['beta']) # visualize @savefig gen_cn2_t1.png fig, ax = psd.plot() pyleo.closefig(fig) ''' if t is None: t = np.arange(nt) tsm = { 'colored_noise': tsmodel.colored_noise, 'colored_noise_2regimes': tsmodel.colored_noise_2regimes, 'ar1': tsmodel.gen_ar1_evenly, } tsm_args = {} tsm_args['colored_noise'] = {'alpha': 1} tsm_args['colored_noise_2regimes'] = {'alpha1': 1/2, 'alpha2': 2, 'f_break': 1/20} tsm_args['ar1'] = {'g': 0.5} tsm_args[model].update(kwargs) v = tsm[model](t=t, **tsm_args[model]) ts = Series(time=t, value=v) return ts
[docs]class Series: ''' pyleoSeries object The Series class is, at its heart, a simple structure containing two arrays y and t of equal length, and some metadata allowing to interpret and plot the series. It is similar to a pandas Series, but the concept was extended because pandas does not yet support geologic time. Parameters ---------- time : list or numpy.array independent variable (t) value : list of numpy.array values of the dependent variable (y) time_unit : string Units for the time vector (e.g., 'years'). Default is 'years' time_name : string Name of the time vector (e.g., 'Time','Age'). Default is None. This is used to label the time axis on plots value_name : string Name of the value vector (e.g., 'temperature') Default is None value_unit : string Units for the value vector (e.g., 'deg C') Default is None label : string Name of the time series (e.g., 'Nino 3.4') Default is None clean_ts : boolean flag set to True to remove the NaNs and make time axis strictly prograde with duplicated timestamps reduced by averaging the values Default is True verbose : bool If True, will print warning messages if there is any Examples -------- In this example, we import the Southern Oscillation Index (SOI) into a pandas dataframe and create a pyleoSeries object. .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data=pd.read_csv( 'https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv', skiprows=0, header=1 ) time=data.iloc[:,1] value=data.iloc[:,2] ts=pyleo.Series( time=time, value=value, time_name='Year (CE)', value_name='SOI', label='Southern Oscillation Index' ) ts ts.__dict__.keys() ''' def __init__(self, time, value, time_name=None, time_unit=None, value_name=None, value_unit=None, label=None, clean_ts=True, verbose=False): if clean_ts==True: value, time = tsutils.clean_ts(np.array(value), np.array(time), verbose=verbose) self.time = time self.value = value self.time_name = time_name self.time_unit = time_unit self.value_name = value_name self.value_unit = value_unit self.label = label
[docs] def convert_time_unit(self, time_unit='years'): ''' Convert the time unit of the Series object Parameters ---------- time_unit : str the target time unit, possible input: { 'year', 'years', 'yr', 'yrs', 'y BP', 'yr BP', 'yrs BP', 'year BP', 'years BP', 'ky BP', 'kyr BP', 'kyrs BP', 'ka BP', 'ka', 'my BP', 'myr BP', 'myrs BP', 'ma BP', 'ma', } Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data = pd.read_csv( 'https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv', skiprows=0, header=1 ) time = data.iloc[:,1] value = data.iloc[:,2] ts = pyleo.Series(time=time, value=value, time_unit='years') new_ts = ts.convert_time_unit(time_unit='yrs BP') print('Original timeseries:') print('time unit:', ts.time_unit) print('time:', ts.time) print() print('Converted timeseries:') print('time unit:', new_ts.time_unit) print('time:', new_ts.time) ''' new_ts = self.copy() if time_unit is not None: tu = time_unit.lower() if tu.find('ky')>=0 or tu.find('ka')>=0: time_unit_label = 'ky BP' elif tu.find('my')>=0 or tu.find('ma')>=0: time_unit_label = 'my BP' elif tu.find('y bp')>=0 or tu.find('yr bp')>=0 or tu.find('yrs bp')>=0 or tu.find('year bp')>=0 or tu.find('years bp')>=0: time_unit_label = 'yrs BP' elif tu.find('yr')>=0 or tu.find('year')>=0 or tu.find('yrs')>=0 or tu.find('years')>=0: time_unit_label = 'yrs' else: raise ValueError(f"Input time_unit={time_unit} is not supported. Supported input: 'year', 'years', 'yr', 'yrs', 'y BP', 'yr BP', 'yrs BP', 'year BP', 'years BP', 'ky BP', 'kyr BP', 'kyrs BP', 'ka BP', 'my BP', 'myr BP', 'myrs BP', 'ma BP'.") else: return new_ts def convert_to_years(): def prograde_time(time, time_datum, time_exponent): new_time = (time_datum + time)*10**(time_exponent) return new_time def retrograde_time(time, time_datum, time_exponent): new_time = (time_datum - time)*10**(time_exponent) return new_time convert_func = { 'prograde': prograde_time, 'retrograde': retrograde_time, } if self.time_unit is not None: tu = self.time_unit.lower() if tu.find('ky')>=0 or tu.find('ka')>=0: time_dir = 'retrograde' time_datum = 1950/1e3 time_exponent = 3 time_unit_label = 'ky BP' elif tu.find('my')>=0 or tu.find('ma')>=0: time_dir = 'retrograde' time_datum = 1950/1e6 time_exponent = 6 elif tu.find('y bp')>=0 or tu.find('yr bp')>=0 or tu.find('yrs bp')>=0 or tu.find('year bp')>=0 or tu.find('years bp')>=0: time_dir ='retrograde' time_datum = 1950 time_exponent = 0 else: time_dir ='prograde' time_datum = 0 time_exponent = 0 new_time = convert_func[time_dir](self.time, time_datum, time_exponent) else: new_time = None return new_time def convert_to_bp(): time_yrs = convert_to_years() time_bp = 1950 - time_yrs return time_bp def convert_to_ka(): time_bp = convert_to_bp() time_ka = time_bp / 1e3 return time_ka def convert_to_ma(): time_bp = convert_to_bp() time_ma = time_bp / 1e6 return time_ma convert_to = { 'yrs': convert_to_years(), 'yrs BP': convert_to_bp(), 'ky BP': convert_to_ka(), 'my BP': convert_to_ma(), } new_time = convert_to[time_unit_label] dt = np.diff(new_time) if any(dt<=0): new_value, new_time = tsutils.sort_ts(self.value, new_time) else: new_value = self.copy().value new_ts.time = new_time new_ts.value = new_value new_ts.time_unit = time_unit return new_ts
[docs] def make_labels(self): ''' Initialization of labels Returns ------- time_header : str Label for the time axis value_header : str Label for the value axis ''' if self.time_name is not None: time_name_str = self.time_name else: time_name_str = 'time' if self.value_name is not None: value_name_str = self.value_name else: value_name_str = 'value' if self.value_unit is not None: value_header = f'{value_name_str} [{self.value_unit}]' else: value_header = f'{value_name_str}' if self.time_unit is not None: time_header = f'{time_name_str} [{self.time_unit}]' else: time_header = f'{time_name_str}' return time_header, value_header
def __str__(self): ''' Prints out the series in a table format and length of the series Returns ------- str length of the timeseries. ''' time_label, value_label = self.make_labels() table = { time_label: self.time, value_label: self.value, } msg = print(tabulate(table, headers='keys')) return f'Length: {np.size(self.time)}'
[docs] def stats(self): """ Compute basic statistics for the time series Computes the mean, median, min, max, standard deviation, and interquartile range of a numpy array y, ignoring NaNs. Returns ------- res : dictionary Contains the mean, median, minimum value, maximum value, standard deviation, and interquartile range for the Series. Examples -------- Compute basic statistics for the SOI series .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data=pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv',skiprows=0,header=1) time=data.iloc[:,1] value=data.iloc[:,2] ts=pyleo.Series(time=time,value=value,time_name='Year C.E', value_name='SOI', label='SOI') ts.stats() """ mean, median, min_, max_, std, IQR = tsutils.simple_stats(self.value) res={'mean':mean, 'median':median, 'min':min_, 'max':max_, 'std':std, 'IQR': IQR} return res
[docs] def plot(self, figsize=[10, 4], marker=None, markersize=None, color=None, linestyle=None, linewidth=None, xlim=None, ylim=None, label=None, xlabel=None, ylabel=None, title=None, zorder=None, legend=True, plot_kwargs=None, lgd_kwargs=None, alpha=None, savefig_settings=None, ax=None, mute=False, invert_xaxis=False): ''' Plot the timeseries Parameters ---------- figsize : list a list of two integers indicating the figure size marker : str e.g., 'o' for dots See [matplotlib.markers](https://matplotlib.org/3.1.3/api/markers_api.html) for details markersize : float the size of the marker color : str, list the color for the line plot e.g., 'r' for red See [matplotlib colors] (https://matplotlib.org/3.2.1/tutorials/colors/colors.html) for details linestyle : str e.g., '--' for dashed line See [matplotlib.linestyles](https://matplotlib.org/3.1.0/gallery/lines_bars_and_markers/linestyles.html) for details linewidth : float the width of the line label : str the label for the line xlabel : str the label for the x-axis ylabel : str the label for the y-axis title : str the title for the figure zorder : int The default drawing order for all lines on the plot legend : {True, False} plot legend or not invert_xaxis : bool, optional if True, the x-axis of the plot will be inverted plot_kwargs : dict the dictionary of keyword arguments for ax.plot() See [matplotlib.pyplot.plot](https://matplotlib.org/3.1.3/api/_as_gen/matplotlib.pyplot.plot.html) for details lgd_kwargs : dict the dictionary of keyword arguments for ax.legend() See [matplotlib.pyplot.legend](https://matplotlib.org/3.1.3/api/_as_gen/matplotlib.pyplot.legend.html) for details alpha : float Transparency setting savefig_settings : dict the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} ax : matplotlib.axis, optional the axis object from matplotlib See [matplotlib.axes](https://matplotlib.org/api/axes_api.html) for details. mute : {True,False} if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax Returns ------- fig : matplotlib.figure the figure object from matplotlib See [matplotlib.pyplot.figure](https://matplotlib.org/3.1.1/api/_as_gen/matplotlib.pyplot.figure.html) for details. ax : matplotlib.axis the axis object from matplotlib See [matplotlib.axes](https://matplotlib.org/api/axes_api.html) for details. Notes ----- When `ax` is passed, the return will be `ax` only; otherwise, both `fig` and `ax` will be returned. See also -------- pyleoclim.utils.plotting.savefig : saving figure in Pyleoclim Examples -------- Plot the SOI record .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data = pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv',skiprows=0,header=1) time = data.iloc[:,1] value = data.iloc[:,2] ts = pyleo.Series(time=time,value=value,time_name='Year C.E', value_name='SOI', label='SOI') @savefig ts_plot.png fig, ax = ts.plot() pyleo.closefig(fig) Change the line color .. ipython:: python :okwarning: @savefig ts_plot2.png fig, ax = ts.plot(color='r') pyleo.closefig(fig) Save the figure. Two options available: * Within the plotting command * After the figure has been generated .. ipython:: python :okwarning: fig, ax = ts.plot(color='k', savefig_settings={'path': 'ts_plot3.png'}) pyleo.savefig(fig,path='ts_plot3.png') ''' # Turn the interactive mode off. plt.ioff() # generate default axis labels time_label, value_label = self.make_labels() if xlabel is None: xlabel = time_label if ylabel is None: ylabel = value_label plot_kwargs = {} if plot_kwargs is None else plot_kwargs.copy() if label is None: label = self.label if label is not None: plot_kwargs.update({'label': label}) if marker is not None: plot_kwargs.update({'marker': marker}) if markersize is not None: plot_kwargs.update({'markersize': markersize}) if color is not None: plot_kwargs.update({'color': color}) if linestyle is not None: plot_kwargs.update({'linestyle': linestyle}) if linewidth is not None: plot_kwargs.update({'linewidth': linewidth}) if alpha is not None: plot_kwargs.update({'alpha': alpha}) if zorder is not None: plot_kwargs.update({'zorder': zorder}) res = plotting.plot_xy( self.time, self.value, figsize=figsize, xlabel=xlabel, ylabel=ylabel, title=title, savefig_settings=savefig_settings, ax=ax, legend=legend, xlim=xlim, ylim=ylim, plot_kwargs=plot_kwargs, lgd_kwargs=lgd_kwargs, mute=mute, invert_xaxis=invert_xaxis, ) return res
[docs] def ssa(self, M=None, nMC=0, f=0.3, trunc = None, var_thresh=80): ''' Singular Spectrum Analysis Nonparametric, orthogonal decomposition of timeseries into constituent oscillations. This implementation uses the method of [1], with applications presented in [2]. Optionally (MC>0), the significance of eigenvalues is assessed by Monte-Carlo simulations of an AR(1) model fit to X, using [3]. The method expects regular spacing, but is tolerant to missing values, up to a fraction 0<f<1 (see [4]). Parameters ---------- M : int, optional window size. The default is None (10% of the length of the series). MC : int, optional Number of iteration in the Monte-Carlo process. The default is 0. f : float, optional maximum allowable fraction of missing values. The default is 0.3. trunc : str if present, truncates the expansion to a level K < M owing to one of 3 criteria: (1) 'kaiser': variant of the Kaiser-Guttman rule, retaining eigenvalues larger than the median (2) 'mc-ssa': Monte-Carlo SSA (use modes above the 95% threshold) (3) 'var': first K modes that explain at least var_thresh % of the variance. Default is None, which bypasses truncation (K = M) var_thresh : float variance threshold for reconstruction (only impactful if trunc is set to 'var') Returns ------- res : dict Containing: - eigval : (M, 1) array of eigenvalue spectrum of length r, the number of SSA modes. As in Principal Component Analysis, eigenvaluesare closely related to the fraction of variance accounted for ("explained", a common but not-so-helpful term) by each mode. - eig_vec : is a matrix of the temporal eigenvectors (T-EOFs), i.e. the temporal patterns that explain most of the variations in the original series. - PC : (N - M + 1, M) array of principal components, i.e. the loadings that, convolved with the T-EOFs, produce the reconstructed components, or RCs - RC : (N, M) array of reconstructed components, One can think of each RC as the contribution of each mode to the timeseries, weighted by their eigenvalue (loosely speaking, their "amplitude"). Summing over all columns of RC recovers the original series. (synthesis, the reciprocal operation of analysis). - eigval_q : (M, 2) array containing the 5% and 95% quantiles of the Monte-Carlo eigenvalue spectrum [ if MC >0 ] Examples -------- SSA with SOI .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data = pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv',skiprows=0,header=1) time = data.iloc[:,1] value = data.iloc[:,2] ts = pyleo.Series(time=time, value=value, time_name='Year C.E', value_name='SOI', label='SOI') # plot @savefig ts_plot4.png fig, ax = ts.plot() pyleo.closefig(fig) # SSA nino_ssa = ts.ssa(M=60) Let us now see how to make use of all these arrays. The first step is too inspect the eigenvalue spectrum ("scree plot") to identify remarkable modes. Let us restrict ourselves to the first 40, so we can see something: .. ipython:: python :okwarning: import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec import numpy as np d = nino_ssa['eigvals'] # extract eigenvalue vector M = len(d) # infer window size de = d*np.sqrt(2/(M-1)) var_pct = nino_ssa['pctvar'] # extract the fraction of variance attributable to each mode # plot eigenvalues r = 20 rk = np.arange(0,r)+1 fig, ax = plt.subplots() ax.errorbar(rk,d[:r],yerr=de[:r],label='SSA eigenvalues w/ 95% CI') ax.set_title('Scree plot of SSA eigenvalues') ax.set_xlabel('Rank $i$'); plt.ylabel(r'$\lambda_i$') ax.legend(loc='upper right') @savefig ts_eigen.png pyleo.showfig(fig) pyleo.closefig(fig) This highlights a few common phenomena with SSA: * the eigenvalues are in descending order * their uncertainties are proportional to the eigenvalues themselves * the eigenvalues tend to come in pairs : (1,2) (3,4), are all clustered within uncertainties . (5,6) looks like another doublet * around i=15, the eigenvalues appear to reach a floor, and all subsequent eigenvalues explain a very small amount of variance. So, summing the variance of all modes higher than 19, we get: .. ipython:: python :okwarning: print(var_pct[15:].sum()*100) That is, over 95% of the variance is in the first 15 modes. That is a typical result for a (paleo)climate timeseries; a few modes do the vast majority of the work. That means we can focus our attention on these modes and capture most of the interesting behavior. To see this, let's use the reconstructed components (RCs), and sum the RC matrix over the first 15 columns: .. ipython:: python :okwarning: RCk = nino_ssa['RCmat'][:,:14].sum(axis=1) fig, ax = ts.plot(title='ONI',mute=True) # we mute the first call to only get the plot with 2 lines ax.plot(time,RCk,label='SSA reconstruction, 14 modes',color='orange') ax.legend() @savefig ssa_recon.png pyleo.showfig(fig) pyleo.closefig(fig) Indeed, these first few modes capture the vast majority of the low-frequency behavior, including all the El Niño/La Niña events. What is left (the blue wiggles not captured in the orange curve) are high-frequency oscillations that might be considered "noise" from the standpoint of ENSO dynamics. This illustrates how SSA might be used for filtering a timeseries. One must be careful however: * there was not much rhyme or reason for picking 15 modes. Why not 5, or 39? All we have seen so far is that they gather >95% of the variance, which is by no means a magic number. * there is no guarantee that the first few modes will filter out high-frequency behavior, or at what frequency cutoff they will do so. If you need to cut out specific frequencies, you are better off doing it with a classical filter, like the butterworth filter implemented in Pyleoclim. However, in many instances the choice of a cutoff frequency is itself rather arbitrary. In such cases, SSA provides a principled alternative for generating a version of a timeseries that preserves features and excludes others (i.e, a filter). * as with all orthgonal decompositions, summing over all RCs will recover the original signal within numerical precision. Monte-Carlo SSA Selecting meaningful modes in eigenproblems (e.g. EOF analysis) is more art than science. However, one technique stands out: Monte Carlo SSA, introduced by Allen & Smith, (1996) to identiy SSA modes that rise above what one would expect from "red noise", specifically an AR(1) process_process). To run it, simply provide the parameter MC, ideally with a number of iterations sufficient to get decent statistics. Here's let's use MC = 1000. The result will be stored in the eigval_q array, which has the same length as eigval, and its two columns contain the 5% and 95% quantiles of the ensemble of MC-SSA eigenvalues. .. ipython:: python :okwarning: nino_mcssa = ts.ssa(M = 60, nMC=1000) Now let's look at the result: .. ipython:: python :okwarning: d = nino_mcssa['eigvals'] # extract eigenvalue vector de = d*np.sqrt(2/(M-1)) du = nino_mcssa['eigvals_q'][:,0] # extract upper quantile of MC-SSA eigenvalues dl = nino_mcssa['eigvals_q'][:,1] # extract lower quantile of MC-SSA eigenvalues # plot eigenvalues rk = np.arange(0,20)+1 fig = plt.figure() plt.fill_between(rk, dl[:20], du[:20], color='silver', alpha=0.5, label='MC-SSA 95% CI') plt.errorbar(rk,d[:20],yerr=de[:20],label='SSA eigenvalues w/ 95% CI') plt.title('Scree plot of SSA eigenvalues, w/ MC-SSA bounds') plt.xlabel('Rank $i$'); plt.ylabel(r'$\lambda_i$') plt.legend(loc='upper right') @savefig scree_nmc.png pyleo.showfig(fig) pyleo.closefig(fig) This suggests that modes 1-5 fall above the red noise benchmark. ''' res = decomposition.ssa(self.value, M=M, nMC=nMC, f=f, trunc = trunc, var_thresh=var_thresh) return res
[docs] def is_evenly_spaced(self): ''' Check if the timeseries is evenly-spaced Returns ------ res : bool ''' res = tsutils.is_evenly_spaced(self.time) return res
[docs] def filter(self, cutoff_freq=None, cutoff_scale=None, method='butterworth', **kwargs): ''' Apply a filter to the timeseries Parameters ---------- method : str, {'savitzky-golay', 'butterworth', 'firwin', 'lanczos'} the filtering method - 'butterworth': a Butterworth filter (default = 4th order) - 'savitzky-golay': Savitzky-Golay filter - 'firwin': finite impulse response filter design using the window method, with default window as Hamming - 'lanczos': Lanczos zero-phase filter cutoff_freq : float or list The cutoff frequency only works with the Butterworth method. If a float, it is interpreted as a low-frequency cutoff (lowpass). If a list, it is interpreted as a frequency band (f1, f2), with f1 < f2 (bandpass). Note that only the Butterworth option (default) currently supports bandpass filtering. cutoff_scale : float or list cutoff_freq = 1 / cutoff_scale The cutoff scale only works with the Butterworth method and when cutoff_freq is None. If a float, it is interpreted as a low-frequency (high-scale) cutoff (lowpass). If a list, it is interpreted as a frequency band (f1, f2), with f1 < f2 (bandpass). kwargs : dict a dictionary of the keyword arguments for the filtering method, see `pyleoclim.utils.filter.savitzky_golay`, `pyleoclim.utils.filter.butterworth`, `pyleoclim.utils.filter.lanczos` and `pyleoclim.utils.filter.firwin` for the details Returns ------- new : pyleoclim.Series See also -------- pyleoclim.utils.filter.butterworth : Butterworth method pyleoclim.utils.filter.savitzky_golay : Savitzky-Golay method pyleoclim.utils.filter.firwin : FIR filter design using the window method pyleoclim.utils.filter.lanczos : lowpass filter via Lanczos resampling Examples -------- In the example below, we generate a signal as the sum of two signals with frequency 10 Hz and 20 Hz, respectively. Then we apply a low-pass filter with a cutoff frequency at 15 Hz, and compare the output to the signal of 10 Hz. After that, we apply a band-pass filter with the band 15-25 Hz, and compare the outcome to the signal of 20 Hz. - Generating the test data .. ipython:: python :okwarning: import pyleoclim as pyleo import numpy as np t = np.linspace(0, 1, 1000) sig1 = np.sin(2*np.pi*10*t) sig2 = np.sin(2*np.pi*20*t) sig = sig1 + sig2 ts1 = pyleo.Series(time=t, value=sig1) ts2 = pyleo.Series(time=t, value=sig2) ts = pyleo.Series(time=t, value=sig) fig, ax = ts.plot(mute=True, label='mix') ts1.plot(ax=ax, label='10 Hz') ts2.plot(ax=ax, label='20 Hz') ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3) @savefig ts_filter1.png pyleo.showfig(fig) pyleo.closefig(fig) - Applying the low-pass filter .. ipython:: python :okwarning: fig, ax = ts.plot(mute=True, label='mix') ts.filter(cutoff_freq=15).plot(ax=ax, label='After 15 Hz low-pass filter') ts1.plot(ax=ax, label='10 Hz') ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3) @savefig ts_filter2.png pyleo.showfig(fig) pyleo.closefig(fig) - Applying the band-pass filter .. ipython:: python :okwarning: fig, ax = ts.plot(mute=True, label='mix') ts.filter(cutoff_freq=[15, 25]).plot(ax=ax, label='After 15-25 Hz band-pass filter') ts2.plot(ax=ax, label='20 Hz') ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3) @savefig ts_filter3.png pyleo.showfig(fig) pyleo.closefig(fig) Above is using the default Butterworth filtering. To use FIR filtering with a window like Hanning is also simple: .. ipython:: python :okwarning: fig, ax = ts.plot(mute=True, label='mix') ts.filter(cutoff_freq=[15, 25], method='firwin', window='hanning').plot(ax=ax, label='After 15-25 Hz band-pass filter') ts2.plot(ax=ax, label='20 Hz') ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3) @savefig ts_filter4.png pyleo.showfig(fig) pyleo.closefig(fig) ''' if not self.is_evenly_spaced(): raise ValueError('This method assumes evenly-spaced timeseries, while the input is not. Use the ".interp()", ".bin()" or ".gkernel()" methods prior to ".filter()".') new = self.copy() mu = np.mean(self.value) # extract the mean y = self.value - mu method_func = { 'savitzky-golay': filterutils.savitzky_golay, 'butterworth': filterutils.butterworth, 'firwin': filterutils.firwin, 'lanczos': filterutils.lanczos, } args = {} if method in ['butterworth', 'firwin', 'lanczos']: if cutoff_freq is None: if cutoff_scale is None: raise ValueError('Please set the cutoff frequency or scale argument: "cutoff_freq" or "cutoff_scale".') else: if np.isscalar(cutoff_scale): cutoff_freq = 1 / cutoff_scale elif len(cutoff_scale) == 2 and method in ['butterworth', 'firwin']: cutoff_scale = np.array(cutoff_scale) cutoff_freq = np.sort(1 / cutoff_scale) cutoff_freq = list(cutoff_freq) elif len(cutoff_scale) > 1 and method == 'lanczos': raise ValueError('Lanczos filter requires a scalar input as cutoff scale/frequency') else: raise ValueError('Wrong cutoff_scale; should be either one float value (lowpass) or a list two float values (bandpass).') args['butterworth'] = {'fc': cutoff_freq, 'fs': 1/np.mean(np.diff(self.time))} args['firwin'] = {'fc': cutoff_freq, 'fs': 1/np.mean(np.diff(self.time))} args['lanczos'] = {'fc': cutoff_freq, 'fs': 1/np.mean(np.diff(self.time))} args[method].update(kwargs) new_val = method_func[method](y, **args[method]) new.value = new_val + mu # restore the mean return new
[docs] def distplot(self, figsize=[10, 4], title=None, savefig_settings=None, ax=None, ylabel='KDE', vertical=False, edgecolor='w',mute=False, **plot_kwargs): ''' Plot the distribution of the timeseries values Parameters ---------- figsize : list a list of two integers indicating the figure size title : str the title for the figure savefig_settings : dict the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} ax : matplotlib.axis, optional A matplotlib axis ylabel : str Label for the count axis vertical : {True,False} Whether to flip the plot vertically edgecolor : matplotlib.color The color of the edges of the bar mute : {True,False} if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax plot_kwargs : dict Plotting arguments for seaborn histplot: https://seaborn.pydata.org/generated/seaborn.histplot.html See also -------- pyleoclim.utils.plotting.savefig : saving figure in Pyleoclim Examples -------- Distribution of the SOI record .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data=pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv',skiprows=0,header=1) time=data.iloc[:,1] value=data.iloc[:,2] ts=pyleo.Series(time=time,value=value,time_name='Year C.E', value_name='SOI', label='SOI') @savefig ts_plot5.png fig, ax = ts.plot() pyleo.closefig(fig) @savefig ts_dist.png fig, ax = ts.distplot() pyleo.closefig(fig) ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() if ax is None: fig, ax = plt.subplots(figsize=figsize) #make the data into a dataframe so we can flip the figure time_label, value_label = self.make_labels() if vertical == True: data=pd.DataFrame({'value':self.value}) ax = sns.histplot(data=data, y="value", ax=ax, kde=True, edgecolor=edgecolor, **plot_kwargs) ax.set_ylabel(value_label) ax.set_xlabel(ylabel) else: ax = sns.histplot(self.value, ax=ax, kde=True, edgecolor=edgecolor, **plot_kwargs) ax.set_xlabel(value_label) ax.set_ylabel(ylabel) if title is not None: ax.set_title(title) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs] def summary_plot(self, psd=None, scalogram=None, figsize=[8, 10], title=None, savefig_settings=None, time_lim=None, value_lim=None, period_lim=None, psd_lim=None, n_signif_test=100, time_label=None, value_label=None, period_label=None, psd_label='PSD', mute=False): ''' Generate a plot of the timeseries and its frequency content through spectral and wavelet analyses. Parameters ---------- psd : PSD the PSD object of a Series. If None, will be calculated. This process can be slow as it will be using the WWZ method. scalogram : Scalogram the Scalogram object of a Series. If None, will be calculated. This process can be slow as it will be using the WWZ method. figsize : list a list of two integers indicating the figure size title : str the title for the figure time_lim : list or tuple the limitation of the time axis value_lim : list or tuple the limitation of the value axis of the timeseries period_lim : list or tuple the limitation of the period axis psd_lim : list or tuple the limitation of the psd axis n_signif_test=100 : int Number of Monte-Carlo simulations to perform for significance testing. Used when psd=None or scalogram=None time_label : str the label for the time axis value_label : str the label for the value axis of the timeseries period_label : str the label for the period axis psd_label : str the label for the amplitude axis of PDS savefig_settings : dict the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} mute : {True,False} if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax See also -------- pyleoclim.core.ui.Series.spectral : Spectral analysis for a timeseries pyleoclim.core.ui.Series.wavelet : Wavelet analysis for a timeseries pyleoclim.utils.plotting.savefig : saving figure in Pyleoclim pyleoclim.core.ui.PSD : PSD object pyleoclim.core.ui.MultiplePSD : Multiple PSD object ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() fig = plt.figure(figsize=figsize) gs = gridspec.GridSpec(6, 12) gs.update(wspace=0, hspace=0) ax = {} ax['ts'] = plt.subplot(gs[0:1, :-3]) ax['ts'] = self.plot(ax=ax['ts']) if time_lim is not None: ax['ts'].set_xlim(time_lim) if value_lim is not None: ax['ts'].set_ylim(value_lim) ax['ts'].spines['bottom'].set_visible(False) ax['scal'] = plt.subplot(gs[1:5, :-3], sharex=ax['ts']) if scalogram is None: if n_signif_test > 0: scalogram = self.wavelet().signif_test(number=n_signif_test) else: scalogram = self.wavelet() ax['scal'] = scalogram.plot(ax=ax['scal'], cbar_style={'orientation': 'horizontal', 'pad': 0.1}) ax['psd'] = plt.subplot(gs[1:4, -3:], sharey=ax['scal']) if psd is None: if n_signif_test > 0: psd = self.spectral().signif_test(number=n_signif_test) else: psd = self.spectral() ax['psd'] = psd.plot(ax=ax['psd'], transpose=True) if period_lim is not None: ax['psd'].set_ylim(period_lim) ax['psd'].set_ylabel(None) ax['psd'].tick_params(axis='y', direction='in', labelleft=False) ax['psd'].legend().remove() if psd_lim is not None: ax['psd'].set_xlim(psd_lim) if title is not None: ax['ts'].set_title(title) if value_label is not None: time_label, value_label = self.make_labels() ax['ts'].set_ylabel(value_label) if time_label is not None: time_label, value_label = self.make_labels() ax['scal'].set_xlabel(time_label) if period_label is not None: period_unit = infer_period_unit_from_time_unit(self.time_unit) period_label = f'Period [{period_unit}]' if period_unit is not None else 'Period' ax['scal'].set_ylabel(period_label) if psd_label is not None: ax['psd'].set_xlabel(psd_label) if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax
[docs] def copy(self): '''Make a copy of the Series object Returns ------- Series A copy of the Series object ''' return deepcopy(self)
[docs] def clean(self, verbose=False): ''' Clean up the timeseries by removing NaNs and sort with increasing time points Parameters ---------- verbose : bool If True, will print warning messages if there is any Returns ------- Series Series object with removed NaNs and sorting ''' new = self.copy() v_mod, t_mod = tsutils.clean_ts(self.value, self.time, verbose=verbose) new.time = t_mod new.value = v_mod return new
[docs] def gaussianize(self): ''' Gaussianizes the timeseries Returns ------- new : pyleoclim.Series The Gaussianized series object ''' new = self.copy() v_mod = tsutils.gaussianize(self.value) new.value = v_mod return new
[docs] def standardize(self): '''Standardizes the time series Returns ------- new : pyleoclim.Series The standardized series object ''' new = self.copy() v_mod = tsutils.standardize(self.value)[0] new.value = v_mod return new
[docs] def anomaly(self, timespan=None): ''' Calculate the anomaly of the series Parameters ---------- timespan : tuple or list The timespan of the mean as the reference for anomaly calculation. It is in form of [a, b], where a, b are two time points. Returns ------- new : pyleoclim.Series The standardized series object ''' new = self.copy() if timespan is not None: v_mod = self.value - np.nanmean(self.slice(timespan).value) else: v_mod = self.value - np.nanmean(self.value) new.value = v_mod return new
[docs] def segment(self, factor=10): """Gap detection This function segments a timeseries into n number of parts following a gap detection algorithm. The rule of gap detection is very simple: we define the intervals between time points as dts, then if dts[i] is larger than factor * dts[i-1], we think that the change of dts (or the gradient) is too large, and we regard it as a breaking point and divide the time series into two segments here Parameters ---------- ts : pyleoclim Series factor : float The factor that adjusts the threshold for gap detection Returns ------- res : pyleoclim MultipleSeries Object or pyleoclim Series Object If gaps were detected, returns the segments in a MultipleSeries object, else, returns the original timeseries. """ seg_y, seg_t, n_segs = tsutils.ts2segments(self.value,self.time,factor=factor) if len(seg_y)>1: s_list=[] for idx,s in enumerate(seg_y): s_tmp=Series(time=seg_t[idx],value=s,time_name=self.time_name, time_unit=self.time_unit, value_name=self.value_name, value_unit=self.value_unit,label=self.label) s_list.append(s_tmp) res=MultipleSeries(series_list=s_list) elif len(seg_y)==1: res=self.copy() else: raise ValueError('No timeseries detected') return res
[docs] def slice(self, timespan): ''' Slicing the timeseries with a timespan (tuple or list) Parameters ---------- timespan : tuple or list The list of time points for slicing, whose length must be even. When there are n time points, the output Series includes n/2 segments. For example, if timespan = [a, b], then the sliced output includes one segment [a, b]; if timespan = [a, b, c, d], then the sliced output includes segment [a, b] and segment [c, d]. Returns ------- new : Series The sliced Series object. ''' new = self.copy() n_elements = len(timespan) if n_elements % 2 == 1: raise ValueError('The number of elements in timespan must be even!') n_segments = int(n_elements / 2) mask = [False for i in range(np.size(self.time))] for i in range(n_segments): mask |= (self.time >= timespan[i*2]) & (self.time <= timespan[i*2+1]) new.time = self.time[mask] new.value = self.value[mask] return new
[docs] def fill_na(self, timespan=None, dt=1): ''' Fill NaNs into the timespan Parameters ---------- timespan : tuple or list The list of time points for slicing, whose length must be 2. For example, if timespan = [a, b], then the sliced output includes one segment [a, b]. If None, will use the start point and end point of the original timeseries dt : float The time spacing to fill the NaNs; default is 1. Returns ------- new : Series The sliced Series object. ''' new = self.copy() if timespan is None: start = np.min(self.time) end = np.max(self.time) else: start = timespan[0] end = timespan[-1] new_time = np.arange(start, end+dt, dt) new_value = np.empty(np.size(new_time)) for i, t in enumerate(new_time): if t in self.time: loc = list(self.time).index(t) new_value[i] = self.value[loc] else: new_value[i] = np.nan new.time = new_time new.value = new_value return new
[docs] def detrend(self, method='emd', **kwargs): '''Detrend Series object Parameters ---------- method : str, optional The method for detrending. The default is 'emd'. Options include: * linear: the result of a linear least-squares fit to y is subtracted from y. * constant: only the mean of data is subtrated. * "savitzky-golay", y is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. * "emd" (default): Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series **kwargs : dict Relevant arguments for each of the methods. Returns ------- new : pyleoclim.Series Detrended Series object See also -------- pyleoclim.utils.tsutils.detrend : detrending wrapper functions Examples -------- We will generate a random signal and use the different detrending functions .. ipython:: python :okwarning: import pyleoclim as pyleo import numpy as np # Generate a mixed signal with known frequencies freqs=[1/20,1/80] time=np.arange(2001) signals=[] for freq in freqs: signals.append(np.cos(2*np.pi*freq*time)) signal=sum(signals) # Add a non-linear trend slope = 1e-5 intercept = -1 nonlinear_trend = slope*time**2 + intercept signal_trend = signal + nonlinear_trend # Add white noise sig_var = np.var(signal) noise_var = sig_var / 2 #signal is twice the size of noise white_noise = np.random.normal(0, np.sqrt(noise_var), size=np.size(signal)) signal_noise = signal_trend + white_noise # Create a series object ts = pyleo.Series(time=time,value=signal_noise) @savefig random_series.png fig, ax = ts.plot(title='Timeseries with nonlinear trend') pyleo.closefig(fig) # kStandardize ts_std = ts.standardize() # Detrend using EMD ts_emd = ts_std.detrend() @savefig ts_emd.png fig, ax = ts_emd.plot(title='Detrended with EMD method') pyleo.closefig(fig) # Detrend using Savitzky-Golay filter ts_sg = ts_std.detrend(method='savitzky-golay') @savefig ts_sg.png fig, ax = ts_sg.plot(title='Detrended with Savitzky-Golay filter') pyleo.closefig(fig) ''' new = self.copy() v_mod = tsutils.detrend(self.value, x=self.time, method=method, **kwargs) new.value = v_mod return new
[docs] def spectral(self, method='lomb_scargle', freq_method='log', freq_kwargs=None, settings=None, label=None, verbose=False): ''' Perform spectral analysis on the timeseries Parameters ---------- method : str {'wwz', 'mtm', 'lomb_scargle', 'welch', 'periodogram'} freq_method : str {'log','scale', 'nfft', 'lomb_scargle', 'welch'} freq_kwargs : dict Arguments for frequency vector settings : dict Arguments for the specific spectral method label : str Label for the PSD object verbose : bool If True, will print warning messages if there is any Returns ------- psd : pyleoclim.PSD A :mod:`pyleoclim.PSD` object See also -------- pyleoclim.utils.spectral.mtm : Spectral analysis using the Multitaper approach pyleoclim.utils.spectral.lomb_scargle : Spectral analysis using the Lomb-Scargle method pyleoclim.utils.spectral.welch: Spectral analysis using the Welch segement approach pyleoclim.utils.spectral.periodogram: Spectral anaysis using the basic Fourier transform pyleoclim.utils.spectral.wwz_psd : Spectral analysis using the Wavelet Weighted Z transform pyleoclim.utils.wavelet.make_freq_vector : Functions to create the frequency vector pyleoclim.utils.tsutils.detrend : Detrending function pyleoclim.core.ui.PSD : PSD object pyleoclim.core.ui.MultiplePSD : Multiple PSD object Examples -------- Calculate the spectrum of SOI using the various methods and compute significance .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data = pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv',skiprows=0,header=1) time = data.iloc[:,1] value = data.iloc[:,2] ts = pyleo.Series(time=time, value=value, time_name='Year C.E', value_name='SOI', label='SOI') # Standardize the time series ts_std = ts.standardize() - Lomb-Scargle .. ipython:: python :okwarning: psd_ls = ts_std.spectral(method='lomb_scargle') psd_ls_signif = psd_ls.signif_test(number=20) #in practice, need more AR1 simulations @savefig spec_ls.png fig, ax = psd_ls_signif.plot(title='PSD using Lomb-Scargle method') pyleo.closefig(fig) We may pass in method-specific arguments via "settings", which is a dictionary. For instance, to adjust the number of overlapping segment for Lomb-Scargle, we may specify the method-specific argument "n50"; to adjust the frequency vector, we may modify the "freq_method" or modify the method-specific argument "freq". .. ipython:: python :okwarning: import numpy as np psd_LS_n50 = ts_std.spectral(method='lomb_scargle', settings={'n50': 4}) # c=1e-2 yields lower frequency resolution psd_LS_freq = ts_std.spectral(method='lomb_scargle', settings={'freq': np.linspace(1/20, 1/0.2, 51)}) psd_LS_LS = ts_std.spectral(method='lomb_scargle', freq_method='lomb_scargle') # with frequency vector generated using REDFIT method fig, ax = psd_LS_n50.plot( title='PSD using Lomb-Scargle method with 4 overlapping segments', label='settings={"n50": 4}', mute=True) psd_ls.plot(ax=ax, label='settings={"n50": 3}', marker='o') @savefig spec_ls_n50.png pyleo.showfig(fig) pyleo.closefig(fig) fig, ax = psd_LS_freq.plot( title='PSD using Lomb-Scargle method with differnt frequency vectors', mute=True, label='freq=np.linspace(1/20, 1/0.2, 51)', marker='o') psd_ls.plot(ax=ax, label='freq_method="log"', marker='o') @savefig spec_ls_freq.png pyleo.showfig(fig) pyleo.closefig(fig) You may notice the differences in the PSD curves regarding smoothness and the locations of the analyzed period points. For other method-specific arguments, please look up the specific methods in the "See also" section. - WWZ .. ipython:: python :okwarning: psd_wwz = ts_std.spectral(method='wwz') # wwz is the default method psd_wwz_signif = psd_wwz.signif_test(number=1) # significance test; for real work, should use number=200 or even larger @savefig spec_wwz.png fig, ax = psd_wwz_signif.plot(title='PSD using WWZ method') pyleo.closefig(fig) - Periodogram .. ipython:: python :okwarning: ts_interp = ts_std.interp() psd_perio = ts_interp.spectral(method='periodogram') psd_perio_signif = psd_perio.signif_test(number=20) #in practice, need more AR1 simulations @savefig spec_perio.png fig, ax = psd_perio_signif.plot(title='PSD using Periodogram method') pyleo.closefig(fig) - Welch .. ipython:: python :okwarning: ts_interp = ts_std.interp() psd_welch = ts_interp.spectral(method='welch') psd_welch_signif = psd_welch.signif_test(number=20) #in practice, need more AR1 simulations @savefig spec_welch.png fig, ax = psd_welch_signif.plot(title='PSD using Welch method') pyleo.closefig(fig) - MTM .. ipython:: python :okwarning: ts_interp = ts_std.interp() psd_mtm = ts_interp.spectral(method='mtm') psd_mtm_signif = psd_mtm.signif_test(number=20) #in practice, need more AR1 simulations @savefig spec_mtm.png fig, ax = psd_mtm_signif.plot(title='PSD using MTM method') pyleo.closefig(fig) ''' if not verbose: warnings.simplefilter('ignore') settings = {} if settings is None else settings.copy() spec_func = { 'wwz': specutils.wwz_psd, 'mtm': specutils.mtm, 'lomb_scargle': specutils.lomb_scargle, 'welch': specutils.welch, 'periodogram': specutils.periodogram } args = {} freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy() freq = waveutils.make_freq_vector(self.time, method=freq_method, **freq_kwargs) args['wwz'] = {'freq': freq} args['mtm'] = {} args['lomb_scargle'] = {'freq': freq} args['welch'] = {} args['periodogram'] = {} args[method].update(settings) spec_res = spec_func[method](self.value, self.time, **args[method]) if type(spec_res) is dict: spec_res = dict2namedtuple(spec_res) if label is None: label = self.label psd = PSD( frequency=spec_res.freq, amplitude=spec_res.psd, label=label, timeseries=self, spec_method=method, spec_args=args[method] ) return psd
[docs] def wavelet(self, method='wwz', settings=None, freq_method='log', ntau=None, freq_kwargs=None, verbose=False): ''' Perform wavelet analysis on the timeseries cwt wavelets documented on https://pywavelets.readthedocs.io/en/latest/ref/cwt.html Parameters ---------- method : {wwz, cwt} Whether to use the wwz method for unevenly spaced timeseries or traditional cwt (from pywavelets) freq_method : str {'log', 'scale', 'nfft', 'lomb_scargle', 'welch'} freq_kwargs : dict Arguments for frequency vector ntau : int The length of the time shift points that determins the temporal resolution of the result. If None, it will be either the length of the input time axis, or at most 50. settings : dict Arguments for the specific spectral method verbose : bool If True, will print warning messages if there is any Returns ------- scal : Series.Scalogram See also -------- pyleoclim.utils.wavelet.wwz : wwz function pyleoclim.utils.wavelet.cwt : cwt function pyleoclim.utils.wavelet.make_freq : Functions to create the frequency vector pyleoclim.utils.tsutils.detrend : Detrending function pyleoclim.core.ui.Scalogram : Scalogram object pyleoclim.core.ui.MultipleScalogram : Multiple Scalogram object Examples -------- Wavelet analysis on the SOI record. .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data = pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv',skiprows=0,header=1) time = data.iloc[:,1] value = data.iloc[:,2] ts = pyleo.Series(time=time,value=value,time_name='Year C.E', value_name='SOI', label='SOI') # WWZ scal = ts.wavelet() scal_signif = scal.signif_test(number=1) # for real work, should use number=200 or even larger @savefig spec_mtm.png fig, ax = scal_signif.plot() pyleo.closefig(fig) ''' if not verbose: warnings.simplefilter('ignore') settings = {} if settings is None else settings.copy() wave_func = { 'wwz': waveutils.wwz, 'cwt': waveutils.cwt, } if method == 'cwt' and 'freq' in settings.keys(): scales=1/np.array(settings['freq']) settings.update({'scales':scales}) del settings['freq'] freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy() freq = waveutils.make_freq_vector(self.time, method=freq_method, **freq_kwargs) args = {} if ntau is None: ntau = np.min([np.size(self.time), 50]) tau = np.linspace(np.min(self.time), np.max(self.time), ntau) args['wwz'] = {'tau': tau, 'freq': freq} args['cwt'] = {'wavelet' : 'morl', 'scales':1/freq} args[method].update(settings) wave_res = wave_func[method](self.value, self.time, **args[method]) scal = Scalogram( frequency=wave_res.freq, time=wave_res.time, amplitude=wave_res.amplitude, coi=wave_res.coi, label=self.label, timeseries=self, wave_method=method, freq_method=freq_method, freq_kwargs=freq_kwargs, wave_args=args[method], ) return scal
[docs] def wavelet_coherence(self, target_series, method='wwz', settings=None, freq_method='log', ntau=None, freq_kwargs=None, verbose=False): ''' Perform wavelet coherence analysis with the target timeseries Parameters ---------- target_series : pyleoclim.Series A pyleoclim Series object on which to perform the coherence analysis method : {'wwz'} freq_method : str {'log','scale', 'nfft', 'lomb_scargle', 'welch'} freq_kwargs : dict Arguments for frequency vector ntau : int The length of the time shift points that determins the temporal resolution of the result. If None, it will be either the length of the input time axis, or at most 50. settings : dict Arguments for the specific spectral method verbose : bool If True, will print warning messages if there is any Returns ------- coh : pyleoclim.Coherence See also -------- pyleoclim.utils.wavelet.xwt : Cross-wavelet analysis based on WWZ method pyleoclim.utils.wavelet.make_freq : Functions to create the frequency vector pyleoclim.utils.tsutils.detrend : Detrending function pyleoclim.core.ui.Coherence : Coherence object ''' if not verbose: warnings.simplefilter('ignore') settings = {} if settings is None else settings.copy() xwc_func = { 'wwz': waveutils.xwc, } freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy() freq = waveutils.make_freq_vector(self.time, method=freq_method, **freq_kwargs) t1 = np.copy(self.time) t2 = np.copy(target_series.time) dt1 = np.median(np.diff(t1)) dt2 = np.median(np.diff(t2)) overlap = np.arange(np.max([t1[0], t2[0]]), np.min([t1[-1], t2[-1]]), np.max([dt1, dt2])) if ntau is None: ntau = np.min([np.size(overlap), 50]) tau = np.linspace(np.min(overlap), np.max(overlap), ntau) args = {} args['wwz'] = {'tau': tau, 'freq': freq} args[method].update(settings) xwc_res = xwc_func[method](self.value, self.time, target_series.value, target_series.time, **args[method]) coh = Coherence( frequency=xwc_res.freq, time=xwc_res.time, coherence=xwc_res.xw_coherence, phase=xwc_res.xw_phase, coi=xwc_res.coi, timeseries1=self, timeseries2=target_series, freq_method=freq_method, freq_kwargs=freq_kwargs, ) return coh
[docs] def correlation(self, target_series, timespan=None, alpha=0.05, settings=None, common_time_kwargs=None, seed=None): ''' Estimates the Pearson's correlation and associated significance between two non IID time series The significance of the correlation is assessed using one of the following methods: 1) 'ttest': T-test adjusted for effective sample size. 2) 'isopersistent': AR(1) modeling of x and y. 3) 'isospectral': phase randomization of original inputs. (default) The T-test is a parametric test, hence computationally cheap but can only be performed in ideal circumstances. The others are non-parametric, but their computational requirements scale with the number of simulations. The choise of significance test and associated number of Monte-Carlo simulations are passed through the settings parameter. Parameters ---------- target_series : pyleoclim.Series A pyleoclim Series object timespan : tuple The time interval over which to perform the calculation alpha : float The significance level (default: 0.05) settings : dict Parameters for the correlation function, including: nsim : int the number of simulations (default: 1000) method : str, {'ttest','isopersistent','isospectral' (default)} method for significance testing common_time_kwargs : dict Parameters for the method `MultipleSeries.common_time()`. Will use interpolation by default. seed : float or int random seed for isopersistent and isospectral methods Returns ------- corr_res_dict : dict the result dictionary, containing - r : float correlation coefficient - p : float the p-value - signif : bool true if significant; false otherwise Note that signif = True if and only if p <= alpha. See also -------- pyleoclim.utils.correlation.corr_sig : Correlation function Examples -------- Correlation between the Nino3.4 index and the Deasonalized All Indian Rainfall Index .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data = pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/wtc_test_data_nino.csv') t = data.iloc[:, 0] air = data.iloc[:, 1] nino = data.iloc[:, 2] ts_nino = pyleo.Series(time=t, value=nino) ts_air = pyleo.Series(time=t, value=air) # with `nsim=20` and default `method='isospectral'` # set an arbitrary randome seed to fix the result corr_res = ts_nino.correlation(ts_air, settings={'nsim': 20}, seed=2333) print(corr_res) # using a simple t-test # set an arbitrary randome seed to fix the result corr_res = ts_nino.correlation(ts_air, settings={'nsim': 20, 'method': 'ttest'}, seed=2333) print(corr_res) # using the method "isopersistent" # set an arbitrary randome seed to fix the result corr_res = ts_nino.correlation(ts_air, settings={'nsim': 20, 'method': 'isopersistent'}, seed=2333) print(corr_res) ''' settings = {} if settings is None else settings.copy() corr_args = {'alpha': alpha} corr_args.update(settings) ms = MultipleSeries([self, target_series]) if list(self.time) != list(target_series.time): common_time_kwargs = {} if common_time_kwargs is None else common_time_kwargs.copy() ct_args = {'method': 'interp'} ct_args.update(common_time_kwargs) ms = ms.common_time(**ct_args) if timespan is None: value1 = ms.series_list[0].value value2 = ms.series_list[1].value else: value1 = ms.series_list[0].slice(timespan).value value2 = ms.series_list[1].slice(timespan).value if seed is not None: np.random.seed(seed) corr_res = corrutils.corr_sig(value1, value2, **corr_args) signif = True if corr_res['signif'] == 1 else False corr_res_dict = { 'r': corr_res['r'], 'p': corr_res['p'], 'signif': signif, 'alpha': alpha, } return corr_res_dict
[docs] def causality(self, target_series, method='liang', settings=None): ''' Perform causality analysis with the target timeseries Parameters ---------- target_series : pyleoclim.Series A pyleoclim Series object on which to compute causality method : {'liang', 'granger'} The causality method to use. settings : dict Parameters associated with the causality methods. Note that each method has different parameters. See individual methods for details Returns ------- res : dict Dictionary containing the results of the the causality analysis. See indivudal methods for details See also -------- pyleoclim.utils.causality.liang_causality : Liang causality pyleoclim.utils.causality.granger_causality : Granger causality Examples -------- Liang causality .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data=pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/wtc_test_data_nino.csv') t=data.iloc[:,0] air=data.iloc[:,1] nino=data.iloc[:,2] ts_nino=pyleo.Series(time=t,value=nino) ts_air=pyleo.Series(time=t,value=air) # plot the two timeseries @savefig ts_nino.png fig, ax = ts_nino.plot(title='El Nino Region 3 -- SST Anomalies') pyleo.closefig(fig) @savefig ts_air.png fig, ax = ts_air.plot(title='Deasonalized All Indian Rainfall Index') pyleo.closefig(fig) # we use the specific params below in ts_nino.causality() just to make the example less heavier; # please drop the `settings` for real work caus_res = ts_nino.causality(ts_air, settings={'nsim': 2, 'signif_test': 'isopersist'}) print(caus_res) Granger causality .. ipython:: python :okwarning: caus_res = ts_nino.causality(ts_air, method='granger') print(caus_res) ''' settings = {} if settings is None else settings.copy() spec_func={ 'liang':causalutils.liang_causality, 'granger':causalutils.granger_causality} args = {} args['liang'] = {} args['granger'] = {} args[method].update(settings) causal_res = spec_func[method](self.value, target_series.value, **args[method]) return causal_res
[docs] def surrogates(self, method='ar1', number=1, length=None, seed=None, settings=None): ''' Generate surrogates with increasing time axis Parameters ---------- method : {ar1} Uses an AR1 model to generate surrogates of the timeseries number : int The number of surrogates to generate length : int Lenght of the series seed : int Control seed option for reproducibility settings : dict Parameters for surogate generator. See individual methods for details. Returns ------- surr : pyleoclim SurrogateSeries See also -------- pyleoclim.utils.tsmodel.ar1_sim : AR1 simulator ''' settings = {} if settings is None else settings.copy() surrogate_func = { 'ar1': tsmodel.ar1_sim, } args = {} args['ar1'] = {'t': self.time} args[method].update(settings) if seed is not None: np.random.seed(seed) surr_res = surrogate_func[method](self.value, number, **args[method]) if len(np.shape(surr_res)) == 1: surr_res = surr_res[:, np.newaxis] s_list = [] for s in surr_res.T: s_tmp = Series(time=self.time, value=s, time_name=self.time_name, time_unit=self.time_unit, value_name=self.value_name, value_unit=self.value_unit) s_list.append(s_tmp) surr = SurrogateSeries(series_list=s_list, surrogate_method=method, surrogate_args=args[method]) return surr
[docs] def outliers(self, auto=True, remove=True, fig_outliers=True,fig_knee=True, plot_outliers_kwargs=None,plot_knee_kwargs=None,figsize=[10,4], saveknee_settings=None,saveoutliers_settings=None, mute=False): ''' Detects outliers in a timeseries and removes if specified Parameters ---------- auto : boolean True by default, detects knee in the plot automatically remove : boolean True by default, removes all outlier points if detected fig_knee : boolean True by default, plots knee plot if true fig_outliers : boolean True by degault, plots outliers if true save_knee : dict default parameters from matplotlib savefig None by default save_outliers : dict default parameters from matplotlib savefig None by default plot_knee_kwargs : dict arguments for the knee plot plot_outliers_kwargs : dict arguments for the outliers plot figsize : list by default [10,4] mute : {True,False} if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax Returns ------- new : Series Time series with outliers removed if they exist See also -------- pyleoclim.utils.tsutils.remove_outliers : remove outliers function pyleoclim.utils.plotting.plot_xy : basic x-y plot pyleoclim.utils.plotting.plot_scatter_xy : Scatter plot on top of a line plot ''' new = self.copy() #outlier_indices,fig1,ax1,fig2,ax2 = tsutils.detect_outliers(self.time, self.value, auto=auto, plot_knee=fig_knee,plot_outliers=fig_outliers,\ # figsize=figsize,save_knee=save_knee,save_outliers=save_outliers,plot_outliers_kwargs=plot_outliers_kwargs,plot_knee_kwargs=plot_knee_kwargs) outlier_indices = tsutils.detect_outliers( self.time, self.value, auto=auto, plot_knee=fig_knee,plot_outliers=fig_outliers, figsize=figsize,saveknee_settings=saveknee_settings,saveoutliers_settings=saveoutliers_settings, plot_outliers_kwargs=plot_outliers_kwargs,plot_knee_kwargs=plot_knee_kwargs, mute=mute, ) outlier_indices = np.asarray(outlier_indices) if remove == True: new = self.copy() ys = np.delete(self.value, outlier_indices) t = np.delete(self.time, outlier_indices) new.value = ys new.time = t return new
[docs] def interp(self, method='linear', **kwargs): '''Interpolate a Series object onto a new time axis Parameters ---------- method : {‘linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, ‘next’} where ‘zero’, ‘slinear’, ‘quadratic’ and ‘cubic’ refer to a spline interpolation of zeroth, first, second or third order; ‘previous’ and ‘next’ simply return the previous or next value of the point) or as an integer specifying the order of the spline interpolator to use. Default is ‘linear’. kwargs : Arguments specific to each interpolation function. See pyleoclim.utils.tsutils.interp for details Returns ------- new : pyleoclim.Series An interpolated Series object See also -------- pyleoclim.utils.tsutils.interp : interpolation function ''' new = self.copy() ti, vi = tsutils.interp(self.time,self.value,interp_type=method,**kwargs) new.time = ti new.value = vi return new
[docs] def gkernel(self, step_type='median', **kwargs): ''' Coarse-grain a Series object via a Gaussian kernel. Parameters ---------- step_type : str type of timestep: 'mean', 'median', or 'max' of the time increments kwargs : Arguments for kernel function. See pyleoclim.utils.tsutils.gkernel for details Returns ------- new : pyleoclim.Series The coarse-grained Series object See also -------- pyleoclim.utils.tsutils.gkernel : application of a Gaussian kernel ''' new=self.copy() ti, vi = tsutils.gkernel(self.time, self.value, **kwargs) # apply kernel new.time = ti new.value = vi return new
[docs] def bin(self,**kwargs): '''Bin values in a time series Parameters ---------- kwargs : Arguments for binning function. See pyleoclim.utils.tsutils.bin for details Returns ------- new : pyleoclim.Series An binned Series object See also -------- pyleoclim.utils.tsutils.bin : bin the time series into evenly-spaced bins ''' new=self.copy() res_dict = tsutils.bin(self.time,self.value,**kwargs) new.time = res_dict['bins'] new.value = res_dict['binned_values'] return new
[docs]class PSD: '''PSD object obtained from spectral analysis. See examples in pyleoclim.core.ui.Series.spectral to see how to create and manipulate these objects See also -------- pyleoclim.core.ui.Series.spectral : spectral analysis ''' def __init__(self, frequency, amplitude, label=None, timeseries=None, plot_kwargs=None, spec_method=None, spec_args=None, signif_qs=None, signif_method=None, period_unit=None, beta_est_res=None): self.frequency = np.array(frequency) self.amplitude = np.array(amplitude) self.label = label self.timeseries = timeseries self.spec_method = spec_method self.spec_args = spec_args self.signif_qs = signif_qs self.signif_method = signif_method self.plot_kwargs = {} if plot_kwargs is None else plot_kwargs.copy() self.beta_est_res = beta_est_res if period_unit is not None: self.period_unit = period_unit elif timeseries is not None: self.period_unit = infer_period_unit_from_time_unit(timeseries.time_unit) else: self.period_unit = None
[docs] def copy(self): '''Copy object ''' return deepcopy(self)
def __str__(self): table = { 'Frequency': self.frequency, 'Amplitude': self.amplitude, } msg = print(tabulate(table, headers='keys')) return f'Length: {np.size(self.frequency)}'
[docs] def signif_test(self, number=200, method='ar1', seed=None, qs=[0.95], settings=None): ''' Parameters ---------- number : int, optional Number of surrogate series to generate for significance testing. The default is 200. method : {ar1}, optional Method to generate surrogates. The default is 'ar1'. seed : int, optional Option to set the seed for reproducibility. The default is None. qs : list, optional Singificance levels to return. The default is [0.95]. settings : dict, optional Parameters. The default is None. Returns ------- new : pyleoclim.PSD New PSD object with appropriate significance test ''' if number == 0: return self new = self.copy() surr = self.timeseries.surrogates( number=number, seed=seed, method=method, settings=settings ) surr_psd = surr.spectral(method=self.spec_method, settings=self.spec_args) new.signif_qs = surr_psd.quantiles(qs=qs) new.signif_method = method return new
[docs] def beta_est(self, fmin=None, fmax=None, logf_binning_step='max', verbose=False): ''' Estimate the scaling factor beta of the PSD in a log-log space Parameters ---------- fmin : float the minimum frequency edge for beta estimation; the default is the minimum of the frequency vector of the PSD obj fmax : float the maximum frequency edge for beta estimation; the default is the maximum of the frequency vector of the PSD obj logf_binning_step : str, {'max', 'first'} if 'max', then the maximum spacing of log(f) will be used as the binning step if 'first', then the 1st spacing of log(f) will be used as the binning step verbose : bool If True, will print warning messages if there is any Returns ------- res_dict : dictionary - beta: the scaling factor - std_err: the one standard deviation error of the scaling factor - f_binned: the binned frequency series, used as X for linear regression - psd_binned: the binned PSD series, used as Y for linear regression - Y_reg: the predicted Y from linear regression, used with f_binned for the slope curve plotting ''' if fmin is None: fmin = np.min(self.frequency) if fmax is None: fmax = np.max(self.frequency) res = waveutils.beta_estimation(self.amplitude, self.frequency, fmin=fmin, fmax=fmax, logf_binning_step=logf_binning_step, verbose=verbose) res_dict = { 'beta': res.beta, 'std_err': res.std_err, 'f_binned': res.f_binned, 'psd_binned': res.psd_binned, 'Y_reg': res.Y_reg, } return res_dict
[docs] def plot(self, in_loglog=True, in_period=True, label=None, xlabel=None, ylabel='PSD', title=None, marker=None, markersize=None, color=None, linestyle=None, linewidth=None, transpose=False, xlim=None, ylim=None, figsize=[10, 4], savefig_settings=None, ax=None, mute=False, legend=True, lgd_kwargs=None, xticks=None, yticks=None, alpha=None, zorder=None, plot_kwargs=None, signif_clr='red', signif_linestyles=['--', '-.', ':'], signif_linewidth=1): '''Plots the PSD estimates and signif level if included Parameters ---------- in_loglog : bool, optional Plot on loglog axis. The default is True. in_period : bool, optional Plot the x-axis as periodicity rather than frequency. The default is True. label : str, optional label for the series. The default is None. xlabel : str, optional Label for the x-axis. The default is None. Will guess based on Series ylabel : str, optional Label for the y-axis. The default is 'PSD'. title : str, optional Plot title. The default is None. marker : str, optional marker to use. The default is None. markersize : int, optional size of the marker. The default is None. color : str, optional Line color. The default is None. linestyle : str, optional linestyle. The default is None. linewidth : float, optional Width of the line. The default is None. transpose : bool, optional Plot periodicity on y-. The default is False. xlim : list, optional x-axis limits. The default is None. ylim : list, optional y-axis limits. The default is None. figsize : list, optional Figure size. The default is [10, 4]. savefig_settings : dict, optional save settings options. The default is None. the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} ax : ax, optional The matplotlib.Axes object onto which to return the plot. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax The default is False. legend : bool, optional whether to plot the legend. The default is True. lgd_kwargs : dict, optional Arguments for the legend. The default is None. xticks : list, optional xticks to use. The default is None. yticks : list, optional yticks to use. The default is None. alpha : float, optional Transparency setting. The default is None. zorder : int, optional Order for the plot. The default is None. plot_kwargs : dict, optional Other plotting argument. The default is None. signif_clr : str, optional Color for the significance line. The default is 'red'. signif_linestyles : list of str, optional Linestyles for significance. The default is ['--', '-.', ':']. signif_linewidth : float, optional width of the significance line. The default is 1. Returns ------- fig, ax See also -------- pyleoclim.core.ui.Series.spectral : spectral analysis ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() plot_kwargs = self.plot_kwargs if plot_kwargs is None else plot_kwargs.copy() lgd_kwargs = {} if lgd_kwargs is None else lgd_kwargs.copy() if label is None: label = self.label if label is not None: plot_kwargs.update({'label': label}) if marker is not None: plot_kwargs.update({'marker': marker}) if markersize is not None: plot_kwargs.update({'markersize': markersize}) if color is not None: plot_kwargs.update({'color': color}) if linestyle is not None: plot_kwargs.update({'linestyle': linestyle}) if linewidth is not None: plot_kwargs.update({'linewidth': linewidth}) if alpha is not None: plot_kwargs.update({'alpha': alpha}) if zorder is not None: plot_kwargs.update({'zorder': zorder}) if ax is None: fig, ax = plt.subplots(figsize=figsize) if in_period: idx = np.argwhere(self.frequency==0) x_axis = 1/np.delete(self.frequency, idx) y_axis = np.delete(self.amplitude, idx) if xlabel is None: xlabel = f'Period [{self.period_unit}]' if self.period_unit is not None else 'Period' if xticks is None: xticks_default = np.array([0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 1e4, 2e4, 5e4, 1e5, 2e5, 5e5, 1e6]) mask = (xticks_default >= np.nanmin(x_axis)) & (xticks_default <= np.nanmax(x_axis)) xticks = xticks_default[mask] if xlim is None: xlim = [np.max(xticks), np.min(xticks)] else: idx = np.argwhere(self.frequency==0) x_axis = np.delete(self.frequency, idx) y_axis = np.delete(self.amplitude, idx) if xlabel is None: xlabel = f'Frequency [1/{self.period_unit}]' if self.period_unit is not None else 'Frequency' if xlim is None: xlim = ax.get_xlim() xlim = [np.min(xlim), np.max(xlim)] if transpose: x_axis, y_axis = y_axis, x_axis xlim, ylim = ylim, xlim xticks, yticks = yticks, xticks xlabel, ylabel = ylabel, xlabel ax.set_ylim(ylim[::-1]) else: ax.set_xlim(xlim) ax.plot(x_axis, y_axis, **plot_kwargs) if xlim is not None: ax.set_xlim(xlim) if ylim is not None: ax.set_ylim(ylim) # plot significance levels if self.signif_qs is not None: signif_method_label = { 'ar1': 'AR(1)', } nqs = np.size(self.signif_qs.psd_list) for i, q in enumerate(self.signif_qs.psd_list): idx = np.argwhere(q.frequency==0) signif_x_axis = 1/np.delete(q.frequency, idx) if in_period else np.delete(q.frequency, idx) signif_y_axis = np.delete(q.amplitude, idx) if transpose: signif_x_axis, signif_y_axis = signif_y_axis, signif_x_axis ax.plot( signif_x_axis, signif_y_axis, label=f'{signif_method_label[self.signif_method]}, {q.label} threshold', color=signif_clr, linestyle=signif_linestyles[i%3], linewidth=signif_linewidth, ) if in_loglog: ax.set_xscale('log') ax.set_yscale('log') if xticks is not None: ax.set_xticks(xticks) ax.xaxis.set_major_formatter(ScalarFormatter()) ax.xaxis.set_major_formatter(FormatStrFormatter('%g')) if yticks is not None: ax.set_yticks(yticks) ax.yaxis.set_major_formatter(ScalarFormatter()) ax.yaxis.set_major_formatter(FormatStrFormatter('%g')) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) if legend: lgd_args = {'frameon': False} lgd_args.update(lgd_kwargs) ax.legend(**lgd_args) if title is not None: ax.set_title(title) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs]class Scalogram: def __init__(self, frequency, time, amplitude, coi=None, label=None, Neff=3, timeseries=None, wave_method=None, wave_args=None, signif_qs=None, signif_method=None, freq_method=None, freq_kwargs=None, period_unit=None, time_label=None): ''' Parameters ---------- frequency : array the frequency axis time : array the time axis amplitude : array the amplitude at each (frequency, time) point; note the dimension is assumed to be (frequency, time) ''' self.frequency = np.array(frequency) self.time = np.array(time) self.amplitude = np.array(amplitude) if coi is not None: self.coi = np.array(coi) else: self.coi = waveutils.make_coi(self.time, Neff=Neff) self.label = label self.timeseries = timeseries self.wave_method = wave_method self.wave_args = wave_args self.signif_qs = signif_qs self.signif_method = signif_method self.freq_method = freq_method self.freq_kwargs = freq_kwargs if period_unit is not None: self.period_unit = period_unit elif timeseries is not None: self.period_unit = infer_period_unit_from_time_unit(timeseries.time_unit) else: self.period_unit = None if time_label is not None: self.time_label = time_label elif timeseries is not None: if timeseries.time_unit is not None: self.time_label = f'{timeseries.time_name} [{timeseries.time_unit}]' else: self.time_label = f'{timeseries.time_name}' else: self.time_label = None
[docs] def copy(self): '''Copy object ''' return deepcopy(self)
def __str__(self): table = { 'Frequency': self.frequency, 'Time': self.time, 'Amplitude': self.amplitude, } msg = print(tabulate(table, headers='keys')) return f'Dimension: {np.size(self.frequency)} x {np.size(self.time)}'
[docs] def plot(self, in_period=True, xlabel=None, ylabel=None, title=None, ylim=None, xlim=None, yticks=None, figsize=[10, 8], mute=False, signif_clr='white', signif_linestyles='-', signif_linewidths=1, contourf_style={}, cbar_style={}, savefig_settings={}, ax=None): '''Plot the scalogram Parameters ---------- in_period : bool, optional Plot the in period instead of frequency space. The default is True. xlabel : str, optional Label for the x-axis. The default is None. ylabel : str, optional Label for the y-axis. The default is None. title : str, optional Title for the figure. The default is None. ylim : list, optional Limits for the y-axis. The default is None. xlim : list, optional Limits for the x-axis. The default is None. yticks : list, optional yticks label. The default is None. figsize : list, optional Figure size The default is [10, 8]. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax The default is False. signif_clr : str, optional Color of the singificance line. The default is 'white'. signif_linestyles : str, optional Linestyle of the significance line. The default is '-'. signif_linewidths : float, optional Width for the significance line. The default is 1. contourf_style : dict, optional Arguments for the contour plot. The default is {}. cbar_style : dict, optional Arguments for the colarbar. The default is {}. savefig_settings : dict, optional saving options for the figure. The default is {}. ax : ax, optional Matplotlib Axis on which to return the figure. The default is None. Returns ------- fig, ax See also -------- pyleoclim.core.ui.Series.wavelet : Wavelet analysis ''' # Turn the interactive mode off. plt.ioff() contourf_args = {'cmap': 'magma', 'origin': 'lower', 'levels': 11} contourf_args.update(contourf_style) if ax is None: fig, ax = plt.subplots(figsize=figsize) if in_period: y_axis = 1/self.frequency if ylabel is None: ylabel = f'Period [{self.period_unit}]' if self.period_unit is not None else 'Period' if yticks is None: yticks_default = np.array([0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 1e4, 2e4, 5e4, 1e5, 2e5, 5e5, 1e6]) mask = (yticks_default >= np.min(y_axis)) & (yticks_default <= np.max(y_axis)) yticks = yticks_default[mask] else: y_axis = self.frequency if ylabel is None: ylabel = f'Frequency [1/{self.period_unit}]' if self.period_unit is not None else 'Frequency' cont = ax.contourf(self.time, y_axis, self.amplitude.T, **contourf_args) ax.set_yscale('log') # plot colorbar cbar_args = {'drawedges': False, 'orientation': 'vertical', 'fraction': 0.15, 'pad': 0.05} cbar_args.update(cbar_style) cb = plt.colorbar(cont, **cbar_args) # plot cone of influence if self.coi is not None: ax.plot(self.time, self.coi, 'k--') if yticks is not None: ax.set_yticks(yticks) ax.yaxis.set_major_formatter(ScalarFormatter()) ax.yaxis.set_major_formatter(FormatStrFormatter('%g')) if title is not None: ax.set_title(title) if ylim is None: ylim = [np.min(y_axis), np.min([np.max(y_axis), np.max(self.coi)])] ax.fill_between(self.time, self.coi, np.max(self.coi), color='white', alpha=0.5) # plot significance levels if self.signif_qs is not None: signif_method_label = { 'ar1': 'AR(1)', } signif_scal = self.signif_qs.scalogram_list[0] signif_boundary = self.amplitude.T / signif_scal.amplitude.T ax.contour( self.time, y_axis, signif_boundary, [-99, 1], colors=signif_clr, linestyles=signif_linestyles, linewidths=signif_linewidths, ) if xlabel is None: xlabel = self.time_label if xlabel is not None: ax.set_xlabel(xlabel) if ylabel is not None: ax.set_ylabel(ylabel) if xlim is not None: ax.set_xlim(xlim) ax.set_ylim(ylim) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs] def signif_test(self, number=200, method='ar1', seed=None, qs=[0.95], settings=None): '''Significance test for wavelet analysis Parameters ---------- number : int, optional Number of surrogates to generate for significance analysis. The default is 200. method : {'ar1'}, optional Method to use to generate the surrogates. The default is 'ar1'. seed : int, optional Set the seed for the random number generator. Useful for reproducibility The default is None. qs : list, optional Significane level to consider. The default is [0.95]. settings : dict, optional Parameters for the model. The default is None. Raises ------ ValueError qs should be a list with at least one value. Returns ------- new : pyleoclim.Scalogram A new Scalogram object with the significance level See also -------- pyleoclim.core.ui.Series.wavelet : wavelet analysis ''' if number == 0: return self new = self.copy() surr = self.timeseries.surrogates( number=number, seed=seed, method=method, settings=settings ) surr_scal = surr.wavelet(method=self.wave_method, settings=self.wave_args) if len(qs) > 1: raise ValueError('qs should be a list with size 1!') new.signif_qs = surr_scal.quantiles(qs=qs) new.signif_method = method return new
[docs]class Coherence: '''Coherence object See also -------- pyleoclim.core.ui.Series.wavelet_coherence : Wavelet coherence ''' def __init__(self, frequency, time, coherence, phase, coi=None, timeseries1=None, timeseries2=None, signif_qs=None, signif_method=None, freq_method=None, freq_kwargs=None, Neff=3, period_unit=None, time_label=None): self.frequency = np.array(frequency) self.time = np.array(time) self.coherence = np.array(coherence) if coi is not None: self.coi = np.array(coi) else: self.coi = waveutils.make_coi(self.time, Neff=Neff) self.phase = np.array(phase) self.timeseries1 = timeseries1 self.timeseries2 = timeseries2 self.signif_qs = signif_qs self.signif_method = signif_method self.freq_method = freq_method self.freq_kwargs = freq_kwargs if period_unit is not None: self.period_unit = period_unit elif timeseries1 is not None: self.period_unit = infer_period_unit_from_time_unit(timeseries1.time_unit) elif timeseries2 is not None: self.period_unit = infer_period_unit_from_time_unit(timeseries2.time_unit) else: self.period_unit = None if time_label is not None: self.time_label = time_label elif timeseries1 is not None: if timeseries1.time_unit is not None: self.time_label = f'{timeseries1.time_name} [{timeseries1.time_unit}]' else: self.time_label = f'{timeseries1.time_name}' elif timeseries2 is not None: if timeseries2.time_unit is not None: self.time_label = f'{timeseries2.time_name} [{timeseries2.time_unit}]' else: self.time_label = f'{timeseries2.time_name}' else: self.time_label = None
[docs] def copy(self): '''Copy object ''' return deepcopy(self)
[docs] def plot(self, xlabel=None, ylabel=None, title=None, figsize=[10, 8], ylim=None, xlim=None, in_period=True, yticks=None, mute=False, contourf_style={}, phase_style={}, cbar_style={}, savefig_settings={}, ax=None, signif_clr='white', signif_linestyles='-', signif_linewidths=1, under_clr='ivory', over_clr='black', bad_clr='dimgray'): '''Plot the cross-wavelet results Parameters ---------- xlabel : str, optional x-axis label. The default is None. ylabel : str, optional y-axis label. The default is None. title : str, optional Title of the plot. The default is None. figsize : list, optional Figure size. The default is [10, 8]. ylim : list, optional y-axis limits. The default is None. xlim : list, optional x-axis limits. The default is None. in_period : bool, optional Plots periods instead of frequencies The default is True. yticks : list, optional y-ticks label. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax The default is False. The default is False. contourf_style : dict, optional Arguments for the contour plot. The default is {}. phase_style : dict, optional Arguments for the phase arrows. The default is {}. It includes: - 'pt': the default threshold above which phase arrows will be plotted - 'skip_x': the number of points to skip between phase arrows along the x-axis - 'skip_y': the number of points to skip between phase arrows along the y-axis - 'scale': number of data units per arrow length unit (see matplotlib.pyplot.quiver) - 'width': shaft width in arrow units (see matplotlib.pyplot.quiver) cbar_style : dict, optional Arguments for the color bar. The default is {}. savefig_settings : dict, optional The default is {}. the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} ax : ax, optional Matplotlib axis on which to return the figure. The default is None. signif_clr : str, optional Color of the singificance line. The default is 'white'. signif_linestyles : str, optional Style of the significance line. The default is '-'. signif_linewidths : float, optional Width of the significance line. The default is 1. under_clr : str, optional Color for under 0. The default is 'ivory'. over_clr : str, optional Color for over 1. The default is 'black'. bad_clr : str, optional Color for missing values. The default is 'dimgray'. Returns ------- fig, ax See also -------- pyleoclim.core.ui.Series.wavelet_coherence matplotlib.pyplot.quiver ''' # Turn the interactive mode off. plt.ioff() if ax is None: fig, ax = plt.subplots(figsize=figsize) # handling NaNs mask_freq = [] for i in range(np.size(self.frequency)): if all(np.isnan(self.coherence[:, i])): mask_freq.append(False) else: mask_freq.append(True) if in_period: y_axis = 1/self.frequency[mask_freq] if ylabel is None: ylabel = f'Period [{self.period_unit}]' if self.period_unit is not None else 'Period' if yticks is None: yticks_default = np.array([0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 1e4, 2e4, 5e4, 1e5, 2e5, 5e5, 1e6]) mask = (yticks_default >= np.min(y_axis)) & (yticks_default <= np.max(y_axis)) yticks = yticks_default[mask] else: y_axis = self.frequency[mask_freq] if ylabel is None: ylabel = f'Frequency [1/{self.period_unit}]' if self.period_unit is not None else 'Frequency' # plot coherence amplitude contourf_args = { 'cmap': 'magma', 'origin': 'lower', 'levels': np.linspace(0, 1, 11), } contourf_args.update(contourf_style) cmap = cm.get_cmap(contourf_args['cmap']) cmap.set_under(under_clr) cmap.set_over(over_clr) cmap.set_bad(bad_clr) contourf_args['cmap'] = cmap cont = ax.contourf(self.time, y_axis, self.coherence[:, mask_freq].T, **contourf_args) # plot significance levels if self.signif_qs is not None: signif_method_label = { 'ar1': 'AR(1)', } signif_coh = self.signif_qs.scalogram_list[0] signif_boundary = self.coherence[:, mask_freq].T / signif_coh.amplitude[:, mask_freq].T ax.contour( self.time, y_axis, signif_boundary, [-99, 1], colors=signif_clr, linestyles=signif_linestyles, linewidths=signif_linewidths, ) # plot colorbar cbar_args = { 'drawedges': False, 'orientation': 'vertical', 'fraction': 0.15, 'pad': 0.05, 'ticks': np.linspace(0, 1, 11) } cbar_args.update(cbar_style) cb = plt.colorbar(cont, **cbar_args) # plot cone of influence ax.set_yscale('log', nonposy='clip') ax.plot(self.time, self.coi, 'k--') if ylim is None: ylim = [np.min(y_axis), np.min([np.max(y_axis), np.max(self.coi)])] ax.fill_between(self.time, self.coi, np.max(self.coi), color='white', alpha=0.5) if yticks is not None: ax.set_yticks(yticks) ax.yaxis.set_major_formatter(ScalarFormatter()) ax.yaxis.set_major_formatter(FormatStrFormatter('%g')) if xlabel is None: xlabel = self.time_label if xlabel is not None: ax.set_xlabel(xlabel) if ylabel is not None: ax.set_ylabel(ylabel) # plot phase skip_x = np.max([int(np.size(self.time)//20), 1]) skip_y = np.max([int(np.size(y_axis)//20), 1]) phase_args = {'pt': 0.5, 'skip_x': skip_x, 'skip_y': skip_y, 'scale': 30, 'width': 0.004} phase_args.update(phase_style) pt = phase_args['pt'] skip_x = phase_args['skip_x'] skip_y = phase_args['skip_y'] scale = phase_args['scale'] width = phase_args['width'] phase = np.copy(self.phase)[:, mask_freq] if self.signif_qs is None: phase[self.coherence < pt] = np.nan else: phase[signif_boundary.T < 1] = np.nan X, Y = np.meshgrid(self.time, 1/self.frequency[mask_freq]) U, V = np.cos(phase).T, np.sin(phase).T ax.quiver(X[::skip_y, ::skip_x], Y[::skip_y, ::skip_x], U[::skip_y, ::skip_x], V[::skip_y, ::skip_x], scale=scale, width=width, zorder=99) ax.set_ylim(ylim) if xlim is not None: ax.set_xlim(xlim) if title is not None: ax.set_title(title) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs] def signif_test(self, number=200, method='ar1', seed=None, qs=[0.95], settings=None, mute_pbar=False): '''Significance testing Parameters ---------- number : int, optional Number of surrogate series to create for significance testing. The default is 200. method : {'ar1'}, optional Method through which to generate the surrogate series. The default is 'ar1'. seed : int, optional Fixes the seed for the random number generator. Useful for reproducibility. The default is None. qs : list, optional Significanc level to return. The default is [0.95]. settings : dict, optional Parameters for surrogate model. The default is None. mute_pbar : bool, optional Mute the progress bar. The default is False. Returns ------- new : pyleoclim.Coherence Coherence with significance level See also -------- pyleoclim.core.ui.Series.wavelet_coherence : Wavelet coherence ''' if number == 0: return self new = self.copy() surr1 = self.timeseries1.surrogates( number=number, seed=seed, method=method, settings=settings ) surr2 = self.timeseries2.surrogates( number=number, seed=seed, method=method, settings=settings ) cohs = [] for i in tqdm(range(number), desc='Performing wavelet coherence on surrogate pairs', total=number, disable=mute_pbar): coh_tmp = surr1.series_list[i].wavelet_coherence(surr2.series_list[i], freq_method=self.freq_method, freq_kwargs=self.freq_kwargs) cohs.append(coh_tmp.coherence) cohs = np.array(cohs) ne, nf, nt = np.shape(cohs) coh_qs = np.ndarray(shape=(np.size(qs), nf, nt)) for i in range(nf): for j in range(nt): coh_qs[:,i,j] = mquantiles(cohs[:,i,j], qs) scal_list = [] for i, amp in enumerate(coh_qs): scal_tmp = Scalogram( frequency=self.frequency, time=self.time, amplitude=amp, coi=self.coi, freq_method=self.freq_method, freq_kwargs=self.freq_kwargs, label=f'{qs[i]*100:g}%', ) scal_list.append(scal_tmp) new.signif_qs = MultipleScalogram(scalogram_list=scal_list) new.signif_method = method return new
[docs]class MultipleSeries: '''MultipleSeries object. This object handles multiple objects of the type Series and can be created from a list of Series objects. MultipleSeries should be used when the need to run analysis on multiple records arises, such as running principal component analysis. Some of the methods automatically rescale the time axis prior to analysis to ensure that the analysis is run over the same time period. Parameters ---------- series_list : list a list of pyleoclim.Series objects time_unit : str The target time unit for every series in the list. If None, then no conversion will be applied; Otherwise, the time unit of every series in the list will be converted to the target. name : str name of the collection of timeseries (e.g. 'PAGES 2k ice cores') Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data = pd.read_csv( 'https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv', skiprows=0, header=1 ) time = data.iloc[:,1] value = data.iloc[:,2] ts1 = pyleo.Series(time=time, value=value, time_unit='years') ts2 = pyleo.Series(time=time, value=value, time_unit='years') ms = pyleo.MultipleSeries([ts1, ts2], name = 'SOI x2') ''' def __init__(self, series_list, time_unit=None, name=None): self.series_list = series_list self.time_unit = time_unit self.name = name if self.time_unit is not None: new_ts_list = [] for ts in self.series_list: new_ts = ts.convert_time_unit(time_unit=self.time_unit) new_ts_list.append(new_ts) self.series_list = new_ts_list
[docs] def convert_time_unit(self, time_unit='years'): ''' Convert the time unit of the timeseries Parameters ---------- time_unit : str the target time unit, possible input: { 'year', 'years', 'yr', 'yrs', 'y BP', 'yr BP', 'yrs BP', 'year BP', 'years BP', 'ky BP', 'kyr BP', 'kyrs BP', 'ka BP', 'ka', 'my BP', 'myr BP', 'myrs BP', 'ma BP', 'ma', } Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo import pandas as pd data = pd.read_csv( 'https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv', skiprows=0, header=1 ) time = data.iloc[:,1] value = data.iloc[:,2] ts1 = pyleo.Series(time=time, value=value, time_unit='years') ts2 = pyleo.Series(time=time, value=value, time_unit='years') ms = pyleo.MultipleSeries([ts1, ts2]) new_ms = ms.convert_time_unit('yr BP') print('Original timeseries:') print('time unit:', ms.time_unit) print() print('Converted timeseries:') print('time unit:', new_ms.time_unit) ''' new_ms = self.copy() new_ts_list = [] for ts in self.series_list: new_ts = ts.convert_time_unit(time_unit=time_unit) new_ts_list.append(new_ts) new_ms.time_unit = time_unit new_ms.series_list = new_ts_list return new_ms
[docs] def filter(self, cutoff_freq=None, cutoff_scale=None, method='butterworth', **kwargs): ''' Filtering the timeseries in the MultipleSeries object Parameters ---------- method : str, {'savitzky-golay', 'butterworth', 'firwin'} the filtering method - 'butterworth': the Butterworth method (default) - 'savitzky-golay': the Savitzky-Golay method - 'firwin': FIR filter design using the window method, with default window as Hamming - 'lanczos': lowpass filter via Lanczos resampling cutoff_freq : float or list The cutoff frequency only works with the Butterworth method. If a float, it is interpreted as a low-frequency cutoff (lowpass). If a list, it is interpreted as a frequency band (f1, f2), with f1 < f2 (bandpass). cutoff_scale : float or list cutoff_freq = 1 / cutoff_scale The cutoff scale only works with the Butterworth method and when cutoff_freq is None. If a float, it is interpreted as a low-frequency (high-scale) cutoff (lowpass). If a list, it is interpreted as a frequency band (f1, f2), with f1 < f2 (bandpass). kwargs : dict a dictionary of the keyword arguments for the filtering method, see `pyleoclim.utils.filter.savitzky_golay`, `pyleoclim.utils.filter.butterworth`, and `pyleoclim.utils.filter.firwin` for the details Returns ------- ms : pyleoclim.MultipleSeries See also -------- pyleoclim.utils.filter.butterworth : Butterworth method pyleoclim.utils.filter.savitzky_golay : Savitzky-Golay method pyleoclim.utils.filter.firwin : FIR filter design using the window method pyleoclim.utils.filter.lanczos : lowpass filter via Lanczos resampling ''' ms = self.copy() new_tslist = [] for ts in self.series_list: new_tslist.append(ts.filter(cutoff_freq=cutoff_freq, cutoff_scale=cutoff_scale, method=method, **kwargs)) ms.series_list = new_tslist return ms
[docs] def append(self,ts): '''Append timeseries ts to MultipleSeries object Returns ------- ms : pyleoclim.MultipleSeries The augmented object, comprising the old one plus `ts` ''' ms = self.copy() ts_list = deepcopy(ms.series_list) ts_list.append(ts) ms = MultipleSeries(ts_list) return ms
[docs] def copy(self): '''Copy the object ''' return deepcopy(self)
[docs] def standardize(self): '''Standardize each series object Returns ------- ms : pyleoclim.MultipleSeries The standardized Series ''' ms=self.copy() for idx,item in enumerate(ms.series_list): s=item.copy() v_mod=tsutils.standardize(item.value)[0] s.value=v_mod ms.series_list[idx]=s return ms
[docs] def common_time(self, method='interp', common_step = 'max', start=None, stop = None, step=None, step_style = None, **kwargs): ''' Aligns the time axes of a MultipleSeries object, via binning interpolation., or Gaussian kernel. Alignmentis critical for workflows that need to assume a common time axis for the group of series under consideration. The common time axis is characterized by the following parameters: start : the latest start date of the bunch (maximun of the minima) stop : the earliest stop date of the bunch (minimum of the maxima) step : The representative spacing between consecutive values Optional arguments for binning or interpolation are those of the underling functions. If the time axis are retrograde, this step makes them prograde. Parameters ---------- method : string either 'bin', 'interp' [default] or 'gkernel' common_step : string Method to obtain a representative step among all Series Valid entries: 'median' [default], 'mean', 'mode' or 'max' start : float starting point of the common time axis [default = None] stop : float end point of the common time axis [default = None] step : float increment the common time axis [default = None] if not provided, `pyleoclim` will use `grid_properties()` to determine these parameters step_style : 'string' step style to be applied from `grid_properties` [default = None] kwargs: dict keyword arguments (dictionary) of the various methods Returns ------- ms : pyleoclim.MultipleSeries The MultipleSeries objects with all series aligned to the same time axis. See also -------- pyleoclim.utils.tsutils.bin : put timeseries values into equal bins (possibly leaving NaNs in). pyleoclim.utils.tsutils.gkernel : coarse-graining using a Gaussian kernel pyleoclim.utils.tsutils.interp : interpolation onto a regular grid (default = linear interpolation) pyleoclim.utils.tsutils.grid_properties : infer grid properties Examples -------- .. ipython:: python :okwarning: import numpy as np import pyleoclim as pyleo import matplotlib.pyplot as plt from pyleoclim.utils.tsmodel import colored_noise # create 2 incompletely sampled series ns = 2 ; nt = 200; n_del = 20 serieslist = [] for j in range(ns): t = np.arange(nt) v = colored_noise(alpha=1, t=t) deleted_idx = np.random.choice(range(np.size(t)), n_del, replace=False) tu = np.delete(t, deleted_idx) vu = np.delete(v, deleted_idx) ts = pyleo.Series(time = tu, value = vu, label = 'series ' + str(j+1)) serieslist.append(ts) # create MS object from the list ms = pyleo.MultipleSeries(serieslist) fig, ax = plt.subplots(2,2) ax = ax.flatten() # apply common_time with default parameters msc = ms.common_time() msc.plot(title='linear interpolation',ax=ax[0]) # apply common_time with binning msc = ms.common_time(method='bin') msc.plot(title='Binning',ax=ax[1], legend=False) # apply common_time with gkernel msc = ms.common_time(method='gkernel') msc.plot(title=r'Gaussian kernel ($h=3$)',ax=ax[2],legend=False) # apply common_time with gkernel modified msc = ms.common_time(method='gkernel', h=11) msc.plot(title=r'Gaussian kernel ($h=11$)',ax=ax[3],legend=False) # display, save and close figure fig.tight_layout() @savefig ms_ct.png pyleo.showfig(fig) pyleo.closefig(fig) ''' if step_style == None: if method == 'bin' or method == 'gkernel': step_style = 'max' elif method == 'interp': step_style = 'mean' gp = np.empty((len(self.series_list),3)) # obtain grid parameters for idx,item in enumerate(self.series_list): item = item.clean(verbose=idx==0) gp[idx,:] = tsutils.grid_properties(item.time, step_style=step_style) # if gp[idx,2] < 0: # flip time axis if retrograde # item = item.clean(verbose=idx==0) # gp[idx,2] = abs(gp[idx,2]) # define parameters for common time axis start = gp[:,0].max() stop = gp[:,1].min() if start > stop: raise ValueError('At least one series has no common time interval with others. Please check the time axis of the series.') if common_step == 'mean': step = gp[:,2].mean() elif common_step == 'max': step = gp[:,2].max() elif common_step == 'mode': step = stats.mode(gp[:,2])[0][0] else: step = np.median(gp[:,2]) ms = self.copy() if method == 'bin': for idx,item in enumerate(self.series_list): ts = item.copy() d = tsutils.bin(ts.time, ts.value, bin_size=step, start=start, stop=stop, evenly_spaced = False, **kwargs) ts.time = d['bins'] ts.value = d['binned_values'] ms.series_list[idx] = ts elif method == 'interp': for idx,item in enumerate(self.series_list): ts = item.copy() ti, vi = tsutils.interp(ts.time, ts.value, step=step, start=start, stop=stop,**kwargs) ts.time = ti ts.value = vi ms.series_list[idx] = ts elif method == 'gkernel': for idx,item in enumerate(self.series_list): ts = item.copy() ti, vi = tsutils.gkernel(ts.time,ts.value,step=step, start=start, stop=stop, **kwargs) ts.time = ti ts.value = vi ms.series_list[idx] = ts else: raise NameError('Unknown methods; no action taken') return ms
[docs] def correlation(self, target=None, timespan=None, alpha=0.05, settings=None, common_time_kwargs=None, mute_pbar=False, seed=None): ''' Calculate the correlation between a MultipleSeries and a target Series If the target Series is not specified, then the 1st member of MultipleSeries will be the target Parameters ---------- target : pyleoclim.Series, optional A pyleoclim Series object. timespan : tuple The time interval over which to perform the calculation alpha : float The significance level (0.05 by default) settings : dict Parameters for the correlation function, including: nsim : int the number of simulations (default: 1000) method : str, {'ttest','isopersistent','isospectral' (default)} method for significance testing common_time_kwargs : dict Parameters for the method MultipleSeries.common_time() seed : float or int random seed for isopersistent and isospectral methods mute_pbar : bool If True, the progressbar will be muted. Default is False. Returns ------- res : dict Containing a list of the Pearson's correlation coefficient, associated significance and p-value. See also -------- pyleoclim.utils.correlation.corr_sig : Correlation function Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo from pyleoclim.utils.tsmodel import colored_noise nt = 100 t0 = np.arange(nt) v0 = colored_noise(alpha=1, t=t0) noise = np.random.normal(loc=0, scale=1, size=nt) ts0 = pyleo.Series(time=t0, value=v0) ts1 = pyleo.Series(time=t0, value=v0+noise) ts2 = pyleo.Series(time=t0, value=v0+2*noise) ts3 = pyleo.Series(time=t0, value=v0+1/2*noise) ts_list = [ts1, ts2, ts3] ms = pyleo.MultipleSeries(ts_list) ts_target = ts0 # set an arbitrary randome seed to fix the result corr_res = ms.correlation(ts_target, settings={'nsim': 20}, seed=2333) print(corr_res) # set an arbitrary randome seed to fix the result corr_res = ms.correlation(settings={'nsim': 20}, seed=2333) print(corr_res) ''' r_list = [] signif_list = [] p_list = [] if target is None: target = self.series_list[0] for idx, ts in tqdm(enumerate(self.series_list), total=len(self.series_list), disable=mute_pbar): corr_res = ts.correlation(target, timespan=timespan, alpha=alpha, settings=settings, common_time_kwargs=common_time_kwargs, seed=seed) r_list.append(corr_res['r']) signif_list.append(corr_res['signif']) p_list.append(corr_res['p']) r_lsit = np.array(r_list) p_lsit = np.array(p_list) corr_res = { 'r': r_list, 'p': p_list, 'signif': signif_list, 'alpha': alpha, } return corr_res
# def mssa(self, M, MC=0, f=0.5): # data = [] # for val in self.series_list: # data.append(val.value) # data = np.transpose(np.asarray(data)) # res = decomposition.mssa(data, M=M, MC=MC, f=f) # return res
[docs] def equal_lengths(self): ''' Test whether all series in object have equal length Parameters ---------- None Returns ------- flag : boolean lengths : list containing the lengths of the series in object ''' lengths = [] for ts in self.series_list: lengths.append(len(ts.value)) L = lengths[0] r = lengths[1:] flag = all (l==L for l in r) return flag, lengths
[docs] def pca(self,nMC=200,**pca_kwargs): ''' Principal Component Analysis Parameters ---------- nMC : int number of Monte Carlo simulations pca_kwargs : tuple Returns ------- res : dictionary containing: - eigval : eigenvalues (nrec,) - eig_ar1 : eigenvalues of the AR(1) ensemble (nrec, nMC) - pcs : PC series of all components (nrec, nt) - eofs : EOFs of all components (nrec, nrec) References: ---------- Deininger, M., McDermott, F., Mudelsee, M. et al. (2017): Coherency of late Holocene European speleothem δ18O records linked to North Atlantic Ocean circulation. Climate Dynamics, 49, 595–618. https://doi.org/10.1007/s00382-016-3360-8 See also -------- pyleoclim.utils.decomposition.mcpca: Monte Carlo PCA Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) tslist = data.to_LipdSeriesList() tslist = tslist[2:] # drop the first two series which only concerns age and depth ms = pyleo.MultipleSeries(tslist) # msc = ms.common_time() # res = msc.pca(nMC=20) ''' flag, lengths = self.equal_lengths() if flag==False: print('All Time Series should be of same length. Apply common_time() first') else: # if all series have equal length p = len(lengths) n = lengths[0] ys = np.empty((n,p)) for j in range(p): ys[:,j] = self.series_list[j].value res = decomposition.mcpca(ys, nMC, **pca_kwargs) return res
[docs] def bin(self, **kwargs): ''' Aligns the time axes of a MultipleSeries object, via binning. This is critical for workflows that need to assume a common time axis for the group of series under consideration. The common time axis is characterized by the following parameters: start : the latest start date of the bunch (maximin of the minima) stop : the earliest stop date of the bunch (minimum of the maxima) step : The representative spacing between consecutive values (mean of the median spacings) This is a special case of the common_time function. Parameters ---------- kwargs : dict Arguments for the binning function. See pyleoclim.utils.tsutils.bin Returns ------- ms : pyleoclim.MultipleSeries The MultipleSeries objects with all series aligned to the same time axis. See also -------- pyleoclim.core.ui.MultipleSeries.common_time: Base function on which this operates pyleoclim.utils.tsutils.bin: Underlying binning function pyleoclim.core.ui.Series.bin: Bin function for Series object Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) tslist = data.to_LipdSeriesList() tslist = tslist[2:] # drop the first two series which only concerns age and depth ms = pyleo.MultipleSeries(tslist) msbin = ms.bin() ''' ms = self.copy() ms = ms.common_time(method = 'bin', **kwargs) return ms
[docs] def gkernel(self, **kwargs): ''' Aligns the time axes of a MultipleSeries object, via Gaussian kernel. This is critical for workflows that need to assume a common time axis for the group of series under consideration. The common time axis is characterized by the following parameters: start : the latest start date of the bunch (maximin of the minima) stop : the earliest stop date of the bunch (minimum of the maxima) step : The representative spacing between consecutive values (mean of the median spacings) This is a special case of the common_time function. Parameters ---------- kwargs : dict Arguments for gkernel. See pyleoclim.utils.tsutils.gkernel for details. Returns ------- ms : pyleoclim.MultipleSeries The MultipleSeries objects with all series aligned to the same time axis. See also -------- pyleoclim.core.ui.MultipleSeries.common_time: Base function on which this operates pyleoclim.utils.tsutils.gkernel: Underlying kernel module Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) tslist = data.to_LipdSeriesList() tslist = tslist[2:] # drop the first two series which only concerns age and depth ms = pyleo.MultipleSeries(tslist) msk = ms.gkernel() ''' ms = self.copy() ms = ms.common_time(method = 'gkernel', **kwargs) return ms
[docs] def interp(self, **kwargs): ''' Aligns the time axes of a MultipleSeries object, via interpolation. This is critical for workflows that need to assume a common time axis for the group of series under consideration. The common time axis is characterized by the following parameters: start : the latest start date of the bunch (maximin of the minima) stop : the earliest stop date of the bunch (minimum of the maxima) step : The representative spacing between consecutive values (mean of the median spacings) This is a special case of the common_time function. Parameters ---------- kwargs: keyword arguments (dictionary) for the interpolation method Returns ------- ms : pyleoclim.MultipleSeries The MultipleSeries objects with all series aligned to the same time axis. See also -------- pyleoclim.core.ui.MultipleSeries.common_time: Base function on which this operates pyleoclim.utils.tsutils.interp: Underlying interpolation function pyleoclim.core.ui.Series.interp: Interpolation function for Series object Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) tslist = data.to_LipdSeriesList() tslist = tslist[2:] # drop the first two series which only concerns age and depth ms = pyleo.MultipleSeries(tslist) msinterp = ms.interp() ''' ms = self.copy() ms = ms.common_time(method='interp', **kwargs) return ms
[docs] def detrend(self,method='emd',**kwargs): '''Detrend timeseries Parameters ---------- method : str, optional The method for detrending. The default is 'emd'. Options include: * linear: the result of a linear least-squares fit to y is subtracted from y. * constant: only the mean of data is subtrated. * "savitzky-golay", y is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y. * "emd" (default): Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series **kwargs : dict Relevant arguments for each of the methods. Returns ------- ms : pyleoclim.MultipleSeries The detrended timeseries See also -------- pyleoclim.core.ui.Series.detrend : Detrending for a single series pyleoclim.utils.tsutils.detrend : Detrending function ''' ms=self.copy() for idx,item in enumerate(ms.series_list): s=item.copy() v_mod=tsutils.detrend(item.value,x=item.time,method=method,**kwargs) s.value=v_mod ms.series_list[idx]=s return ms
[docs] def spectral(self, method='lomb_scargle', settings=None, mute_pbar=False, freq_method='log', freq_kwargs=None, label=None, verbose=False): ''' Perform spectral analysis on the timeseries Parameters ---------- method : str {'wwz', 'mtm', 'lomb_scargle', 'welch', 'periodogram'} freq_method : str {'log','scale', 'nfft', 'lomb_scargle', 'welch'} freq_kwargs : dict Arguments for frequency vector settings : dict Arguments for the specific spectral method label : str Label for the PSD object verbose : bool If True, will print warning messages if there is any mute_pbar : {True, False} Mute the progress bar. Default is False. Returns ------- psd : pyleoclim.MultiplePSD A Multiple PSD object See also -------- pyleoclim.utils.spectral.mtm : Spectral analysis using the Multitaper approach pyleoclim.utils.spectral.lomb_scargle : Spectral analysis using the Lomb-Scargle method pyleoclim.utils.spectral.welch: Spectral analysis using the Welch segement approach pyleoclim.utils.spectral.periodogram: Spectral anaysis using the basic Fourier transform pyleoclim.utils.spectral.wwz_psd : Spectral analysis using the Wavelet Weighted Z transform pyleoclim.utils.wavelet.make_freq : Functions to create the frequency vector pyleoclim.utils.tsutils.detrend : Detrending function pyleoclim.core.ui.Series.spectral : Spectral analysis for a single timeseries pyleoclim.core.ui.PSD : PSD object pyleoclim.core.ui.MultiplePSD : Multiple PSD object ''' settings = {} if settings is None else settings.copy() psd_list = [] for s in tqdm(self.series_list, desc='Performing spectral analysis on individual series', position=0, leave=True, disable=mute_pbar): psd_tmp = s.spectral(method=method, settings=settings, freq_method=freq_method, freq_kwargs=freq_kwargs, label=label, verbose=verbose) psd_list.append(psd_tmp) psds = MultiplePSD(psd_list=psd_list) return psds
[docs] def wavelet(self, method='wwz', settings={}, freq_method='log', ntau=None, freq_kwargs=None, verbose=False, mute_pbar=False): '''Wavelet analysis Parameters ---------- method : {wwz, cwt} Whether to use the wwz method for unevenly spaced timeseries or traditional cwt (from pywavelets) settings : dict, optional Settings for the particular method. The default is {}. freq_method : str {'log', 'scale', 'nfft', 'lomb_scargle', 'welch'} freq_kwargs : dict Arguments for frequency vector ntau : int The length of the time shift points that determins the temporal resolution of the result. If None, it will be either the length of the input time axis, or at most 100. settings : dict Arguments for the specific spectral method verbose : bool If True, will print warning messages if there is any mute_pbar : bool, optional Whether to mute the progress bar. The default is False. Returns ------- scals : pyleoclim.MultipleScalograms See also -------- pyleoclim.utils.wavelet.wwz : wwz function pyleoclim.utils.wavelet.make_freq : Functions to create the frequency vector pyleoclim.utils.tsutils.detrend : Detrending function pyleoclim.core.ui.Series.wavelet : wavelet analysis on single object pyleoclim.core.ui.MultipleScalogram : Multiple Scalogram object ''' settings = {} if settings is None else settings.copy() scal_list = [] for s in tqdm(self.series_list, desc='Performing wavelet analysis on individual series', position=0, leave=True, disable=mute_pbar): scal_tmp = s.wavelet(method=method, settings=settings, freq_method=freq_method, freq_kwargs=freq_kwargs, verbose=verbose, ntau=ntau) scal_list.append(scal_tmp) scals = MultipleScalogram(scalogram_list=scal_list) return scals
[docs] def plot(self, figsize=[10, 4], marker=None, markersize=None, linestyle=None, linewidth=None, colors=None, cmap='tab10', norm=None, xlabel=None, ylabel=None, title=None, legend=True, plot_kwargs=None, lgd_kwargs=None, savefig_settings=None, ax=None, mute=False, invert_xaxis=False): '''Plot multiple timeseries on the same axis Parameters ---------- figsize : list, optional Size of the figure. The default is [10, 4]. marker : str, optional marker type. The default is None. markersize : float, optional marker size. The default is None. colors : a list of, or one, Python supported color code (a string of hex code or a tuple of rgba values) Colors for plotting. If None, the plotting will cycle the 'tab10' colormap; if only one color is specified, then all curves will be plotted with that single color; if a list of colors are specified, then the plotting will cycle that color list. cmap : str The colormap to use when "colors" is None. norm : matplotlib.colors.Normalize like The nomorlization for the colormap. If None, a linear normalization will be used. linestyle : str, optional Line style. The default is None. linewidth : float, optional The width of the line. The default is None. xlabel : str, optional x-axis label. The default is None. ylabel : str, optional y-axis label. The default is None. title : str, optional Title. The default is None. legend : bool, optional Wether the show the legend. The default is True. plot_kwargs : dict, optional Plot parameters. The default is None. lgd_kwargs : dict, optional Legend parameters. The default is None. savefig_settings : dictionary, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} The default is None. ax : matplotlib.ax, optional The matplotlib axis onto which to return the figure. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax invert_xaxis : bool, optional if True, the x-axis of the plot will be inverted Returns ------- fig, ax ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() plot_kwargs = {} if plot_kwargs is None else plot_kwargs.copy() lgd_kwargs = {} if lgd_kwargs is None else lgd_kwargs.copy() if ax is None: fig, ax = plt.subplots(figsize=figsize) if ylabel is None: consistent_ylabels = True time_label, value_label = self.series_list[0].make_labels() for s in self.series_list[1:]: time_label_tmp, value_label_tmp = s.make_labels() if value_label_tmp != value_label: consistent_ylabels = False if consistent_ylabels: ylabel = value_label else: ylabel = 'value' for idx, s in enumerate(self.series_list): if colors is None: cmap_obj = plt.get_cmap(cmap) if hasattr(cmap_obj, 'colors'): nc = len(cmap_obj.colors) else: nc = len(self.series_list) if norm is None: norm = mpl.colors.Normalize(vmin=0, vmax=nc-1) clr = cmap_obj(norm(idx%nc)) elif type(colors) is str: clr = colors elif type(colors) is list: nc = len(colors) clr = colors[idx%nc] else: raise TypeError('"colors" should be a list of, or one, Python supported color code (a string of hex code or a tuple of rgba values)') ax = s.plot( figsize=figsize, marker=marker, markersize=markersize, color=clr, linestyle=linestyle, linewidth=linewidth, label=s.label, xlabel=xlabel, ylabel=ylabel, title=title, legend=legend, lgd_kwargs=lgd_kwargs, plot_kwargs=plot_kwargs, ax=ax, ) if invert_xaxis: ax.invert_xaxis() if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs] def stackplot(self, figsize=None, savefig_settings=None, xlim=None, fill_between_alpha=0.2, colors=None, cmap='tab10', norm=None, spine_lw=1.5, grid_lw=0.5, font_scale=0.8, label_x_loc=-0.15, v_shift_factor=3/4, linewidth=1.5): ''' Stack plot of multiple series Note that the plotting style is uniquely designed for this one and cannot be properly reset with `pyleoclim.set_style()`. Parameters ---------- figsize : list Size of the figure. colors : a list of, or one, Python supported color code (a string of hex code or a tuple of rgba values) Colors for plotting. If None, the plotting will cycle the 'tab10' colormap; if only one color is specified, then all curves will be plotted with that single color; if a list of colors are specified, then the plotting will cycle that color list. cmap : str The colormap to use when "colors" is None. norm : matplotlib.colors.Normalize like The nomorlization for the colormap. If None, a linear normalization will be used. savefig_settings : dictionary the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} The default is None. xlim : list The x-axis limit. fill_between_alpha : float The transparency for the fill_between shades. spine_lw : float The linewidth for the spines of the axes. grid_lw : float The linewidth for the gridlines. linewidth : float The linewidth for the curves. font_scale : float The scale for the font sizes. Default is 0.8. label_x_loc : float The x location for the label of each curve. v_shift_factor : float The factor for the vertical shift of each axis. The default value 3/4 means the top of the next axis will be located at 3/4 of the height of the previous one. Returns ------- fig, ax Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) tslist = data.to_LipdSeriesList() tslist = tslist[2:] # drop the first two series which only concerns age and depth ms = pyleo.MultipleSeries(tslist) @savefig mts_stackplot.png fig, ax = ms.stackplot() pyleo.closefig(fig) ''' plt.ioff() current_style = deepcopy(mpl.rcParams) plotting.set_style('journal', font_scale=font_scale) savefig_settings = {} if savefig_settings is None else savefig_settings.copy() n_ts = len(self.series_list) fig = plt.figure(figsize=figsize) if xlim is None: time_min = np.inf time_max = -np.inf for ts in self.series_list: if np.min(ts.time) <= time_min: time_min = np.min(ts.time) if np.max(ts.time) >= time_max: time_max = np.max(ts.time) xlim = [time_min, time_max] ax = {} left = 0 width = 1 height = 1/n_ts bottom = 1 for idx, ts in enumerate(self.series_list): if colors is None: cmap_obj = plt.get_cmap(cmap) if hasattr(cmap_obj, 'colors'): nc = len(cmap_obj.colors) else: nc = len(self.series_list) if norm is None: norm = mpl.colors.Normalize(vmin=0, vmax=nc-1) clr = cmap_obj(norm(idx%nc)) elif type(colors) is str: clr = colors elif type(colors) is list: nc = len(colors) clr = colors[idx%nc] else: raise TypeError('"colors" should be a list of, or one, Python supported color code (a string of hex code or a tuple of rgba values)') bottom -= height*v_shift_factor ax[idx] = fig.add_axes([left, bottom, width, height]) ax[idx].plot(ts.time, ts.value, color=clr, lw=linewidth) ax[idx].patch.set_alpha(0) ax[idx].set_xlim(xlim) time_label, value_label = ts.make_labels() ax[idx].set_ylabel(value_label, weight='bold') mu = np.mean(ts.value) std = np.std(ts.value) ylim = [mu-4*std, mu+4*std] ax[idx].fill_between(ts.time, ts.value, y2=mu, alpha=fill_between_alpha, color=clr) trans = transforms.blended_transform_factory(ax[idx].transAxes, ax[idx].transData) if ts.label is not None: ax[idx].text(label_x_loc, mu, ts.label, horizontalalignment='right', transform=trans, color=clr, weight='bold') ax[idx].set_ylim(ylim) ax[idx].set_yticks(ylim) ax[idx].yaxis.set_major_formatter(FormatStrFormatter('%.1f')) ax[idx].grid(False) if idx % 2 == 0: ax[idx].spines['left'].set_visible(True) ax[idx].spines['left'].set_linewidth(spine_lw) ax[idx].spines['left'].set_color(clr) ax[idx].spines['right'].set_visible(False) ax[idx].yaxis.set_label_position('left') ax[idx].yaxis.tick_left() else: ax[idx].spines['left'].set_visible(False) ax[idx].spines['right'].set_visible(True) ax[idx].spines['right'].set_linewidth(spine_lw) ax[idx].spines['right'].set_color(clr) ax[idx].yaxis.set_label_position('right') ax[idx].yaxis.tick_right() ax[idx].yaxis.label.set_color(clr) ax[idx].tick_params(axis='y', colors=clr) ax[idx].spines['top'].set_visible(False) ax[idx].spines['bottom'].set_visible(False) ax[idx].tick_params(axis='x', which='both', length=0) ax[idx].set_xlabel('') ax[idx].set_xticklabels([]) xt = ax[idx].get_xticks()[1:-1] for x in xt: ax[idx].axvline(x=x, color='lightgray', linewidth=grid_lw, ls='-', zorder=-1) ax[idx].axhline(y=mu, color='lightgray', linewidth=grid_lw, ls='-', zorder=-1) bottom -= height*(1-v_shift_factor) ax[n_ts] = fig.add_axes([left, bottom, width, height]) ax[n_ts].set_xlabel(time_label) ax[n_ts].spines['left'].set_visible(False) ax[n_ts].spines['right'].set_visible(False) ax[n_ts].spines['bottom'].set_visible(True) ax[n_ts].spines['bottom'].set_linewidth(spine_lw) ax[n_ts].set_yticks([]) ax[n_ts].patch.set_alpha(0) ax[n_ts].set_xlim(xlim) ax[n_ts].grid(False) ax[n_ts].tick_params(axis='x', which='both', length=3.5) xt = ax[n_ts].get_xticks()[1:-1] for x in xt: ax[n_ts].axvline(x=x, color='lightgray', linewidth=grid_lw, ls='-', zorder=-1) if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: plotting.showfig(fig) # reset the plotting style mpl.rcParams.update(current_style) return fig, ax
[docs]class SurrogateSeries(MultipleSeries): ''' Object containing surrogate timeseries, usually obtained through recursive modeling (e.g., AR1) Surrogate Series is a child of MultipleSeries. All methods available for MultipleSeries are available for surrogate series. ''' def __init__(self, series_list, surrogate_method=None, surrogate_args=None): self.series_list = series_list self.surrogate_method = surrogate_method self.surrogate_args = surrogate_args
[docs]class EnsembleSeries(MultipleSeries): ''' EnsembleSeries object The EnsembleSeries object is a child of the MultipleSeries object, that is, a special case of MultipleSeries, aiming for ensembles of similar series. Ensembles usually arise from age modeling or Bayesian calibrations. All members of an EnsembleSeries object are assumed to share identical labels and units. All methods available for MultipleSeries are available for EnsembleSeries. Some functions were modified for the special case of ensembles. ''' def __init__(self, series_list): self.series_list = series_list
[docs] def make_labels(self): ''' Initialization of labels Returns ------- time_header : str Label for the time axis value_header : str Label for the value axis ''' ts_list = self.series_list if ts_list[0].time_name is not None: time_name_str = ts_list[0].time_name else: time_name_str = 'time' if ts_list[0].value_name is not None: value_name_str = ts_list[0].value_name else: value_name_str = 'value' if ts_list[0].value_unit is not None: value_header = f'{value_name_str} [{ts_list[0].value_unit}]' else: value_header = f'{value_name_str}' if ts_list[0].time_unit is not None: time_header = f'{time_name_str} [{ts_list[0].time_unit}]' else: time_header = f'{time_name_str}' return time_header, value_header
[docs] def quantiles(self, qs=[0.05, 0.5, 0.95]): '''Calculate quantiles of an EnsembleSeries object Parameters ---------- qs : list, optional List of quantiles to consider for the calculation. The default is [0.05, 0.5, 0.95]. Returns ------- ens_qs : pyleoclim.EnsembleSeries ''' time = np.copy(self.series_list[0].time) vals = [] for ts in self.series_list: if not np.array_equal(ts.time, time): raise ValueError('Time axis not consistent across the ensemble!') vals.append(ts.value) vals = np.array(vals) ens_qs = mquantiles(vals, qs, axis=0) ts_list = [] for i, quant in enumerate(ens_qs): ts = Series(time=time, value=quant, label=f'{qs[i]*100:g}%') ts_list.append(ts) ens_qs = EnsembleSeries(series_list=ts_list) return ens_qs
[docs] def correlation(self, target=None, timespan=None, alpha=0.05, settings=None, fdr_kwargs=None, common_time_kwargs=None, mute_pbar=False, seed=None): ''' Calculate the correlation between an EnsembleSeries object to a target. If the target is not specified, then the 1st member of the ensemble will be the target Note that the FDR approach is applied by default to determine the significance of the p-values (more information in See Also below). Parameters ---------- target : pyleoclim.Series or pyleoclim.EnsembleSeries, optional A pyleoclim Series object or EnsembleSeries object. When the target is also an EnsembleSeries object, then the calculation of correlation is performed in a one-to-one sense, and the ourput list of correlation values and p-values will be the size of the series_list of the self object. That is, if the self object contains n Series, and the target contains n+m Series, then only the first n Series from the object will be used for the calculation; otherwise, if the target contains only n-m Series, then the first m Series in the target will be used twice in sequence. timespan : tuple The time interval over which to perform the calculation alpha : float The significance level (0.05 by default) settings : dict Parameters for the correlation function, including: nsim : int the number of simulations (default: 1000) method : str, {'ttest','isopersistent','isospectral' (default)} method for significance testing fdr_kwargs : dict Parameters for the FDR function common_time_kwargs : dict Parameters for the method MultipleSeries.common_time() mute_pbar : bool If True, the progressbar will be muted. Default is False. seed : float or int random seed for isopersistent and isospectral methods Returns ------- res : dict Containing a list of the Pearson's correlation coefficient, associated significance and p-value. See also -------- pyleoclim.utils.correlation.corr_sig : Correlation function pyleoclim.utils.correlation.fdr : FDR function Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo from pyleoclim.utils.tsmodel import colored_noise nt = 100 t0 = np.arange(nt) v0 = colored_noise(alpha=1, t=t0) noise = np.random.normal(loc=0, scale=1, size=nt) ts0 = pyleo.Series(time=t0, value=v0) ts1 = pyleo.Series(time=t0, value=v0+noise) ts2 = pyleo.Series(time=t0, value=v0+2*noise) ts3 = pyleo.Series(time=t0, value=v0+1/2*noise) ts_list1 = [ts0, ts1] ts_list2 = [ts2, ts3] ts_ens = pyleo.EnsembleSeries(ts_list1) ts_target = pyleo.EnsembleSeries(ts_list2) # set an arbitrary randome seed to fix the result corr_res = ts_ens.correlation(ts_target, seed=2333) print(corr_res) ''' if target is None: target = self.series_list[0] r_list = [] p_list = [] signif_list = [] for idx, ts1 in tqdm(enumerate(self.series_list), total=len(self.series_list), disable=mute_pbar): if hasattr(target, 'series_list'): nEns = np.size(target.series_list) if idx < nEns: value2 = target.series_list[idx].value time2 = target.series_list[idx].time else: value2 = target.series_list[idx-nEns].value time2 = target.series_list[idx-nEns].time else: value2 = target.value time2 = target.time ts2 = Series(time=time2, value=value2, verbose=idx==0) corr_res = ts1.correlation(ts2, timespan=timespan, settings=settings, common_time_kwargs=common_time_kwargs, seed=seed) r_list.append(corr_res['r']) signif_list.append(corr_res['signif']) p_list.append(corr_res['p']) r_lsit = np.array(r_list) p_lsit = np.array(p_list) signif_fdr_list = [] fdr_kwargs = {} if fdr_kwargs is None else fdr_kwargs.copy() args = {} args.update(fdr_kwargs) for i in range(np.size(signif_list)): signif_fdr_list.append(False) fdr_res = corrutils.fdr(p_list, **fdr_kwargs) if fdr_res is not None: for i in fdr_res: signif_fdr_list[i] = True corr_ens = CorrEns(r_list, p_list, signif_list, signif_fdr_list, alpha) return corr_ens
[docs] def plot_traces(self, figsize=[10, 4], xlabel=None, ylabel=None, title=None, num_traces=10, seed=None, xlim=None, ylim=None, savefig_settings=None, ax=None, plot_legend=True, color=sns.xkcd_rgb['pale red'], lw=0.5, alpha=0.3, lgd_kwargs=None, mute=False): '''Plot EnsembleSeries as a subset of traces. Parameters ---------- figsize : list, optional The figure size. The default is [10, 4]. xlabel : str, optional x-axis label. The default is None. ylabel : str, optional y-axis label. The default is None. title : str, optional Plot title. The default is None. xlim : list, optional x-axis limits. The default is None. ylim : list, optional y-axis limits. The default is None. color : str, optional Color of the traces. The default is sns.xkcd_rgb['pale red']. alpha : float, optional Transparency of the lines representing the multiple members. The default is 0.3. lw : float, optional Width of the lines representing the multiple members. The default is 0.5. num_traces : int, optional Number of traces to plot. The default is 10. savefig_settings : dict, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} The default is None. ax : matplotlib.ax, optional Matplotlib axis on which to return the plot. The default is None. plot_legend : bool, optional Whether to plot the legend. The default is True. lgd_kwargs : dict, optional Parameters for the legend. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax. The default is False. seed : int, optional Set the seed for the random number generator. Useful for reproducibility. The default is None. Returns ------- fig, ax Examples -------- .. ipython:: python :okwarning: nn = 30 # number of noise realizations nt = 500 series_list = [] signal = pyleo.gen_ts(model='colored_noise',nt=nt,alpha=1.0).standardize() noise = np.random.randn(nt,nn) for idx in range(nn): # noise ts = pyleo.Series(time=signal.time, value=signal.value+noise[:,idx]) series_list.append(ts) ts_ens = pyleo.EnsembleSeries(series_list) fig, ax = ts_ens.plot_traces(alpha=0.2,num_traces=8) ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() lgd_kwargs = {} if lgd_kwargs is None else lgd_kwargs.copy() # generate default axis labels time_label, value_label = self.make_labels() if xlabel is None: xlabel = time_label if ylabel is None: ylabel = value_label if ax is None: fig, ax = plt.subplots(figsize=figsize) if num_traces > 0: if seed is not None: np.random.seed(seed) nts = np.size(self.series_list) random_draw_idx = np.random.choice(nts, num_traces) for idx in random_draw_idx: self.series_list[idx].plot(xlabel=xlabel, ylabel=ylabel, zorder=99, linewidth=lw, xlim=xlim, ylim=ylim, ax=ax, color=color, alpha=alpha, ) ax.plot(np.nan, np.nan, color=color, label=f'example members (n={num_traces})') if title is not None: ax.set_title(title) if plot_legend: lgd_args = {'frameon': False} lgd_args.update(lgd_kwargs) ax.legend(**lgd_args) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs] def plot_envelope(self, figsize=[10, 4], qs=[0.025, 0.25, 0.5, 0.75, 0.975], xlabel=None, ylabel=None, title=None, xlim=None, ylim=None, savefig_settings=None, ax=None, plot_legend=True, curve_clr=sns.xkcd_rgb['pale red'], curve_lw=2, shade_clr=sns.xkcd_rgb['pale red'], shade_alpha=0.2, inner_shade_label='IQR', outer_shade_label='95% CI', lgd_kwargs=None, mute=False): ''' Plot EnsembleSeries as an envelope. Parameters ---------- figsize : list, optional The figure size. The default is [10, 4]. qs : list, optional The significance levels to consider. The default is [0.025, 0.25, 0.5, 0.75, 0.975] (median, interquartile range, and central 95% region) xlabel : str, optional x-axis label. The default is None. ylabel : str, optional y-axis label. The default is None. title : str, optional Plot title. The default is None. xlim : list, optional x-axis limits. The default is None. ylim : list, optional y-axis limits. The default is None. savefig_settings : dict, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} The default is None. ax : matplotlib.ax, optional Matplotlib axis on which to return the plot. The default is None. plot_legend : bool, optional Wether to plot the legend. The default is True. curve_clr : str, optional Color of the main line (median). The default is sns.xkcd_rgb['pale red']. curve_lw : str, optional Width of the main line (median). The default is 2. shade_clr : str, optional Color of the shaded envelope. The default is sns.xkcd_rgb['pale red']. shade_alpha : float, optional Transparency on the envelope. The default is 0.2. inner_shade_label : str, optional Label for the envelope. The default is 'IQR'. outer_shade_label : str, optional Label for the envelope. The default is '95\% CI'. lgd_kwargs : dict, optional Parameters for the legend. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax. The default is False. Returns ------- fig, ax Example -------- .. ipython:: python :okwarning: nn = 30 # number of noise realizations nt = 500 series_list = [] signal = pyleo.gen_ts(model='colored_noise',nt=nt,alpha=1.0).standardize() noise = np.random.randn(nt,nn) for idx in range(nn): # noise ts = pyleo.Series(time=signal.time, value=signal.value+noise[:,idx]) series_list.append(ts) ts_ens = pyleo.EnsembleSeries(series_list) fig, ax = ts_ens.plot_envelope(curve_lw=1.5) ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() lgd_kwargs = {} if lgd_kwargs is None else lgd_kwargs.copy() # generate default axis labels time_label, value_label = self.make_labels() if xlabel is None: xlabel = time_label if ylabel is None: ylabel = value_label if ax is None: fig, ax = plt.subplots(figsize=figsize) ts_qs = self.quantiles(qs=qs) if inner_shade_label is None: inner_shade_label = f'{ts_qs.series_list[1].label}-{ts_qs.series_list[-2].label}' if outer_shade_label is None: outer_shade_label = f'{ts_qs.series_list[0].label}-{ts_qs.series_list[-1].label}' time = ts_qs.series_list[0].time # plot outer envelope ax.fill_between( time, ts_qs.series_list[0].value, ts_qs.series_list[4].value, color=shade_clr, alpha=shade_alpha, edgecolor=shade_clr, label=outer_shade_label, ) # plot inner envelope on top ax.fill_between( time, ts_qs.series_list[1].value, ts_qs.series_list[3].value, color=shade_clr, alpha=2*shade_alpha, edgecolor=shade_clr, label=inner_shade_label, ) # plot the median ts_qs.series_list[2].plot(xlabel=xlabel, ylabel=ylabel, linewidth=curve_lw, color=curve_clr, xlim=xlim, ylim=ylim, ax=ax, zorder=100, label = 'median' ) if title is not None: ax.set_title(title) if plot_legend: lgd_args = {'frameon': False} lgd_args.update(lgd_kwargs) ax.legend(**lgd_args) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs] def stackplot(self, figsize=[5, 15], savefig_settings=None, xlim=None, fill_between_alpha=0.2, colors=None, cmap='tab10', norm=None, spine_lw=1.5, grid_lw=0.5, font_scale=0.8, label_x_loc=-0.15, v_shift_factor=3/4, linewidth=1.5): ''' Stack plot of multiple series Note that the plotting style is uniquely designed for this one and cannot be properly reset with `pyleoclim.set_style()`. Parameters ---------- figsize : list Size of the figure. colors : a list of, or one, Python supported color code (a string of hex code or a tuple of rgba values) Colors for plotting. If None, the plotting will cycle the 'tab10' colormap; if only one color is specified, then all curves will be plotted with that single color; if a list of colors are specified, then the plotting will cycle that color list. cmap : str The colormap to use when "colors" is None. norm : matplotlib.colors.Normalize like The nomorlization for the colormap. If None, a linear normalization will be used. savefig_settings : dictionary the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} The default is None. xlim : list The x-axis limit. fill_between_alpha : float The transparency for the fill_between shades. spine_lw : float The linewidth for the spines of the axes. grid_lw : float The linewidth for the gridlines. linewidth : float The linewidth for the curves. font_scale : float The scale for the font sizes. Default is 0.8. label_x_loc : float The x location for the label of each curve. v_shift_factor : float The factor for the vertical shift of each axis. The default value 3/4 means the top of the next axis will be located at 3/4 of the height of the previous one. Returns ------- fig, ax ''' plt.ioff() current_style = deepcopy(mpl.rcParams) plotting.set_style('journal', font_scale=font_scale) savefig_settings = {} if savefig_settings is None else savefig_settings.copy() n_ts = len(self.series_list) fig = plt.figure(figsize=figsize) if xlim is None: time_min = np.inf time_max = -np.inf for ts in self.series_list: if np.min(ts.time) <= time_min: time_min = np.min(ts.time) if np.max(ts.time) >= time_max: time_max = np.max(ts.time) xlim = [time_min, time_max] ax = {} left = 0 width = 1 height = 1/n_ts bottom = 1 for idx, ts in enumerate(self.series_list): if colors is None: cmap_obj = plt.get_cmap(cmap) if hasattr(cmap_obj, 'colors'): nc = len(cmap_obj.colors) else: nc = len(self.series_list) if norm is None: norm = mpl.colors.Normalize(vmin=0, vmax=nc-1) clr = cmap_obj(norm(idx%nc)) elif type(colors) is str: clr = colors elif type(colors) is list: nc = len(colors) clr = colors[idx%nc] else: raise TypeError('"colors" should be a list of, or one, Python supported color code (a string of hex code or a tuple of rgba values)') bottom -= height*v_shift_factor ax[idx] = fig.add_axes([left, bottom, width, height]) ax[idx].plot(ts.time, ts.value, color=clr, lw=linewidth) ax[idx].patch.set_alpha(0) ax[idx].set_xlim(xlim) time_label, value_label = ts.make_labels() ax[idx].set_ylabel(value_label, weight='bold') mu = np.mean(ts.value) std = np.std(ts.value) ylim = [mu-4*std, mu+4*std] ax[idx].fill_between(ts.time, ts.value, y2=mu, alpha=fill_between_alpha, color=clr) trans = transforms.blended_transform_factory(ax[idx].transAxes, ax[idx].transData) if ts.label is not None: ax[idx].text(label_x_loc, mu, ts.label, horizontalalignment='right', transform=trans, color=clr, weight='bold') ax[idx].set_ylim(ylim) ax[idx].set_yticks(ylim) ax[idx].yaxis.set_major_formatter(FormatStrFormatter('%.1f')) ax[idx].grid(False) if idx % 2 == 0: ax[idx].spines['left'].set_visible(True) ax[idx].spines['left'].set_linewidth(spine_lw) ax[idx].spines['left'].set_color(clr) ax[idx].spines['right'].set_visible(False) ax[idx].yaxis.set_label_position('left') ax[idx].yaxis.tick_left() else: ax[idx].spines['left'].set_visible(False) ax[idx].spines['right'].set_visible(True) ax[idx].spines['right'].set_linewidth(spine_lw) ax[idx].spines['right'].set_color(clr) ax[idx].yaxis.set_label_position('right') ax[idx].yaxis.tick_right() ax[idx].yaxis.label.set_color(clr) ax[idx].tick_params(axis='y', colors=clr) ax[idx].spines['top'].set_visible(False) ax[idx].spines['bottom'].set_visible(False) ax[idx].tick_params(axis='x', which='both', length=0) ax[idx].set_xlabel('') ax[idx].set_xticklabels([]) xt = ax[idx].get_xticks()[1:-1] for x in xt: ax[idx].axvline(x=x, color='lightgray', linewidth=grid_lw, ls='-', zorder=-1) ax[idx].axhline(y=mu, color='lightgray', linewidth=grid_lw, ls='-', zorder=-1) bottom -= height*(1-v_shift_factor) ax[n_ts] = fig.add_axes([left, bottom, width, height]) ax[n_ts].set_xlabel(time_label) ax[n_ts].spines['left'].set_visible(False) ax[n_ts].spines['right'].set_visible(False) ax[n_ts].spines['bottom'].set_visible(True) ax[n_ts].spines['bottom'].set_linewidth(spine_lw) ax[n_ts].set_yticks([]) ax[n_ts].patch.set_alpha(0) ax[n_ts].set_xlim(xlim) ax[n_ts].grid(False) ax[n_ts].tick_params(axis='x', which='both', length=3.5) xt = ax[n_ts].get_xticks()[1:-1] for x in xt: ax[n_ts].axvline(x=x, color='lightgray', linewidth=grid_lw, ls='-', zorder=-1) if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: plotting.showfig(fig) # reset the plotting style mpl.rcParams.update(current_style) return fig, ax
[docs] def distplot(self, figsize=[10, 4], title=None, savefig_settings=None, ax=None, ylabel='KDE', vertical=False, edgecolor='w',mute=False, **plot_kwargs): """ Plots the distribution of the timeseries across ensembles Parameters ---------- figsize : list, optional The size of the figure. The default is [10, 4]. title : str, optional Title for the figure. The default is None. savefig_settings : dict, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"}. The default is None. ax : matplotlib.axis, optional A matplotlib axis. The default is None. ylabel : str, optional Label for the count axis. The default is 'KDE'. vertical : {True,False}, optional Whether to flip the plot vertically. The default is False. edgecolor : matplotlib.color, optional The color of the edges of the bar. The default is 'w'. mute : {True,False}, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax. The default is False. **plot_kwargs : dict Plotting arguments for seaborn histplot: https://seaborn.pydata.org/generated/seaborn.histplot.html. See also -------- pyleoclim.utils.plotting.savefig : saving figure in Pyleoclim """ # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() if ax is None: fig, ax = plt.subplots(figsize=figsize) #make the data into a dataframe so we can flip the figure time_label, value_label = self.make_labels() #append all the values together for the plot for item in self.series_list: try: val=np.append(val,item.value) except: val=item.value if vertical == True: data=pd.DataFrame({'value':val}) ax = sns.histplot(data=data, y="value", ax=ax, kde=True, edgecolor=edgecolor, **plot_kwargs) ax.set_ylabel(value_label) ax.set_xlabel(ylabel) else: ax = sns.histplot(val, ax=ax, kde=True, edgecolor=edgecolor, **plot_kwargs) ax.set_xlabel(value_label) ax.set_ylabel(ylabel) if title is not None: ax.set_title(title) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs]class MultiplePSD: ''' Object for multiple PSD. Used for significance level ''' def __init__(self, psd_list): self.psd_list = psd_list
[docs] def copy(self): '''Copy object ''' return deepcopy(self)
[docs] def quantiles(self, qs=[0.05, 0.5, 0.95], lw=[0.5, 1.5, 0.5]): '''Calculate quantiles Parameters ---------- qs : list, optional List of quantiles to consider for the calculation. The default is [0.05, 0.5, 0.95]. lw : list, optional Linewidth to use for plotting each level. Should be the same length as qs. The default is [0.5, 1.5, 0.5]. Raises ------ ValueError Frequency axis not consistent across the PSD list! Returns ------- psds : pyleoclim.MultiplePSD ''' if self.psd_list[0].timeseries is not None: period_unit = self.psd_list[0].timeseries.time_unit freq = np.copy(self.psd_list[0].frequency) amps = [] for psd in self.psd_list: if not np.array_equal(psd.frequency, freq): raise ValueError('Frequency axis not consistent across the PSD list!') amps.append(psd.amplitude) amps = np.array(amps) amp_qs = mquantiles(amps, qs, axis=0) psd_list = [] for i, amp in enumerate(amp_qs): psd_tmp = PSD(frequency=freq, amplitude=amp, label=f'{qs[i]*100:g}%', plot_kwargs={'color': 'gray', 'linewidth': lw[i]}, period_unit=period_unit) psd_list.append(psd_tmp) psds = MultiplePSD(psd_list=psd_list) return psds
[docs] def beta_est(self, fmin=None, fmax=None, logf_binning_step='max', verbose=False): ''' Estimate the scaling factor beta of the each PSD from the psd_list in a log-log space Parameters ---------- fmin : float the minimum frequency edge for beta estimation; the default is the minimum of the frequency vector of the PSD obj fmax : float the maximum frequency edge for beta estimation; the default is the maximum of the frequency vector of the PSD obj logf_binning_step : str, {'max', 'first'} if 'max', then the maximum spacing of log(f) will be used as the binning step if 'first', then the 1st spacing of log(f) will be used as the binning step verbose : bool If True, will print warning messages if there is any Returns ------- res_dict : dictionary - beta: list of the scaling factors - std_err: list of one standard deviation errors of the scaling factor - f_binned: list of the binned frequency series, used as X for linear regression - psd_binned: list of the binned PSD series, used as Y for linear regression - Y_reg: list of the predicted Y from linear regression, used with f_binned for the slope curve plotting See also -------- pyleoclim.core.ui.PSD.beta_est : beta estimation for on a single PSD object ''' res_dict = {} res_dict['beta'] = [] res_dict['std_err'] = [] res_dict['f_binned'] = [] res_dict['psd_binned'] = [] res_dict['Y_reg'] = [] for psd_obj in self.psd_list: res = psd_obj.beta_est(fmin=fmin, fmax=fmax, logf_binning_step=logf_binning_step, verbose=verbose) for k in res_dict.keys(): res_dict[k].append(res[k]) return res_dict
[docs] def plot(self, figsize=[10, 4], in_loglog=True, in_period=True, xlabel=None, ylabel='Amplitude', title=None, xlim=None, ylim=None, savefig_settings=None, ax=None, xticks=None, yticks=None, legend=True, colors=None, cmap=None, norm=None, plot_kwargs=None, lgd_kwargs=None, mute=False): '''Plot multiple PSD on the same plot Parameters ---------- figsize : list, optional Figure size. The default is [10, 4]. in_loglog : bool, optional Whether to plot in loglog. The default is True. in_period : bool, optional Plots against periods instead of frequencies. The default is True. xlabel : str, optional x-axis label. The default is None. ylabel : str, optional y-axis label. The default is 'Amplitude'. title : str, optional Title for the figure. The default is None. xlim : list, optional Limits for the x-axis. The default is None. ylim : list, optional limits for the y-axis. The default is None. colors : a list of, or one, Python supported color code (a string of hex code or a tuple of rgba values) Colors for plotting. If None, the plotting will cycle the 'tab10' colormap; if only one color is specified, then all curves will be plotted with that single color; if a list of colors are specified, then the plotting will cycle that color list. cmap : str The colormap to use when "colors" is None. norm : matplotlib.colors.Normalize like The nomorlization for the colormap. If None, a linear normalization will be used. savefig_settings : dict, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} ax : matplotlib axis, optional The matplotlib axis object on which to retrun the figure. The default is None. xticks : list, optional x-ticks label. The default is None. yticks : list, optional y-ticks label. The default is None. legend : bool, optional Whether to plot the legend. The default is True. plot_kwargs : dictionary, optional Parameters for plot function. The default is None. lgd_kwargs : dictionary, optional Parameters for legend. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax The default is False. Returns ------- fig, ax ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() plot_kwargs = {} if plot_kwargs is None else plot_kwargs.copy() lgd_kwargs = {} if lgd_kwargs is None else lgd_kwargs.copy() if ax is None: fig, ax = plt.subplots(figsize=figsize) for idx, psd in enumerate(self.psd_list): tmp_plot_kwargs = {} if psd.plot_kwargs is not None: tmp_plot_kwargs.update(psd.plot_kwargs) tmp_plot_kwargs.update(plot_kwargs) # get color for each psd curve use_clr = False if 'color' not in tmp_plot_kwargs and 'c' not in 'tmp_plot_kwargs': use_clr = True if 'color' in tmp_plot_kwargs and tmp_plot_kwargs['color'] is None: use_clr = True if 'c' in tmp_plot_kwargs and tmp_plot_kwargs['c'] is None: use_clr = True if colors is not None or cmap is not None: use_clr = True if use_clr: # use the color based on the argument 'colors' or 'cmap' if colors is None: cmap = 'tab10' if cmap is None else cmap cmap_obj = plt.get_cmap(cmap) if hasattr(cmap_obj, 'colors'): nc = len(cmap_obj.colors) else: nc = len(self.psd_list) if norm is None: norm = mpl.colors.Normalize(vmin=0, vmax=nc-1) clr = cmap_obj(norm(idx%nc)) elif type(colors) is str: clr = colors elif type(colors) is list: nc = len(colors) clr = colors[idx%nc] else: raise TypeError('"colors" should be a list of, or one, Python supported color code (a string of hex code or a tuple of rgba values)') tmp_plot_kwargs.update({'color': clr}) ax = psd.plot( figsize=figsize, in_loglog=in_loglog, in_period=in_period, xlabel=xlabel, ylabel=ylabel, title=title, xlim=xlim, ylim=ylim, savefig_settings=savefig_settings, ax=ax, xticks=xticks, yticks=yticks, legend=legend, plot_kwargs=tmp_plot_kwargs, lgd_kwargs=lgd_kwargs, ) if title is not None: ax.set_title(title) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs] def plot_envelope(self, figsize=[10, 4], qs=[0.025, 0.5, 0.975], in_loglog=True, in_period=True, xlabel=None, ylabel='Amplitude', title=None, xlim=None, ylim=None, savefig_settings=None, ax=None, xticks=None, yticks=None, plot_legend=True, curve_clr=sns.xkcd_rgb['pale red'], curve_lw=3, shade_clr=sns.xkcd_rgb['pale red'], shade_alpha=0.3, shade_label=None, lgd_kwargs=None, mute=False, members_plot_num=10, members_alpha=0.3, members_lw=1, seed=None): '''Plot mutiple PSD as an envelope. Parameters ---------- figsize : list, optional The figure size. The default is [10, 4]. qs : list, optional The significance levels to consider. The default is [0.025, 0.5, 0.975]. in_loglog : bool, optional Plot in log space. The default is True. in_period : bool, optional Whether to plot periodicity instead of frequency. The default is True. xlabel : str, optional x-axis label. The default is None. ylabel : str, optional y-axis label. The default is 'Amplitude'. title : str, optional Plot title. The default is None. xlim : list, optional x-axis limits. The default is None. ylim : list, optional y-axis limits. The default is None. savefig_settings : dict, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} The default is None. ax : matplotlib.ax, optional Matplotlib axis on which to return the plot. The default is None. xticks : list, optional xticks label. The default is None. yticks : list, optional yticks label. The default is None. plot_legend : bool, optional Wether to plot the legend. The default is True. curve_clr : str, optional Color of the main PSD. The default is sns.xkcd_rgb['pale red']. curve_lw : str, optional Width of the main PSD line. The default is 3. shade_clr : str, optional Color of the shaded envelope. The default is sns.xkcd_rgb['pale red']. shade_alpha : float, optional Transparency on the envelope. The default is 0.3. shade_label : str, optional Label for the envelope. The default is None. lgd_kwargs : dict, optional Parameters for the legend. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax. The default is False. members_plot_num : int, optional Number of individual members to plot. The default is 10. members_alpha : float, optional Transparency of the lines representing the multiple members. The default is 0.3. members_lw : float, optional With of the lines representing the multiple members. The default is 1. seed : int, optional Set the seed for random number generator. Useful for reproducibility. The default is None. Returns ------- fig, ax ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() lgd_kwargs = {} if lgd_kwargs is None else lgd_kwargs.copy() if ax is None: fig, ax = plt.subplots(figsize=figsize) if members_plot_num > 0: if seed is not None: np.random.seed(seed) npsd = np.size(self.psd_list) random_draw_idx = np.random.choice(npsd, members_plot_num) for idx in random_draw_idx: self.psd_list[idx].plot( in_loglog=in_loglog, in_period=in_period, xlabel=xlabel, ylabel=ylabel, xlim=xlim, ylim=ylim, xticks=xticks, yticks=yticks, ax=ax, color='gray', alpha=members_alpha, zorder=99, linewidth=members_lw, ) ax.plot(np.nan, np.nan, color='gray', label=f'example members (n={members_plot_num})') psd_qs = self.quantiles(qs=qs) psd_qs.psd_list[1].plot( in_loglog=in_loglog, in_period=in_period, xlabel=xlabel, ylabel=ylabel, linewidth=curve_lw, xlim=xlim, ylim=ylim, xticks=xticks, yticks=yticks, ax=ax, color=curve_clr, zorder=100 ) if in_period: x_axis = 1/psd_qs.psd_list[0].frequency else: x_axis = psd_qs.psd_list[0].frequency if shade_label is None: shade_label = f'{psd_qs.psd_list[0].label}-{psd_qs.psd_list[-1].label}' ax.fill_between( x_axis, psd_qs.psd_list[0].amplitude, psd_qs.psd_list[-1].amplitude, color=shade_clr, alpha=shade_alpha, edgecolor=shade_clr, label=shade_label, ) if title is not None: ax.set_title(title) if plot_legend: lgd_args = {'frameon': False} lgd_args.update(lgd_kwargs) ax.legend(**lgd_args) if 'fig' in locals(): if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax else: return ax
[docs]class MultipleScalogram: ''' Multiple Scalogram objects ''' def __init__(self, scalogram_list): self.scalogram_list = scalogram_list
[docs] def copy(self): ''' Copy the object ''' return deepcopy(self)
[docs] def quantiles(self, qs=[0.05, 0.5, 0.95]): '''Calculate quantiles Parameters ---------- qs : list, optional List of quantiles to consider for the calculation. The default is [0.05, 0.5, 0.95]. Raises ------ ValueError Frequency axis not consistent across the PSD list! Value Error Time axis not consistent across the scalogram list! Returns ------- scals : pyleoclim.MultipleScalogram ''' freq = np.copy(self.scalogram_list[0].frequency) time = np.copy(self.scalogram_list[0].time) coi = np.copy(self.scalogram_list[0].coi) amps = [] for scal in self.scalogram_list: if not np.array_equal(scal.frequency, freq): raise ValueError('Frequency axis not consistent across the scalogram list!') if not np.array_equal(scal.time, time): raise ValueError('Time axis not consistent across the scalogram list!') amps.append(scal.amplitude) amps = np.array(amps) ne, nf, nt = np.shape(amps) amp_qs = np.ndarray(shape=(np.size(qs), nf, nt)) for i in range(nf): for j in range(nt): amp_qs[:,i,j] = mquantiles(amps[:,i,j], qs) scal_list = [] for i, amp in enumerate(amp_qs): scal_tmp = Scalogram(frequency=freq, time=time, amplitude=amp, coi=coi, label=f'{qs[i]*100:g}%') scal_list.append(scal_tmp) scals = MultipleScalogram(scalogram_list=scal_list) return scals
[docs]class CorrEns: ''' Correlation Ensemble Parameters ---------- r: list the list of correlation coefficients p: list the list of p-values p_fmt_td: float the threshold for p-value formating (0.01 by default, i.e., if p<0.01, will print "< 0.01" instead of "0") p_fmt_style: str the style for p-value formating (exponential notation by default) signif: list the list of significance without FDR signif_fdr: list the list of significance with FDR signif_fdr: list the list of significance with FDR alpha : float The significance level (0.05 by default) See also -------- pyleoclim.utils.correlation.corr_sig : Correlation function pyleoclim.utils.correlation.fdr : FDR function ''' def __init__(self, r, p, signif, signif_fdr, alpha, p_fmt_td=0.01, p_fmt_style='exp'): self.r = r self.p = p self.p_fmt_td = p_fmt_td self.p_fmt_style = p_fmt_style self.signif = signif self.signif_fdr = signif_fdr self.alpha = alpha def __str__(self): ''' Prints out the correlation results ''' pi_list = [] for pi in self.p: pi_list.append(pval_format(pi, threshold=self.p_fmt_td, style=self.p_fmt_style)) table = { 'correlation': self.r, 'p-value': pi_list, f'signif. w/o FDR (α: {self.alpha})': self.signif, f'signif. w/ FDR (α: {self.alpha})': self.signif_fdr, } msg = print(tabulate(table, headers='keys')) return f'Ensemble size: {len(self.r)}'
[docs] def plot(self, figsize=[4, 4], title=None, ax=None, savefig_settings=None, hist_kwargs=None, title_kwargs=None, xlim=None, clr_insignif=sns.xkcd_rgb['grey'], clr_signif=sns.xkcd_rgb['teal'], clr_signif_fdr=sns.xkcd_rgb['pale orange'], clr_percentile=sns.xkcd_rgb['salmon'], rwidth=0.8, bins=None, vrange=None, mute=False): ''' Plot the correlation ensembles Parameters ---------- figsize : list, optional The figure size. The default is [4, 4]. title : str, optional Plot title. The default is None. savefig_settings : dict the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"} hist_kwargs : dict the keyword arguments for ax.hist() title_kwargs : dict the keyword arguments for ax.set_title() ax : matplotlib.axis, optional the axis object from matplotlib See [matplotlib.axes](https://matplotlib.org/api/axes_api.html) for details. mute : {True,False} if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax xlim : list, optional x-axis limits. The default is None. See Also -------- matplotlib.pyplot.hist: https://matplotlib.org/3.3.3/api/_as_gen/matplotlib.pyplot.hist.html ''' # Turn the interactive mode off. plt.ioff() savefig_settings = {} if savefig_settings is None else savefig_settings.copy() hist_kwargs = {} if hist_kwargs is None else hist_kwargs.copy() if ax is None: fig, ax = plt.subplots(figsize=figsize) if vrange is None: vrange = [np.min(self.r), np.max(self.r)] clr_list = [clr_insignif, clr_signif, clr_signif_fdr] args = {'rwidth': rwidth, 'bins': bins, 'range': vrange, 'color': clr_list} args.update(hist_kwargs) # insignif_args.update(hist_kwargs) r_insignif = np.array(self.r)[~np.array(self.signif)] r_signif = np.array(self.r)[self.signif] r_signif_fdr = np.array(self.r)[self.signif_fdr] r_stack = [r_insignif, r_signif, r_signif_fdr] ax.hist(r_stack, stacked=True, **args) ax.legend([f'p ≥ {self.alpha}', f'p < {self.alpha} (w/o FDR)', f'p < {self.alpha} (w/ FDR)'], loc='upper left', bbox_to_anchor=(1.1, 1), ncol=1) frac_signif = np.size(r_signif) / np.size(self.r) frac_signif_fdr = np.size(r_signif_fdr) / np.size(self.r) ax.text(x=1.1, y=0.5, s=f'Fraction significant: {frac_signif*100:.1f}%', transform=ax.transAxes, fontsize=10, color=clr_signif) ax.text(x=1.1, y=0.4, s=f'Fraction significant: {frac_signif_fdr*100:.1f}%', transform=ax.transAxes, fontsize=10, color=clr_signif_fdr) r_pcts = np.percentile(self.r, [2.5, 25, 50, 75, 97.5]) trans = transforms.blended_transform_factory(ax.transData, ax.transAxes) for r_pct, pt, ls in zip(r_pcts, np.array([2.5, 25, 50, 75, 97.5])/100, [':', '--', '-', '--', ':']): ax.axvline(x=r_pct, linestyle=ls, color=clr_percentile) ax.text(x=r_pct, y=1.02, s=pt, color=clr_percentile, transform=trans, ha='center', fontsize=10) ax.set_xlabel(r'$r$') ax.set_ylabel('Count') ax.yaxis.set_major_locator(MaxNLocator(integer=True)) if xlim is not None: ax.set_xlim(xlim) if title is not None: title_kwargs = {} if title_kwargs is None else title_kwargs.copy() t_args = {'y': 1.1, 'weight': 'bold'} t_args.update(title_kwargs) ax.set_title(title, **t_args) if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax
[docs]class Lipd: '''Create a Lipd object from Lipd Files Parameters ---------- usr_path : str path to the Lipd file(s). Can be URL (LiPD utilities only support loading one file at a time from a URL) If it's a URL, it must start with "http", "https", or "ftp". lidp_dict : dict LiPD files already loaded into Python through the LiPD utilities validate : bool Validate the LiPD files upon loading. Note that for a large library this can take up to half an hour. remove : bool If validate is True and remove is True, ignores non-valid Lipd files. Note that loading unvalidated Lipd files may result in errors for some functionalities but not all. TODO ---- Support querying the LinkedEarth platform Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo url='http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' d=pyleo.Lipd(usr_path=url) ''' def __init__(self, usr_path=None, lipd_dict=None, validate=False, remove=False): self.plot_default = {'ice-other': ['#FFD600','h'], 'ice/rock': ['#FFD600', 'h'], 'coral': ['#FF8B00','o'], 'documents':['k','p'], 'glacierice':['#86CDFA', 'd'], 'hybrid': ['#00BEFF','*'], 'lakesediment': ['#4169E0','s'], 'marinesediment': ['#8A4513', 's'], 'sclerosponge' : ['r','o'], 'speleothem' : ['#FF1492','d'], 'wood' : ['#32CC32','^'], 'molluskshells' : ['#FFD600','h'], 'peat' : ['#2F4F4F','*'], 'midden' : ['#824E2B','o'], 'other':['k','o']} if validate==False and remove==True: print('Removal of unvalidated LiPD files require validation') validate=True #prepare the dictionaries for all possible scenarios if usr_path!=None: # since readLipd() takes only absolute path and it will change the current working directory (CWD) without turning back, # we need to record CWD manually and turn back after the data loading is finished cwd = os.getcwd() if usr_path[:4] == 'http' or usr_path[:3] == 'ftp': # URL D_path = lpd.readLipd(usr_path) else: # local path abs_path = os.path.abspath(usr_path) D_path = lpd.readLipd(abs_path) os.chdir(cwd) #make sure that it's more than one if 'archiveType' in D_path.keys(): D_path={D_path['dataSetName']:D_path} if validate==True: cwd = os.getcwd() res=lpd.validate(D_path,detailed=False) os.chdir(cwd) if remove == True: for item in res: if item['status'] == 'FAIL': c=item['feedback']['errMsgs'] check = [] for i in c: if i.startswith('Mismatched columns'): check.append(1) else: check.append(0) if 0 in check: del D_path[item['filename'].strip('.lpd')] else: D_path={} if lipd_dict!=None: D_dict=lipd_dict if 'archiveType' in D_dict.keys(): D_dict={D_dict['dataSetName']:D_dict} if validate==True: cwd = os.getcwd() res=lpd.validate(D_dict,detailed=False) os.chdir(cwd) if remove == True: for item in res: if item['status'] == 'FAIL': c=item['feedback']['errMsgs'] check = [] for i in c: if i.startswith('Mismatched columns'): check.append(1) else: check.append(0) if 0 in check: del D_dict[item['filename'].strip('.lpd')] else: D_dict={} # raise an error if empty if not bool(D_dict) and not bool(D_path) == True: raise ValueError('No valid files; try without validation.') #assemble self.lipd={} self.lipd.update(D_path) self.lipd.update(D_dict) def __repr__(self): return str(self.__dict__)
[docs] def copy(self): '''Copy the object ''' return deepcopy(self)
[docs] def to_tso(self): '''Extracts all the timeseries objects to a list of LiPD tso Returns ------- ts_list : list List of Lipd timeseries objects as defined by LiPD utilities See also -------- pyleoclim.ui.LipdSeries : LiPD Series object. ''' cwd = os.getcwd() ts_list=lpd.extractTs(self.__dict__['lipd']) os.chdir(cwd) return ts_list
[docs] def extract(self,dataSetName): ''' Parameters ---------- dataSetName : str Extract a particular dataset Returns ------- new : pyleoclim.Lipd A new object corresponding to a particular dataset ''' new = self.copy() try: dict_out=self.__dict__['lipd'][dataSetName] new.lipd=dict_out except: pass return new
[docs] def to_LipdSeriesList(self, mode='paleo'): '''Extracts all LiPD timeseries objects to a list of LipdSeries objects Parameters ---------- mode : {'paleo','chron'} Whether to extract the timeseries information from the paleo tables or chron tables Returns ------- res : list A list of LiPDSeries objects See also -------- pyleoclim.ui.LipdSeries : LipdSeries object ''' cwd = os.getcwd() ts_list=lpd.extractTs(self.__dict__['lipd'], mode=mode) os.chdir(cwd) res=[] for idx, item in enumerate(ts_list): try: res.append(LipdSeries(item)) except: if mode == 'paleo': txt = 'The timeseries from ' + str(idx) + ': ' +\ item['dataSetName'] + ': ' + \ item['paleoData_variableName'] + \ ' could not be coerced into a LipdSeries object, passing' else: txt = 'The timeseries from ' + str(idx) + ': ' +\ item['dataSetName'] + ': ' + \ item['chronData_variableName'] + \ ' could not be coerced into a LipdSeries object, passing' warnings.warn(txt) pass return res
[docs] def to_LipdSeries(self, number = None, mode = 'paleo'): '''Extracts one timeseries from the Lipd object Note that this function may require user interaction. Parameters ---------- number : int the number of the timeseries object mode : {'paleo','chron'} whether to extract the paleo or chron series. Returns ------- ts : pyleoclim.LipdSeries A LipdSeries object See also -------- pyleoclim.ui.LipdSeries : LipdSeries object ''' cwd = os.getcwd() ts_list = lpd.extractTs(self.__dict__['lipd'], mode=mode) os.chdir(cwd) if number is None: ts = LipdSeries(ts_list) else: try: number = int(number) except: raise TypeError('Number needs to be an integer or should be coerced into an integer.') ts = LipdSeries(ts_list[number]) return ts
[docs] def mapAllArchive(self, projection = 'Robinson', proj_default = True, background = True,borders = False, rivers = False, lakes = False, figsize = None, ax = None, marker=None, color=None, markersize = None, scatter_kwargs=None, legend=True, lgd_kwargs=None, savefig_settings=None, mute=False): '''Map the records contained in LiPD files by archive type Parameters ---------- projection : str, optional The projection to use. The default is 'Robinson'. proj_default : bool, optional Wether to use the Pyleoclim defaults for each projection type. The default is True. background : bool, optional Wether to use a backgound. The default is True. borders : bool, optional Draw borders. The default is False. rivers : bool, optional Draw rivers. The default is False. lakes : bool, optional Draw lakes. The default is False. figsize : list, optional The size of the figure. The default is None. ax : matplotlib.ax, optional The matplotlib axis onto which to return the map. The default is None. marker : str, optional The marker type for each archive. The default is None. Uses plot_default color : str, optional Color for each acrhive. The default is None. Uses plot_default markersize : float, optional Size of the marker. The default is None. scatter_kwargs : dict, optional Parameters for the scatter plot. The default is None. legend : bool, optional Whether to plot the legend. The default is True. lgd_kwargs : dict, optional Arguments for the legend. The default is None. savefig_settings : dictionary, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"}. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax. The default is False. Returns ------- res : figure The figure See also -------- pyleoclim.utils.mapping.map_all : Underlying mapping function for Pyleoclim Examples -------- For speed, we are only using one LiPD file. But these functions can load and map multiple. .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) @savefig mapallarchive.png fig, ax = data.mapAllArchive() pyleo.closefig(fig) Change the markersize .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) @savefig mapallarchive_marker.png fig, ax = data.mapAllArchive(markersize=100) pyleo.closefig(fig) ''' scatter_kwargs = {} if scatter_kwargs is None else scatter_kwargs.copy() #get the information from the LiPD dict lat=[] lon=[] archiveType=[] for idx, key in enumerate(self.lipd): d = self.lipd[key] lat.append(d['geo']['geometry']['coordinates'][1]) lon.append(d['geo']['geometry']['coordinates'][0]) if 'archiveType' in d.keys(): archiveType.append(lipdutils.LipdToOntology(d['archiveType']).lower().replace(" ","")) else: archiveType.append('other') # make sure criteria is in the plot_default list for idx,val in enumerate(archiveType): if val not in self.plot_default.keys(): archiveType[idx] = 'other' if markersize is not None: scatter_kwargs.update({'s': markersize}) if marker==None: marker=[] for item in archiveType: marker.append(self.plot_default[item][1]) if color==None: color=[] for item in archiveType: color.append(self.plot_default[item][0]) res = mapping.map_all(lat=lat, lon=lon, criteria=archiveType, marker=marker, color =color, projection = projection, proj_default = proj_default, background = background,borders = borders, rivers = rivers, lakes = lakes, figsize = figsize, ax = ax, scatter_kwargs=scatter_kwargs, legend=legend, lgd_kwargs=lgd_kwargs,savefig_settings=savefig_settings, mute=mute) return res
#def mapNearRecord(): #res={} #return res
[docs]class LipdSeries(Series): '''Lipd time series object These objects can be obtained from a LiPD either through Pyleoclim or the LiPD utilities. If multiple objects (i.e., list) is given, then the user will be prompted to choose one timeseries. LipdSeries is a child of Series, therefore all the methods available for Series apply to LipdSeries in addition to some specific methods. Examples -------- In this example, we will import a LiPD file and explore the various options to create a series object. First, let's look at the Lipd.to_tso option. This method is attractive because the object is a list of dictionaries that are easily explored in Python. .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) ts_list = data.to_tso() # Print out the dataset name and the variable name for item in ts_list: print(item['dataSetName']+': '+item['paleoData_variableName']) # Load the sst data into a LipdSeries. Since Python indexing starts at zero, sst has index 5. ts = pyleo.LipdSeries(ts_list[5]) If you attempt to pass the full list of series, Pyleoclim will prompt you to choose a series by printing out something similar as above. If you already now the number of the timeseries object you're interested in, then you should use the following: .. ipython:: python :okwarning: ts1 = data.to_LipdSeries(number=5) If number is not specified, Pyleoclim will prompt you for the number automatically. Sometimes, one may want to create a MultipleSeries object from a collection of LiPD files. In this case, we recommend using the following: .. ipython:: python :okwarning: ts_list = data.to_LipdSeriesList() # only keep the Mg/Ca and SST ts_list=ts_list[4:] #create a MultipleSeries object ms=pyleo.MultipleSeries(ts_list) ''' def __init__(self, tso, clean_ts=True): if type(tso) is list: self.lipd_ts=lipdutils.getTs(tso) else: self.lipd_ts=tso self.plot_default = {'ice-other': ['#FFD600','h'], 'ice/rock': ['#FFD600', 'h'], 'coral': ['#FF8B00','o'], 'documents':['k','p'], 'glacierice':['#86CDFA', 'd'], 'hybrid': ['#00BEFF','*'], 'lakesediment': ['#4169E0','s'], 'marinesediment': ['#8A4513', 's'], 'sclerosponge' : ['r','o'], 'speleothem' : ['#FF1492','d'], 'wood' : ['#32CC32','^'], 'molluskshells' : ['#FFD600','h'], 'peat' : ['#2F4F4F','*'], 'midden' : ['#824E2B','o'], 'other':['k','o']} try: time, label= lipdutils.checkTimeAxis(self.lipd_ts) if label=='age': time_name='Age' if 'ageUnits' in self.lipd_ts.keys(): time_unit=self.lipd_ts['ageUnits'] else: time_unit=None elif label=='year': time_name='Year' if 'yearUnits' in self.lipd_ts.keys(): time_unit=self.lipd_ts['yearUnits'] else: time_unit=None try: if self.lipd_ts['mode'] == 'paleoData': value=np.array(self.lipd_ts['paleoData_values'],dtype='float64') #Remove NaNs #ys_tmp=np.copy(value) #value=value[~np.isnan(ys_tmp)] #time=time[~np.isnan(ys_tmp)] value_name=self.lipd_ts['paleoData_variableName'] if 'paleoData_units' in self.lipd_ts.keys(): value_unit=self.lipd_ts['paleoData_units'] else: value_unit=None label=self.lipd_ts['dataSetName'] super(LipdSeries,self).__init__(time=time,value=value,time_name=time_name, time_unit=time_unit,value_name=value_name,value_unit=value_unit, label=label,clean_ts=clean_ts) elif self.lipd_ts['mode'] == 'chronData': value=np.array(self.lipd_ts['chronData_values'],dtype='float64') #Remove NaNs #ys_tmp=np.copy(value) #value=value[~np.isnan(ys_tmp)] #time=time[~np.isnan(ys_tmp)] value_name=self.lipd_ts['chronData_variableName'] if 'paleoData_units' in self.lipd_ts.keys(): value_unit=self.lipd_ts['chronData_units'] else: value_unit=None label=self.lipd_ts['dataSetName'] super(LipdSeries,self).__init__(time=time,value=value,time_name=time_name, time_unit=time_unit,value_name=value_name,value_unit=value_unit, label=label,clean_ts=clean_ts) except: raise ValueError("paleoData_values should contain floats") except: raise KeyError("No time information present")
[docs] def copy(self): '''Copy the object ''' return deepcopy(self)
[docs] def chronEnsembleToPaleo(self,D,modelNumber=None,tableNumber=None): '''Fetch chron ensembles from a lipd object and return the ensemble as MultipleSeries Parameters ---------- D : a LiPD object modelNumber : int, optional Age model number. The default is None. tableNumber : int, optional Table Number. The default is None. Raises ------ ValueError DESCRIPTION. Returns ------- ms : pyleoclim.EnsembleSeries An EnsembleSeries object with each series representing a possible realization of the age model ''' #get the corresponding LiPD dataSetName=self.lipd_ts['dataSetName'] if type(D) is dict: try: lipd=D[dataSetName] except: lipd=D else: a=D.extract(dataSetName) lipd=a.__dict__['lipd'] #Look for the ensemble and get values cwd = os.getcwd() csv_dict=lpd.getCsv(lipd) os.chdir(cwd) chron,paleo = lipdutils.isEnsemble(csv_dict) if len(chron)==0: raise ValueError("No ChronMeasurementTables available") elif len(chron)>1: if modelNumber==None or tableNumber==None: csvName=lipdutils.whichEnsemble(chron) else: str1='model'+str(modelNumber) str2='ensemble'+str(tableNumber) for item in chron: if str1 in item and str2 in item: csvName=item depth, ensembleValues =lipdutils.getEnsemble(csv_dict,csvName) else: depth, ensembleValues =lipdutils.getEnsemble(csv_dict,chron[0]) #make sure it's sorted sort_ind = np.argsort(depth) depth=list(np.array(depth)[sort_ind]) ensembleValues=ensembleValues[sort_ind,:] #Map to paleovalues key=[] for item in self.lipd_ts.keys(): if 'depth' in item and 'Units' not in item: key.append(item) key=key[0] ds= np.array(self.lipd_ts[key],dtype='float64') if 'paleoData_values' in self.lipd_ts.keys(): ys= np.array(self.lipd_ts['paleoData_values'],dtype='float64') elif 'chronData_values' in self.lipd_ts.keys(): ys= np.array(self.lipd_ts['chronData_values'],dtype='float64') else: raise KeyError('no y-axis values available') #Remove NaNs ys_tmp=np.copy(ys) ds=ds[~np.isnan(ys_tmp)] sort_ind2=np.argsort(ds) ds=np.array(ds[sort_ind2]) ys=np.array(ys[sort_ind2]) ensembleValuestoPaleo=lipdutils.mapAgeEnsembleToPaleoData(ensembleValues, depth, ds) #create multipleseries s_list=[] for i, s in enumerate(ensembleValuestoPaleo.T): s_tmp = Series(time=s,value=ys, verbose=i==0, clean_ts=False) s_list.append(s_tmp) ens = EnsembleSeries(series_list=s_list) return ens
[docs] def map(self,projection = 'Orthographic', proj_default = True, background = True,borders = False, rivers = False, lakes = False, figsize = None, ax = None, marker=None, color=None, markersize = None, scatter_kwargs=None, legend=True, lgd_kwargs=None, savefig_settings=None, mute=False): '''Map the location of the record Parameters ---------- projection : str, optional The projection to use. The default is 'Robinson'. proj_default : bool, optional Wether to use the Pyleoclim defaults for each projection type. The default is True. background : bool, optional Wether to use a backgound. The default is True. borders : bool, optional Draw borders. The default is False. rivers : bool, optional Draw rivers. The default is False. lakes : bool, optional Draw lakes. The default is False. figsize : list, optional The size of the figure. The default is None. ax : matplotlib.ax, optional The matplotlib axis onto which to return the map. The default is None. marker : str, optional The marker type for each archive. The default is None. Uses plot_default color : str, optional Color for each acrhive. The default is None. Uses plot_default markersize : float, optional Size of the marker. The default is None. scatter_kwargs : dict, optional Parameters for the scatter plot. The default is None. legend : bool, optional Whether to plot the legend. The default is True. lgd_kwargs : dict, optional Arguments for the legend. The default is None. savefig_settings : dictionary, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"}. The default is None. mute : bool, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax. The default is False. Returns ------- res : fig See also -------- pyleoclim.utils.mapping.map_all : Underlying mapping function for Pyleoclim Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) ts = data.to_LipdSeries(number=5) @savefig mapone.png fig, ax = ts.map() pyleo.closefig(fig) ''' scatter_kwargs = {} if scatter_kwargs is None else scatter_kwargs.copy() #get the information from the timeseries lat=[self.lipd_ts['geo_meanLat']] lon=[self.lipd_ts['geo_meanLon']] lon=[self.lipd_ts['geo_meanLon']] if 'archiveType' in self.lipd_ts.keys(): archiveType=lipdutils.LipdToOntology(self.lipd_ts['archiveType']).lower().replace(" ","") else: archiveType=('other') # make sure criteria is in the plot_default list if archiveType not in self.plot_default.keys(): archiveType = 'other' if markersize is not None: scatter_kwargs.update({'s': markersize}) if marker==None: marker= self.plot_default[archiveType][1] if color==None: color=self.plot_default[archiveType][0] if proj_default==True: proj1={'central_latitude':lat[0], 'central_longitude':lon[0]} proj2={'central_latitude':lat[0]} proj3={'central_longitude':lon[0]} archiveType=[archiveType] #list so it will work with map_all marker=[marker] color=[color] if proj_default==True: try: res = mapping.map_all(lat=lat, lon=lon, criteria=archiveType, marker=marker, color =color, projection = projection, proj_default = proj1, background = background,borders = borders, rivers = rivers, lakes = lakes, figsize = figsize, ax = ax, scatter_kwargs=scatter_kwargs, legend=legend, lgd_kwargs=lgd_kwargs,savefig_settings=savefig_settings, mute=mute) except: try: res = mapping.map_all(lat=lat, lon=lon, criteria=archiveType, marker=marker, color =color, projection = projection, proj_default = proj3, background = background,borders = borders, rivers = rivers, lakes = lakes, figsize = figsize, ax = ax, scatter_kwargs=scatter_kwargs, legend=legend, lgd_kwargs=lgd_kwargs,savefig_settings=savefig_settings, mute=mute) except: res = mapping.map_all(lat=lat, lon=lon, criteria=archiveType, marker=marker, color =color, projection = projection, proj_default = proj2, background = background,borders = borders, rivers = rivers, lakes = lakes, figsize = figsize, ax = ax, scatter_kwargs=scatter_kwargs, legend=legend, lgd_kwargs=lgd_kwargs,savefig_settings=savefig_settings, mute=mute) else: res = mapping.map_all(lat=lat, lon=lon, criteria=archiveType, marker=marker, color =color, projection = projection, proj_default = proj_default, background = background,borders = borders, rivers = rivers, lakes = lakes, figsize = figsize, ax = ax, scatter_kwargs=scatter_kwargs, legend=legend, lgd_kwargs=lgd_kwargs,savefig_settings=savefig_settings, mute=mute) return res
[docs] def getMetadata(self): """ Get the necessary metadata for the ensemble plots Parameters ---------- timeseries : object a specific timeseries object. Returns ------- res : dict A dictionary containing the following metadata: archiveType Authors (if more than 2, replace by et al) PublicationYear Publication DOI Variable Name Units Climate Interpretation Calibration Equation Calibration References Calibration Notes """ # Get all the necessary information # Top level information if "archiveType" in self.lipd_ts.keys(): archiveType = self.lipd_ts["archiveType"] else: archiveType = "NA" if "pub1_author" in self.lipd_ts.keys(): authors = self.lipd_ts["pub1_author"] else: authors = "NA" #Truncate if more than two authors idx = [pos for pos, char in enumerate(authors) if char == ";"] if len(idx)>2: authors = authors[0:idx[1]+1] + "et al." if "pub1_year" in self.lipd_ts.keys(): Year = str(self.lipd_ts["pub1_year"]) else: Year = "NA" if "pub1_doi" in self.lipd_ts.keys(): DOI = self.lipd_ts["pub1_doi"] else: DOI = "NA" if self.lipd_ts['mode'] == 'paleoData': prefix = 'paleo' else: prefix = 'chron' if prefix+"Data_InferredVariableType" in self.lipd_ts.keys(): if type(self.lipd_ts[prefix+"Data_InferredVariableType"]) is list: Variable = self.lipd_ts[prefix+"Data_InferredVariableType"][0] else: Variable = self.lipd_ts[prefix+"Data_InferredVariableType"] elif prefix+"Data_ProxyObservationType" in self.lipd_ts.keys(): if type(self.lipd_ts[prefix+"Data_ProxyObservationType"]) is list: Variable = self.lipd_ts[prefix+"Data_ProxyObservationType"][0] else: Variable = self.lipd_ts[prefix+"Data_ProxyObservationType"] else: Variable = self.lipd_ts[prefix+"Data_variableName"] if prefix+"Data_units" in self.lipd_ts.keys(): units = self.lipd_ts[prefix+"Data_units"] else: units = "NA" #Climate interpretation information if prefix+"Data_interpretation" in self.lipd_ts.keys(): interpretation = self.lipd_ts[prefix+"Data_interpretation"][0] if "name" in interpretation.keys(): ClimateVar = interpretation["name"] elif "variable" in interpretation.keys(): ClimateVar = interpretation["variable"] else: ClimateVar = "NA" if "detail" in interpretation.keys(): Detail = interpretation["detail"] elif "variableDetail" in interpretation.keys(): Detail = interpretation['variableDetail'] else: Detail = "NA" if "scope" in interpretation.keys(): Scope = interpretation['scope'] else: Scope = "NA" if "seasonality" in interpretation.keys(): Seasonality = interpretation["seasonality"] else: Seasonality = "NA" if "interpdirection" in interpretation.keys(): Direction = interpretation["interpdirection"] else: Direction = "NA" else: ClimateVar = "NA" Detail = "NA" Scope = "NA" Seasonality = "NA" Direction = "NA" # Calibration information if prefix+"Data_calibration" in self.lipd_ts.keys(): calibration = self.lipd_ts[prefix+'Data_calibration'][0] if "equation" in calibration.keys(): Calibration_equation = calibration["equation"] else: Calibration_equation = "NA" if "calibrationReferences" in calibration.keys(): ref = calibration["calibrationReferences"] if "author" in ref.keys(): ref_author = ref["author"][0] # get the first author else: ref_author = "NA" if "publicationYear" in ref.keys(): ref_year = str(ref["publicationYear"]) else: ref_year="NA" Calibration_notes = ref_author +"."+ref_year elif "notes" in calibration.keys(): Calibration_notes = calibration["notes"] else: Calibration_notes = "NA" else: Calibration_equation = "NA" Calibration_notes = "NA" #Truncate the notes if too long charlim = 30; if len(Calibration_notes)>charlim: Calibration_notes = Calibration_notes[0:charlim] + " ..." res = {"archiveType" : archiveType, "authors" : authors, "Year": Year, "DOI": DOI, "Variable": Variable, "units": units, "Climate_Variable" : ClimateVar, "Detail" : Detail, "Scope":Scope, "Seasonality" : Seasonality, "Interpretation_Direction" : Direction, "Calibration_equation" : Calibration_equation, "Calibration_notes" : Calibration_notes} return res
[docs] def dashboard(self, figsize = [11,8], plt_kwargs=None, distplt_kwargs=None, spectral_kwargs=None, spectralsignif_kwargs=None, spectralfig_kwargs=None, map_kwargs=None, metadata = True, savefig_settings=None, mute=False, ensemble = False, D=None): ''' Parameters ---------- figsize : list, optional Figure size. The default is [11,8]. plt_kwargs : dict, optional Optional arguments for the timeseries plot. See Series.plot() or EnsembleSeries.plot_envelope(). The default is None. distplt_kwargs : dict, optional Optional arguments for the distribution plot. See Series.distplot() or EnsembleSeries.plot_displot(). The default is None. spectral_kwargs : dict, optional Optional arguments for the spectral method. Default is to use Lomb-Scargle method. See Series.spectral() or EnsembleSeries.spectral(). The default is None. spectralsignif_kwargs : dict, optional Optional arguments to estimate the significance of the power spectrum. See PSD.signif_test. Note that we currently do not support significance testing for ensembles. The default is None. spectralfig_kwargs : dict, optional Optional arguments for the power spectrum figure. See PSD.plot() or MultiplePSD.plot_envelope(). The default is None. map_kwargs : dict, optional Optional arguments for the map. See LipdSeries.map(). The default is None. metadata : {True,False}, optional Whether or not to produce a dashboard with printed metadata. The default is True. savefig_settings : dict, optional the dictionary of arguments for plt.savefig(); some notes below: - "path" must be specified; it can be any existed or non-existed path, with or without a suffix; if the suffix is not given in "path", it will follow "format" - "format" can be one of {"pdf", "eps", "png", "ps"}. The default is None. mute : {True,False}, optional if True, the plot will not show; recommend to turn on when more modifications are going to be made on ax. The default is False. ensemble : {True, False}, optional If True, will return the dashboard in ensemble modes if ensembles are available D : pyleoclim.Lipd If asking for an ensemble plot, a pyleoclim.Lipd object must be provided Returns ------- fig : matplotlib.figure The figure ax : matplolib.axis The axis. See also -------- pyleoclim.Series.plot : plot a timeseries pyleoclim.EnsembleSeries.plot_envelope: Envelope plots for an ensemble pyleoclim.Series.distplot : plot a distribution of the timeseries pyleoclim.EnsembleSeries.distplot : plot a distribution of the timeseries across ensembles pyleoclim.Series.spectral : spectral analysis method. pyleoclim.MultipleSeries.spectral : spectral analysis method for multiple series. pyleoclim.PSD.signif_test : significance test for timeseries analysis pyleoclim.PSD.plot : plot power spectrum pyleoclim.MulitplePSD.plot : plot envelope of power spectrum pyleoclim.LipdSeries.map : map location of dataset pyleolim.LipdSeries.getMetadata : get relevant metadata from the timeseries object pyleoclim.utils.mapping.map_all : Underlying mapping function for Pyleoclim Examples -------- .. ipython:: python :okwarning: import pyleoclim as pyleo url = 'http://wiki.linked.earth/wiki/index.php/Special:WTLiPD?op=export&lipdid=MD982176.Stott.2004' data = pyleo.Lipd(usr_path = url) ts = data.to_LipdSeries(number=5) @savefig ts_dashboard.png fig, ax = ts.dashboard() pyleo.closefig(fig) ''' if ensemble == True and D is None: raise ValueError("When an ensemble dashboard is requested, the corresponsind Lipd object must be supplied") if ensemble == True: warnings.warn('Some of the computation in ensemble mode can require a few minutes to complete.') savefig_settings = {} if savefig_settings is None else savefig_settings.copy() res=self.getMetadata() # start plotting fig = plt.figure(figsize=figsize) gs = gridspec.GridSpec(2,5) gs.update(left=0,right=1.1) if ensemble==True: ens = self.chronEnsembleToPaleo(D) ensc = ens.common_time() ax={} # Plot the timeseries plt_kwargs={} if plt_kwargs is None else plt_kwargs.copy() ax['ts'] = plt.subplot(gs[0,:-3]) plt_kwargs.update({'ax':ax['ts']}) # use the defaults if color/markers not specified if ensemble == False: if 'marker' not in plt_kwargs.keys(): archiveType = lipdutils.LipdToOntology(res['archiveType']).lower().replace(" ","") plt_kwargs.update({'marker':self.plot_default[archiveType][1]}) if 'color' not in plt_kwargs.keys(): archiveType = lipdutils.LipdToOntology(res['archiveType']).lower().replace(" ","") plt_kwargs.update({'color':self.plot_default[archiveType][0]}) ax['ts'] = self.plot(**plt_kwargs) elif ensemble == True: if 'curve_clr' not in plt_kwargs.keys(): archiveType = lipdutils.LipdToOntology(res['archiveType']).lower().replace(" ","") plt_kwargs.update({'curve_clr':self.plot_default[archiveType][0]}) if 'shade_clr' not in plt_kwargs.keys(): archiveType = lipdutils.LipdToOntology(res['archiveType']).lower().replace(" ","") plt_kwargs.update({'shade_clr':self.plot_default[archiveType][0]}) ax['ts'] = ensc.plot_envelope(**plt_kwargs) else: raise ValueError("Invalid argument value for ensemble") ymin, ymax = ax['ts'].get_ylim() #plot the distplot distplt_kwargs={} if distplt_kwargs is None else distplt_kwargs.copy() ax['dts'] = plt.subplot(gs[0,2]) distplt_kwargs.update({'ax':ax['dts']}) distplt_kwargs.update({'ylabel':'PDF'}) distplt_kwargs.update({'vertical':True}) if 'color' not in distplt_kwargs.keys(): archiveType = lipdutils.LipdToOntology(res['archiveType']).lower().replace(" ","") distplt_kwargs.update({'color':self.plot_default[archiveType][0]}) if ensemble == False: ax['dts'] = self.distplot(**distplt_kwargs) elif ensemble == True: ax['dts'] = ensc.distplot(**distplt_kwargs) ax['dts'].set_ylim([ymin,ymax]) ax['dts'].set_yticklabels([]) ax['dts'].set_ylabel('') ax['dts'].set_yticks([]) #make the map - brute force since projection is not being returned properly lat=[self.lipd_ts['geo_meanLat']] lon=[self.lipd_ts['geo_meanLon']] map_kwargs={} if map_kwargs is None else map_kwargs.copy() if 'projection' in map_kwargs.keys(): projection=map_kwargs['projection'] else: projection='Orthographic' if 'proj_default' in map_kwargs.keys(): proj_default=map_kwargs['proj_default'] else: proj_default=True if proj_default==True: proj1={'central_latitude':lat[0], 'central_longitude':lon[0]} proj2={'central_latitude':lat[0]} proj3={'central_longitude':lon[0]} try: proj = mapping.set_proj(projection=projection, proj_default=proj1) except: try: proj = mapping.set_proj(projection=projection, proj_default=proj3) except: proj = mapping.set_proj(projection=projection, proj_default=proj2) if 'marker' in map_kwargs.keys(): marker = map_kwargs['marker'] else: marker = self.plot_default[archiveType][1] if 'color' in map_kwargs.keys(): color = map_kwargs['color'] else: color = self.plot_default[archiveType][0] if 'background' in map_kwargs.keys(): background = map_kwargs['background'] else: background = True if 'borders' in map_kwargs.keys(): borders= map_kwargs['borders'] else: borders = False if 'rivers' in map_kwargs.keys(): rivers= map_kwargs['rivers'] else: rivers = False if 'lakes' in map_kwargs.keys(): lakes = map_kwargs['lakes'] else: lakes = False if 'scatter_kwargs' in map_kwargs.keys(): scatter_kwargs = map_kwargs['scatter_kwargs'] else: scatter_kwargs={} if 'markersize' in map_kwargs.keys(): scatter_kwargs.update({'s': map_kwargs['markersize']}) else: pass if 'lgd_kwargs' in map_kwargs.keys(): lgd_kwargs = map_kwargs['lgd_kwargs'] else: lgd_kwargs ={} if 'legend' in map_kwargs.keys(): legend = map_kwargs['legend'] else: legend = False #make the plot map data_crs = ccrs.PlateCarree() ax['map'] = plt.subplot(gs[1,0],projection=proj) ax['map'].coastlines() if background is True: ax['map'].stock_img() #Other extra information if borders is True: ax['map'].add_feature(cfeature.BORDERS) if lakes is True: ax['map'].add_feature(cfeature.LAKES) if rivers is True: ax['map'].add_feature(cfeature.RIVERS) ax['map'].scatter(lon,lat,zorder=10,label=marker,facecolor=color,transform=data_crs, **scatter_kwargs) if legend == True: ax.legend(**lgd_kwargs) #spectral analysis spectral_kwargs={} if spectral_kwargs is None else spectral_kwargs.copy() if 'method' in spectral_kwargs.keys(): pass else: spectral_kwargs.update({'method':'lomb_scargle'}) if 'freq_method' in spectral_kwargs.keys(): pass else: if ensemble == False: spectral_kwargs.update({'freq_method':'lomb_scargle'}) elif ensemble == True: pass ax['spec'] = plt.subplot(gs[1,1:3]) spectralfig_kwargs={} if spectralfig_kwargs is None else spectralfig_kwargs.copy() spectralfig_kwargs.update({'ax':ax['spec']}) if ensemble == False: ts_preprocess = self.detrend().standardize() psd = ts_preprocess.spectral(**spectral_kwargs) #Significance test spectralsignif_kwargs={} if spectralsignif_kwargs is None else spectralsignif_kwargs.copy() psd_signif = psd.signif_test(**spectralsignif_kwargs) #plot if 'color' not in spectralfig_kwargs.keys(): archiveType = lipdutils.LipdToOntology(res['archiveType']).lower().replace(" ","") spectralfig_kwargs.update({'color':self.plot_default[archiveType][0]}) if 'signif_clr' not in spectralfig_kwargs.keys(): spectralfig_kwargs.update({'signif_clr':'grey'}) ax['spec'] = psd_signif.plot(**spectralfig_kwargs) elif ensemble == True: if 'curve_clr' not in spectralfig_kwargs.keys(): archiveType = lipdutils.LipdToOntology(res['archiveType']).lower().replace(" ","") spectralfig_kwargs.update({'curve_clr':self.plot_default[archiveType][0]}) if 'shade_clr' not in spectralfig_kwargs.keys(): archiveType = lipdutils.LipdToOntology(res['archiveType']).lower().replace(" ","") spectralfig_kwargs.update({'shade_clr':self.plot_default[archiveType][0]}) psd = ensc.detrend().standardize().spectral(**spectral_kwargs) #plot ax['spec'] = psd.plot_envelope(**spectralfig_kwargs) #Make the plot if metadata == True: # get metadata textstr = "archiveType: " + res["archiveType"]+"\n"+"\n"+\ "Authors: " + res["authors"]+"\n"+"\n"+\ "Year: " + res["Year"]+"\n"+"\n"+\ "DOI: " + res["DOI"]+"\n"+"\n"+\ "Variable: " + res["Variable"]+"\n"+"\n"+\ "units: " + res["units"]+"\n"+"\n"+\ "Climate Interpretation: " +"\n"+\ " Climate Variable: " + res["Climate_Variable"] +"\n"+\ " Detail: " + res["Detail"]+"\n"+\ " Seasonality: " + res["Seasonality"]+"\n"+\ " Direction: " + res["Interpretation_Direction"]+"\n \n"+\ "Calibration: \n" + \ " Equation: " + res["Calibration_equation"] + "\n" +\ " Notes: " + res["Calibration_notes"] plt.figtext(0.7, 0.4, textstr, fontsize = 12) if 'path' in savefig_settings: plotting.savefig(fig, settings=savefig_settings) else: if not mute: plotting.showfig(fig) return fig, ax