"""
The Series class describes the most basic objects in Pyleoclim. A Series is a simple `dictionary <https://docs.python.org/3/tutorial/datastructures.html#dictionaries>`_ that contains 3 things:
- a series of real-valued numbers;
- a time axis at which those values were measured/simulated ;
- optionally, some metadata about both axes, like units, labels and the like.
How to create and manipulate such objects is described in a short example below, while `this notebook <https://nbviewer.jupyter.org/github/LinkedEarth/Pyleoclim_util/blob/master/example_notebooks/pyleoclim_ui_tutorial.ipynb>`_ demonstrates how to apply various Pyleoclim methods to Series objects.
"""
import datetime as dt
import re
from ..utils import tsutils, plotting, tsmodel, tsbase, lipdutils, jsonutils
from ..utils import wavelet as waveutils
from ..utils import spectral as specutils
from ..utils import correlation as corrutils
from ..utils import causality as causalutils
from ..utils import decomposition
from ..utils import filter as filterutils
from ..core.psds import PSD
from ..core.ssares import SsaRes
from ..core.multipleseries import MultipleSeries
from ..core.scalograms import Scalogram
from ..core.coherence import Coherence
from ..core.corr import Corr
from ..core.surrogateseries import SurrogateSeries
from ..core.resolutions import Resolution
from ..core.surrogateseries import supported_surrogates
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from collections import namedtuple
from copy import deepcopy
import matplotlib.colors as mcolors
import matplotlib.colorbar as mcb
import random
import csv
#from matplotlib import gridspec
from tqdm import tqdm
import warnings
import collections
from pprint import pprint
from importlib.metadata import version
def dict2namedtuple(d):
''' Convert a dictionary to a namedtuple
'''
tupletype = namedtuple('tupletype', sorted(d))
return tupletype(**d)
[docs]class Series:
'''The Series class describes the most basic objects in Pyleoclim.
A Series is a simple `dictionary <https://docs.python.org/3/tutorial/datastructures.html#dictionaries>`_ that contains 3 things:
* value, an array of real-valued numbers;
* time, a coordinate axis at which those values were obtained ;
* optionally, some metadata about both axes, like units, labels and origin.
How to create, manipulate and use such objects is described in `PyleoTutorials <https://linked.earth/PyleoTutorials/>`_.
Parameters
----------
time : list or numpy.array
time axis (prograde or retrograde)
value : list of numpy.array
values of the dependent variable (y)
time_unit : string
Units for the time vector (e.g., 'ky BP').
Default is None. in which case 'years CE' are assumed
time_name : string
Name of the time vector (e.g., 'Time','Age').
Default is None. This is used to label the time axis on plots
value_name : string
Name of the value vector (e.g., 'temperature')
Default is None
value_unit : string
Units for the value vector (e.g., 'deg C')
Default is None
label : string
Name of the time series (e.g., 'NINO 3.4')
Default is None
log : dict
Dictionary of tuples documentating the various transformations applied to the object
keep_log : bool
Whether to keep a log of applied transformations. False by default
importedFrom : string
source of the dataset. If it came from a LiPD file, this could be the datasetID property
archiveType : string
climate archive, one of 'Borehole', 'Coral', 'FluvialSediment', 'GlacierIce', 'GroundIce', 'LakeSediment', 'MarineSediment', 'Midden', 'MolluskShell', 'Peat', 'Sclerosponge', 'Shoreline', 'Speleothem', 'TerrestrialSediment', 'Wood'
Reference: https://lipdverse.org/vocabulary/archivetype/
control_archiveType : [True, False]
Whether to standardize the name of the archiveType agains the vocabulary from: https://lipdverse.org/vocabulary/paleodata_proxy/.
If set to True, will only allow for these terms and automatically convert known synonyms to the standardized name. Only standardized variable names will be automatically assigned a color scheme.
Default is False.
dropna : bool
Whether to drop NaNs from the series to prevent downstream functions from choking on them
defaults to True
sort_ts : str
Direction of sorting over the time coordinate; 'ascending' or 'descending'
Defaults to 'ascending'
verbose : bool
If True, will print warning messages if there are any
clean_ts : boolean flag
set to True to remove the NaNs and make time axis strictly prograde with duplicated timestamps reduced by averaging the values
Default is None (marked for deprecation)
auto_time_params : bool,
If True, uses tsbase.disambiguate_time_metadata to ensure that time_name and time_unit are usable by Pyleoclim. This may override the provided metadata.
If False, the provided time_name and time_unit are used. This may break some functionalities (e.g. common_time and convert_time_unit), so use at your own risk.
If not provided, code will set to True for internal consistency.
Examples
--------
Import the Southern Oscillation Index (SOI) and display a quick synopsis:
.. jupyter-execute::
import pyleoclim as pyleo
soi = pyleo.utils.load_dataset('SOI')
soi.view()
'''
def __init__(self, time, value, time_unit=None, time_name=None,
value_name=None, value_unit=None, label=None,
importedFrom=None, archiveType = None, control_archiveType=False,
log=None, keep_log=False, sort_ts = 'ascending', dropna = True, verbose=True, clean_ts=False,
auto_time_params = None):
# ensure ndarray instances
time = np.array(time)
value = np.array(value)
if auto_time_params is None:
auto_time_params = True
if verbose:
warnings.warn('auto_time_params is not specified. Currently default behavior sets this to True, which might modify your supplied time metadata. Please set to False if you want a different behavior.', UserWarning, stacklevel=2)
if auto_time_params:
# assign time metadata if they are not provided or provided incorrectly
offending = [tsbase.MATCH_CE, tsbase.MATCH_BP]
if time_unit is None:
time_unit='years CE'
if verbose:
warnings.warn(f'No time_unit parameter provided. Assuming {time_unit}.', UserWarning, stacklevel=2)
elif time_unit.lower().replace(".","") in frozenset().union(*offending):
# fix up time name and units for offending cases
time_name, time_unit = tsbase.disambiguate_time_metadata(time_unit)
else:
# give a proper time name to those series that confuse that notion with time units
time_name, _ = tsbase.disambiguate_time_metadata(time_unit)
if time_name is None:
if verbose:
warnings.warn('No time_name parameter provided. Assuming "Time".', UserWarning, stacklevel=2)
time_name='Time'
elif time_name in tsbase.MATCH_A:
if verbose:
warnings.warn(f'{time_name} refers to the units, not the name of the axis. Picking "Time" instead', UserWarning, stacklevel=2)
time_name='Time'
else:
pass
if log is None:
if keep_log == True:
self.log = ()
else:
self.log = None
else:
self.log = log
if clean_ts == True:
if dropna == False or sort_ts == 'descending':
warnings.warn(f'clean_ts implies dropna=True and sort_ts=ascending; provided values are {dropna, sort_ts}', UserWarning)
else:
dropna = True
sort_ts ='ascending'
elif clean_ts == False:
pass
else:
raise ValueError('clean_ts should be a boolean')
if dropna == True:
value, time = tsbase.dropna(value, time, verbose=verbose)
if keep_log == True:
if len(self.log) > 0:
if self.log[0][0] == 'dropna' and self.log[0]['applied'] == True:
pass # no need to clog the log with redundant information
elif self.log[0][0] == 'clean_ts' and self.log[0]['applied']==True:
self.log[0]['legacy']=True
self.log += ({len(self.log): 'dropna', 'applied': dropna, 'verbose': verbose},)
else:
self.log += ({len(self.log): 'dropna', 'applied': dropna, 'verbose': verbose},)
elif dropna == False:
pass
else:
raise ValueError('dropna should be a boolean')
# if check_sorting:
# res, stats, sign = tsbase.resolution(time)
# if sign == 'mixed':
# warnings.warn("The Series time axis is non-monotonic, which may cause errors. Suggest applying .sort()")
# if sort == 'auto':
# _, _, direction = tsbase.time_unit_to_datum_exp_dir(time_unit)
# value, time = tsbase.sort_ts(np.array(value), np.array(time),
# ascending = (direction == 'prograde'),
# verbose=verbose)
# self.log += ({nlog+1: 'sort', 'direction': direction},)
if sort_ts is not None:
if sort_ts in ['ascending', 'descending']:
value, time = tsbase.sort_ts(value, time, verbose=verbose,
ascending = sort_ts == 'ascending')
if keep_log == True:
if len(self.log) > 1:
if self.log[1][1] == 'sort_ts' and self.log[1]['direction'] == 'ascending':
pass # no need to clog the log with redundant information
elif self.log[0][0] == 'clean_ts' and self.log[0]['applied']==True:
self.log[0]['legacy']=True
self.log += ({len(self.log): 'sort_ts', 'direction': sort_ts},)
else:
self.log += ({len(self.log): 'sort_ts', 'direction': sort_ts},)
else:
print(f"Unknown sorting option {sort_ts}; no sorting applied")
self.time = time
self.value = value
self.time_name = time_name
self.time_unit = time_unit
self.value_name = value_name
self.value_unit = value_unit
self.label = label
self.dropna = dropna
self.sort_ts = sort_ts
self.clean_ts = clean_ts
self.importedFrom = importedFrom
self.control_archiveType = control_archiveType
if archiveType is not None:
#Deal with archiveType
#Get the possible list of archiveTyp
if control_archiveType == True:
res = lipdutils.get_archive_type()
std_var = list(res.keys())
std_var_lower = [key.lower() for key in res.keys()]
data = []
for key, values in res.items():
if values: # Check if the list is not empty
for val in values:
data.append([key.lower(), val.lower().replace(" ", "")])
else:
data.append([key.lower(), None]) # If the list is empty, append None as value
# Creating DataFrame
df = pd.DataFrame(data, columns=['Key', 'Value'])
synonym = df[df['Value'].isin([archiveType.lower().replace(" ", "")])]['Key']
if synonym.empty == True:
synonym = None
else:
synonym = synonym.to_string(header=False, index=False)
if archiveType.lower().replace(" ", "") in std_var_lower:
index = std_var_lower.index(archiveType.lower().replace(" ", ""))
self.archiveType = std_var[index]
elif synonym is not None:
index = std_var_lower.index(synonym)
self.archiveType = std_var[index]
else:
raise ValueError('Not a proper archiveName or a known synonym')
else:
self.archiveType = archiveType
else:
self.archiveType = None
def __repr__(self):
ser = self.to_pandas(paleo_style=True)
d = self.metadata
keys = ['importedFrom', 'label', 'archiveType', 'log']
metadata = {key: d[key] for key in keys if d[key] is not None}
#df = ser.to_frame()
return f'{pprint(metadata)}\n{repr(ser)}'
def __and__(self, other):
"""
Combine with another Series to get MultipleSeries.
Parameters
----------
other
Series to combine with.
Examples
--------
.. jupyter-execute::
import pyleoclim as pyleo
import numpy as np
ts1 = pyleo.Series(time=np.array([1, 2, 4]), value=np.array([7, 4, 9]), time_unit='years CE', label='ts1')
ts2 = pyleo.Series(time=np.array([1, 3, 4]), value=np.array([7, 8, 1]), time_unit='years CE', label='ts2')
ts3 = pyleo.Series(time=np.array([1, 3, 4]), value=np.array([7, 8, 1]), time_unit='years CE', label='ts3')
# Combine ts1, ts2, and ts3 into a multiple series:
ms = ts1 & ts2 & ts3
"""
if not isinstance(other, Series):
raise TypeError(f"Expected pyleo.Series, got: {type(other)}")
return MultipleSeries([self, other])
@property
def datetime_index(self):
"""
Convert time to pandas DatetimeIndex.
Note: conversion will happen using `time_unit`, and will assume:
- the number of seconds per year is calculated using UDUNITS, see
http://cfconventions.org/cf-conventions/cf-conventions#time-coordinate
- `time` refers to the Gregorian calendar. If using a different calendar,
then please make sure to do any conversions before hand.
"""
datum, exponent, direction = tsbase.time_unit_to_datum_exp_dir(self.time_unit)
index = tsbase.time_to_datetime(self.time, datum, exponent, direction)
return pd.DatetimeIndex(index, name='datetime')
@property
def metadata(self):
return dict(
time_unit = self.time_unit,
time_name = self.time_name,
value_unit = self.value_unit,
value_name = self.value_name,
label = self.label,
archiveType = self.archiveType,
importedFrom = self.importedFrom,
log = self.log,
)
@classmethod
def from_pandas(cls, ser, metadata):
if isinstance(ser.index, pd.DatetimeIndex):
index = ser.index.as_unit('s') if ser.index.unit != 's' else ser.index
time = tsbase.convert_datetime_index_to_time(index, metadata['time_unit'], metadata['time_name'])
else:
raise ValueError('The provided index must be a proper DatetimeIndex object')
# metadata gap-filling. This does not handle the edge case where the keys exist but the entries are None
if 'time_name' not in metadata.keys():
metadata['time_name'] = ser.index.name
if 'value_name' not in metadata.keys():
metadata['value_name'] = ser.name
return cls(time=time,value=ser.values, **metadata,
sort_ts = None, dropna = False, verbose=False)
[docs] def to_pandas(self, paleo_style=False):
'''
Export to pandas Series
Parameters
----------
paleo_style : boolean, optional
If True, will replace datetime with time and label columns with units . The default is False.
Returns
-------
ser : pd.Series representation of the pyleo.Series object
'''
ser = pd.Series(self.value, index=self.datetime_index, name=self.value_name)
if paleo_style:
time_label, value_label = self.make_labels()
ser = ser.set_axis(self.time).rename(value_label).rename_axis(time_label)
return ser
[docs] def to_csv(self, metadata_header=True, path = None):
'''
Export Series to csv
Parameters
----------
metadata_header : boolean, optional
DESCRIPTION. The default is True.
path : str, optional
system path to save the file. Default is '.'
Returns
-------
None
See Also
--------
pyleoclim.Series.from_csv
Examples
--------
.. jupyter-execute::
import pyleoclim as pyleo
LR04 = pyleo.utils.load_dataset('LR04')
LR04.to_csv()
lr04 = pyleo.Series.from_csv('LR04_benthic_stack.csv')
LR04.equals(lr04)
'''
if path is None:
path = self.label.replace(" ", "_") + '.csv' if self.label is not None else 'series.csv'
ser = self.to_pandas(paleo_style=True)
# export metadata
if metadata_header:
with open(path, 'w', newline='') as file:
hd_writer = csv.writer(file)
hd_writer.writerow(["###", "Series metadata"])
hd_writer.writerow(["written by", "Pyleoclim " + version('Pyleoclim')])
hd_writer.writerows(self.metadata.items())
hd_writer.writerow(["###", "end metadata"])
#file.close()
# export Series object to CSV
ser.to_csv(path, mode = 'a', header = True)
else:
# export Series object to CSV
ser.to_csv(path, header = True)
print('Series exported to ' + path)
[docs] @classmethod
def from_csv(cls, path):
'''
Read in Series object from CSV file. Expects a metadata header
dealineated by '###' lines, as written by the Series.to_csv() method.
Parameters
----------
filename : str
name of the file, e.g. 'myrecord.csv'
path : str
directory of the file. Default: current directory, '.'
Returns
-------
Series
pyleoclim Series object containing data and metadata.
See Also
--------
pyleoclim.Series.to_csv
'''
metadata = {}
# read in metadata header
# TODO: improve error handling
with open(path, 'r') as file:
reader_obj = csv.reader(file)
for i, row in enumerate(reader_obj):
if row[0] == '###' and i > 0:
header = i+1
break
else:
metadata[row[0]] = row[1]
# pop superfluous entries
metadata.pop('###')
metadata.pop('written by')
empty = [key for key in metadata.keys() if metadata[key] == '']
for key in empty:
metadata.pop(key)
# make sure log is a tuple
if 'log' in metadata.keys():
metadata['log'] = eval(metadata['log']) # convert string to tuple
# read in data
df = pd.read_csv(path, header=header)
# export to Series
return cls(time=df.iloc[:,0],value=df.iloc[:,1], **metadata)
[docs] def to_json(self, path=None):
"""
Export the pyleoclim.Series object to a json file
Parameters
----------
path : string, optional
The path to the file. The default is None, resulting in a file saved in the current working directory using the label for the dataset as filename if available or 'series.json' if label is not provided.
Returns
-------
None.
Examples
--------
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
ts.to_json('soi.json')
"""
if path is None:
path = self.label.replace(" ", "_") + '.json' if self.label is not None else 'series.json'
jsonutils.PyleoObj_to_json(self, path)
[docs] @classmethod
def from_json(cls, path):
''' Creates a pyleoclim.Series from a JSON file
The keys in the JSON file must correspond to the parameter associated with a Series object
Parameters
----------
path : str
Path to the JSON file
Returns
-------
ts : pyleoclim.core.series.Series
A Pyleoclim Series object.
'''
a = jsonutils.open_json(path)
b = jsonutils.iterate_through_dict(a, 'Series')
return cls(**b)
def pandas_method(self, method):
ser = self.to_pandas()
result = method(ser)
if not isinstance(result, pd.Series):
raise ValueError('Given method does not return a pandas Series and cannot be applied')
return self.from_pandas(result, self.metadata)
[docs] def equals(self, ts, index_tol = 5, value_tol = 1e-5):
'''
Test whether two objects contain the same elements (values and datetime_index)
A printout is returned if metadata are different, but the statement is considered
True as long as data match.
Parameters
----------
ts : Series object
The target series for the comparison
index_tol: int, default 5
tolerance on difference in datetime indices (in dtype units, which are seconds by default)
value_tol: float, default 1e-5
tolerance on difference in values (in %)
Returns
-------
same_data: bool
Truth value of the proposition "the two series have the same data".
same_metadata: bool
Truth value of the proposition "the two series have the same metadata".
Examples
--------
.. jupyter-execute::
import pyleoclim as pyleo
soi = pyleo.utils.load_dataset('SOI')
NINO3 = pyleo.utils.load_dataset('NINO3')
soi.equals(NINO3)
'''
left = self.to_pandas()
right = ts.to_pandas()
if len(left) != len(right): # check that series have the same lengths
print(f"The two series have different lengths, left: {len(left)} vs right: {len(right)}")
same_values = False
same_index = False
else:
# check that the values are the same
try:
same_values = np.allclose(left.values, right.values, rtol=value_tol, equal_nan=True)
if not same_values:
print(f"The two series have values differing by more than {value_tol} {self.value_unit}")
# check that the indices are the same
dt = left.index - right.index
same_index = all(dt.total_seconds() < index_tol)
if not same_index:
print(f"The series have indices differing by more than {index_tol} seconds")
except AssertionError as exp:
print(str(exp))
same_data = same_values & same_index
# check that the metadata are the same
same_metadata = (self.metadata == ts.metadata)
if not same_metadata:
print("Metadata are different:")
for key in self.metadata:
if self.metadata.get(key) != ts.metadata.get(key):
print(f"{key} property -- left: {self.metadata.get(key)}, right: {ts.metadata.get(key)}")
return same_data, same_metadata
[docs] def view(self):
'''
Generates a DataFrame version of the Series object, suitable for viewing in a Jupyter Notebook
Returns
-------
pd.DataFrame
Examples
--------
Plot the HadCRUT5 Global Mean Surface Temperature
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('HadCRUT5')
ts.view()
'''
return self.to_pandas(paleo_style=True).to_frame()
[docs] def convert_time_unit(self, time_unit='ky BP', keep_log=False):
''' Convert the time units of the Series object
Parameters
----------
time_unit : str
the target time unit, possible input:
{
'year', 'years', 'yr', 'yrs', 'CE', 'AD',
'y BP', 'yr BP', 'yrs BP', 'year BP', 'years BP',
'ky BP', 'kyr BP', 'kyrs BP', 'ka BP', 'ka',
'my BP', 'myr BP', 'myrs BP', 'ma BP', 'ma',
}
keep_log : Boolean
if True, adds this step and its parameter to the series log.
Examples
--------
.. jupyter-execute::
ts = pyleo.utils.load_dataset('SOI')
tsBP = ts.convert_time_unit(time_unit='yrs BP')
print('Original timeseries:')
print('time unit:', ts.time_unit)
print('time:', ts.time[:10])
print()
print('Converted timeseries:')
print('time unit:', tsBP.time_unit)
print('time:', tsBP.time[:10])
'''
new_ts = self.copy()
if time_unit is not None:
time_name, time_unit = tsbase.disambiguate_time_metadata(time_unit)
else:
return new_ts
new_time = tsbase.convert_datetime_index_to_time(self.datetime_index, time_unit, time_name)
dt = np.diff(new_time)
if any(dt<=0):
new_value, new_time = tsbase.sort_ts(self.value, new_time)
else:
new_value = self.copy().value
new_ts.time = new_time
new_ts.new_value = new_value
new_ts.time_unit = time_unit
new_ts.time_name = time_name
if keep_log == True:
if new_ts.log is None:
new_ts.log=()
new_ts.log += ({len(new_ts.log):'convert_time_unit', 'time_unit': time_unit},)
return new_ts
[docs] def make_labels(self):
'''
Initialization of plot labels based on Series metadata
Returns
-------
time_header : str
Label for the time axis
value_header : str
Label for the value axis
'''
if self.time_name is not None:
time_name_str = self.time_name
else:
time_name_str = 'time'
if self.value_name is not None:
value_name_str = self.value_name
else:
value_name_str = 'value'
if self.value_unit is not None:
value_header = f'{value_name_str} [{self.value_unit}]'
else:
value_header = f'{value_name_str}'
if self.time_unit is not None:
time_header = f'{time_name_str} [{self.time_unit}]'
else:
time_header = f'{time_name_str}'
return time_header, value_header
[docs] def stats(self):
""" Compute basic statistics from a Series
Computes the mean, median, min, max, standard deviation, and interquartile range of a numpy array y, ignoring NaNs.
Returns
-------
res : dictionary
Contains the mean, median, minimum value, maximum value, standard
deviation, and interquartile range for the Series.
Examples
--------
Compute basic statistics for the SOI series
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
ts.stats()
"""
# TODO: replace with pd.describe()
mean, median, min_, max_, std, IQR = tsutils.simple_stats(self.value)
res={'mean':mean,
'median':median,
'min':min_,
'max':max_,
'std':std,
'IQR': IQR}
return res
[docs] def flip(self, axis='value', keep_log = False):
'''
Flips the Series along one or both axes
Parameters
----------
axis : str, optional
The axis along which the Series will be flipped. The default is 'value'.
Other acceptable options are 'time' or 'both'.
TODO: enable time flipping after paleopandas is released
keep_log : Boolean
if True, adds this transformation to the series log.
Returns
-------
new : Series
The flipped series object
Examples
--------
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
tsf = ts.flip(keep_log=True)
fig, ax = tsf.plot()
tsf.log
'''
if self.log is not None:
methods = [self.log[idx][idx] for idx in range(len(self.log))]
if 'flip' in methods:
warnings.warn("this Series' log indicates that it has previously been flipped")
new = self.copy()
if axis == 'value':
new.value = - self.value
new.value_name = new.value_name + ' x (-1)'
else:
print('Flipping is only enabled along the value axis for now')
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log): 'flip', 'applied': True, 'axis': axis},)
return new
[docs] def plot(self, figsize=[10, 4],
marker=None, markersize=None, color=None,
linestyle=None, linewidth=None, xlim=None, ylim=None,
label=None, xlabel=None, ylabel=None, title=None, zorder=None,
legend=True, plot_kwargs=None, lgd_kwargs=None, alpha=None,
savefig_settings=None, ax=None, invert_xaxis=False, invert_yaxis=False):
''' Plot the timeseries
Parameters
----------
figsize : list
a list of two integers indicating the figure size
marker : str
e.g., 'o' for dots
See [matplotlib.markers](https://matplotlib.org/stable/api/markers_api.html) for details
markersize : float
the size of the marker
color : str, list
the color for the line plot
e.g., 'r' for red
See [matplotlib colors](https://matplotlib.org/stable/gallery/color/color_demo.html) for details
linestyle : str
e.g., '--' for dashed line
See [matplotlib.linestyles](https://matplotlib.org/stable/gallery/lines_bars_and_markers/linestyles.html) for details
linewidth : float
the width of the line
label : str
the label for the line
xlabel : str
the label for the x-axis
ylabel : str
the label for the y-axis
title : str
the title for the figure
zorder : int
The default drawing order for all lines on the plot
legend : {True, False}
plot legend or not
invert_xaxis : bool, optional
if True, the x-axis of the plot will be inverted
invert_yaxis : bool, optional
same for the y-axis
plot_kwargs : dict
the dictionary of keyword arguments for ax.plot()
See [matplotlib.pyplot.plot](https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.plot.html) for details
lgd_kwargs : dict
the dictionary of keyword arguments for ax.legend()
See [matplotlib.pyplot.legend](https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.legend.html) for details
alpha : float
Transparency setting
savefig_settings : dict
the dictionary of arguments for plt.savefig(); some notes below:
- "path" must be specified; it can be any existed or non-existed path,
with or without a suffix; if the suffix is not given in "path", it will follow "format"
- "format" can be one of {"pdf", "eps", "png", "ps"}
ax : matplotlib.axis, optional
the axis object from matplotlib
See [matplotlib.axes](https://matplotlib.org/api/axes_api.html) for details.
Returns
-------
fig : matplotlib.figure
the figure object from matplotlib
See [matplotlib.pyplot.figure](https://matplotlib.org/stable/api/figure_api.html) for details.
ax : matplotlib.axis
the axis object from matplotlib
See [matplotlib.axes](https://matplotlib.org/stable/api/axes_api.html) for details.
Notes
-----
When `ax` is passed, the return will be `ax` only; otherwise, both `fig` and `ax` will be returned.
See also
--------
pyleoclim.utils.plotting.savefig : saving a figure in Pyleoclim
Examples
--------
Plot the SOI record
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
fig, ax = ts.plot()
Change the line color
.. jupyter-execute::
fig, ax = ts.plot(color='r')
Save the figure. Two options available, only one is needed:
* Within the plotting command
* After the figure has been generated
.. jupyter-execute::
fig, ax = ts.plot(color='k', savefig_settings={'path': 'ts_plot3.png'}); pyleo.closefig(fig)
pyleo.savefig(fig,path='ts_plot3.png')
'''
# generate default axis labels
time_label, value_label = self.make_labels()
if xlabel is None:
xlabel = time_label
if ylabel is None:
ylabel = value_label
plot_kwargs = {} if plot_kwargs is None else plot_kwargs.copy()
if label is None:
label = self.label
if label is not None:
plot_kwargs.update({'label': label})
if marker is not None:
plot_kwargs.update({'marker': marker})
if markersize is not None:
plot_kwargs.update({'markersize': markersize})
if color is not None:
plot_kwargs.update({'color': color})
if linestyle is not None:
plot_kwargs.update({'linestyle': linestyle})
if linewidth is not None:
plot_kwargs.update({'linewidth': linewidth})
if alpha is not None:
plot_kwargs.update({'alpha': alpha})
if zorder is not None:
plot_kwargs.update({'zorder': zorder})
res = plotting.plot_xy(
self.time, self.value,
figsize=figsize, xlabel=xlabel, ylabel=ylabel,
title=title, savefig_settings=savefig_settings,
ax=ax, legend=legend, xlim=xlim, ylim=ylim,
plot_kwargs=plot_kwargs, lgd_kwargs=lgd_kwargs,
invert_xaxis=invert_xaxis, invert_yaxis=invert_yaxis
)
return res
[docs] def stripes(self, figsize=[8, 1], cmap = 'RdBu_r', ref_period=None,
sat=1.0, top_label=None, bottom_label=None,
label_color = 'gray', label_size = None, xlim=None,
xlabel=None, savefig_settings=None, ax=None, invert_xaxis=False,
show_xaxis=False, x_offset = 0.03):
'''Represents the Series as an Ed Hawkins "stripes" pattern
Credit: https://matplotlib.org/matplotblog/posts/warming-stripes/
Parameters
----------
ref_period : array-like (2-elements)
dates of the reference period, in the form "(first, last)"
figsize : list
a list of two integers indicating the figure size (in inches)
cmap: str
colormap name (https://matplotlib.org/stable/tutorials/colors/colormaps.html)
sat : float > 0
Controls the saturation of the colormap normalization by scaling the vmin, vmax in https://matplotlib.org/stable/tutorials/colors/colormapnorms.html
default = 0.9
xlim : list
time axis limits
top_label : str
the "title" label for the stripe
bottom_label : str
the "ylabel" explaining which variable is being plotted
invert_xaxis : bool, optional
if True, the x-axis of the plot will be inverted
x_offset : float
value controlling the horizontal offset between stripes and labels (default = 0.05)
show_xaxis : bool
flag indicating whether or not the x-axis should be shown (default = False)
savefig_settings : dict
the dictionary of arguments for plt.savefig(); some notes below:
- "path" must be specified; it can be any existed or non-existed path,
with or without a suffix; if the suffix is not given in "path", it will follow "format"
- "format" can be one of {"pdf", "eps", "png", "ps"}
ax : matplotlib.axis, optional
the axis object from matplotlib
See [matplotlib.axes](https://matplotlib.org/api/axes_api.html) for details.
Returns
-------
fig : matplotlib.figure
the figure object from matplotlib
See [matplotlib.pyplot.figure](https://matplotlib.org/stable/api/figure_api.html) for details.
ax : matplotlib.axis
the axis object from matplotlib
See [matplotlib.axes](https://matplotlib.org/stable/api/axes_api.html) for details.
Notes
-----
When `ax` is passed, the return will be `ax` only; otherwise, both `fig` and `ax` will be returned.
See also
--------
pyleoclim.utils.plotting.stripes : stripes representation of a timeseries
pyleoclim.utils.plotting.savefig : saving a figure in Pyleoclim
Examples
--------
Plot the HadCRUT5 Global Mean Surface Temperature
.. jupyter-execute::
gmst = pyleo.utils.load_dataset('HadCRUT5')
fig, ax = gmst.stripes(ref_period=(1971,2000))
For a more pastel tone, dial down saturation:
.. jupyter-execute::
fig, ax = gmst.stripes(ref_period=(1971,2000), sat = 0.8)
To change the colormap:
.. jupyter-execute::
fig, ax = gmst.stripes(ref_period=(1971,2000), cmap='Spectral_r')
fig, ax = gmst.stripes(ref_period=(1971,2000), cmap='magma_r')
To show the time axis:
.. jupyter-execute::
fig, ax = gmst.stripes(ref_period=(1971,2000), show_xaxis=True)
'''
if top_label is None:
top_label = self.label
if bottom_label is None:
bottom_label = self.value_name
if ref_period is None:
ref_period = [self.time.min(), self.time.max()]
if sat <= 0:
raise ValueError('sat must be a strictly positive number, ideally close to unity.')
# Ed Hawkins says: Currently I use HadCRUT5 with a 1971-2000 baseline
# and a colour scaling of +/- 0.75K (which is probably similar to LIM).
# It should be relatively simple to duplicate the stripes exactly
# center and normalize values for proper display
yc = self.center(timespan=ref_period).value
vmax = np.abs(yc).max()/sat
time_label, _ = self.make_labels()
res = plotting.stripes_xy(x=self.time, y=yc, cmap=cmap, vmin=-vmax, vmax=vmax,
top_label = top_label, bottom_label = bottom_label, label_color = label_color,
figsize=figsize, ax=ax, xlim=xlim, invert_xaxis=invert_xaxis,
label_size=label_size, xlabel = time_label, x_offset = x_offset,
savefig_settings=savefig_settings, show_xaxis=show_xaxis,
)
return res
[docs] def ssa(self, M=None, nMC=0, f=0.3, trunc = None, var_thresh=80, online=True):
''' Singular Spectrum Analysis
Nonparametric, orthogonal decomposition of timeseries into constituent oscillations.
This implementation uses the method of [1], with applications presented in [2].
Optionally (MC>0), the significance of eigenvalues is assessed by Monte-Carlo simulations of an AR(1) model fit to X, using [3].
The method expects regular spacing, but is tolerant to missing values, up to a fraction 0<f<1 (see [4]).
Parameters
----------
M : int, optional
window size. The default is None (10% of the length of the series).
MC : int, optional
Number of iteration in the Monte-Carlo process. The default is 0.
f : float, optional
maximum allowable fraction of missing values. The default is 0.3.
trunc : str
if present, truncates the expansion to a level K < M owing to one of 4 criteria:
(1) 'kaiser': variant of the Kaiser-Guttman rule, retaining eigenvalues larger than the median
(2) 'mcssa': Monte-Carlo SSA (use modes above the 95% quantile from an AR(1) process)
(3) 'var': first K modes that explain at least var_thresh % of the variance.
Default is None, which bypasses truncation (K = M)
(4) 'knee': Wherever the "knee" of the screeplot occurs.
Recommended as a first pass at identifying significant modes as it tends to be more robust than 'kaiser' or 'var', and faster than 'mcssa'.
While no truncation method is imposed by default, if the goal is to enhance the S/N ratio and reconstruct a smooth version of the attractor's skeleton,
then the knee-finding method is a good compromise between objectivity and efficiency.
See kneed's `documentation <https://kneed.readthedocs.io/en/latest/index.html>`_ for more details on the knee finding algorithm.
var_thresh : float
variance threshold for reconstruction (only impactful if trunc is set to 'var')
online : bool; {True,False}
Whether or not to conduct knee finding analysis online or offline.
Only called when trunc = 'knee'. Default is True
See kneed's `documentation <https://kneed.readthedocs.io/en/latest/api.html#kneelocator>`_ for details.
Returns
-------
res : object of the SsaRes class containing:
eigvals : (M, ) array of eigenvalues
eigvecs : (M, M) Matrix of temporal eigenvectors (T-EOFs)
PC : (N - M + 1, M) array of principal components (T-PCs)
RCmat : (N, M) array of reconstructed components
RCseries : (N,) reconstructed series, with mean and variance restored (same type as original)
pctvar: (M, ) array of the fraction of variance (%) associated with each mode
eigvals_q : (M, 2) array contaitning the 5% and 95% quantiles of the Monte-Carlo eigenvalue spectrum [ if nMC >0 ]
References
----------
[1]_ Vautard, R., and M. Ghil (1989), Singular spectrum analysis in nonlinear
dynamics, with applications to paleoclimatic time series, Physica D, 35,
395–424.
[2]_ Ghil, M., R. M. Allen, M. D. Dettinger, K. Ide, D. Kondrashov, M. E. Mann,
A. Robertson, A. Saunders, Y. Tian, F. Varadi, and P. Yiou (2002),
Advanced spectral methods for climatic time series, Rev. Geophys., 40(1),
1003–1052, doi:10.1029/2000RG000092.
[3]_ Allen, M. R., and L. A. Smith (1996), Monte Carlo SSA: Detecting irregular
oscillations in the presence of coloured noise, J. Clim., 9, 3373–3404.
[4]_ Schoellhamer, D. H. (2001), Singular spectrum analysis for time series with
missing data, Geophysical Research Letters, 28(16), 3187–3190, doi:10.1029/2000GL012698.
See also
--------
pyleoclim.core.utils.decomposition.ssa : Singular Spectrum Analysis utility
pyleoclim.core.ssares.SsaRes.modeplot : plot SSA modes
pyleoclim.core.ssares.SsaRes.screeplot : plot SSA eigenvalue spectrum
Examples
--------
SSA with SOI
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
fig, ax = ts.plot()
nino_ssa = ts.ssa(M=60)
Let us now see how to make use of all these arrays. The first step is to inspect the eigenvalue spectrum ("scree plot") to identify remarkable modes. Let us restrict ourselves to the first 40, so we can see something:
.. jupyter-execute::
fig, ax = nino_ssa.screeplot()
This highlights a few common phenomena with SSA:
* the eigenvalues are in descending order
* their uncertainties are proportional to the eigenvalues themselves
* the eigenvalues tend to come in pairs : (1,2) (3,4), are all clustered within uncertainties . (5,6) looks like another doublet
* around i=15, the eigenvalues appear to reach a floor, and all subsequent eigenvalues explain a very small amount of variance.
So, summing the variance of the first 14 modes, we get:
.. jupyter-execute::
print(nino_ssa.pctvar[:14].sum())
That is a typical result for a (paleo)climate timeseries; a few modes do the vast majority of the work. That means we can focus our attention on these modes and capture most of the interesting behavior. To see this, let's use the reconstructed components (RCs), and sum the RC matrix over the first 14 columns:
.. jupyter-execute::
RCmat = nino_ssa.RCmat[:,:14]
RCk = (RCmat-RCmat.mean()).sum(axis=1) + ts.value.mean()
fig, ax = ts.plot(title='SOI')
ax.plot(nino_ssa.orig.time,RCk,label='SSA reconstruction, 14 modes',color='orange')
ax.legend()
Indeed, these first few modes capture the vast majority of the low-frequency behavior, including all the El Niño/La Niña events. What is left (the blue wiggles not captured in the orange curve) are high-frequency oscillations that might be considered "noise" from the standpoint of ENSO dynamics. This illustrates how SSA might be used for filtering a timeseries. One must be careful however:
* there was not much rhyme or reason for picking 14 modes. Why not 5, or 39? All we have seen so far is that they gather >95% of the variance, which is by no means a magic number.
* there is no guarantee that the first few modes will filter out high-frequency behavior, or at what frequency cutoff they will do so. If you need to cut out specific frequencies, you are better off doing it with a classical filter, like the butterworth filter implemented in Pyleoclim. However, in many instances the choice of a cutoff frequency is itself rather arbitrary. In such cases, SSA provides a principled alternative for generating a version of a timeseries that preserves features and excludes others (i.e, a filter).
* as with all orthgonal decompositions, summing over all RCs will recover the original signal within numerical precision.
Monte-Carlo SSA
Selecting meaningful modes in eigenproblems (e.g. EOF analysis) is more art than science. However, one technique stands out: Monte Carlo SSA, introduced by Allen & Smith, (1996) to identify SSA modes that rise above what one would expect from "red noise", specifically an AR(1) process). To run it, simply provide the parameter MC, ideally with a number of iterations sufficient to get decent statistics. Here let's use MC = 1000. The result will be stored in the eigval_q array, which has the same length as eigval, and its two columns contain the 5% and 95% quantiles of the ensemble of MC-SSA eigenvalues.
.. jupyter-execute::
nino_mcssa = ts.ssa(M = 60, nMC=1000)
Now let's look at the result:
.. jupyter-execute::
fig, ax = nino_mcssa.screeplot()
print('Indices of modes retained: '+ str(nino_mcssa.mode_idx))
This suggests that modes 1-5 fall above the red noise benchmark, but so do a few others. To inspect mode 1 (index 0), just type:
.. jupyter-execute::
fig, ax = nino_mcssa.modeplot(index=0)
To inspect the reconstructed series, simply do:
.. jupyter-execute::
fig, ax = ts.plot()
nino_mcssa.RCseries.plot(ax=ax)
For other truncation methods, see http://linked.earth/PyleoTutorials/notebooks/L2_singular_spectrum_analysis.html
'''
res = decomposition.ssa(self.value, M=M, nMC=nMC, f=f, trunc = trunc, var_thresh=var_thresh, online=online)
RCseries = self.copy()
RCseries.value = res['RCseries']
if trunc is not None:
RCseries.label = 'SSA reconstruction (' + trunc + ')'
else:
RCseries.label = 'SSA reconstruction'
resc = SsaRes(label=self.label, orig=self, eigvals = res['eigvals'], eigvecs = res['eigvecs'],
pctvar = res['pctvar'], PC = res['PC'], RCmat = res['RCmat'],
RCseries=RCseries, mode_idx=res['mode_idx'])
if nMC >= 0:
resc.eigvals_q=res['eigvals_q'] # assign eigenvalue quantiles if Monte-Carlo SSA was called
return resc
[docs] def is_evenly_spaced(self, tol=1e-3):
'''Check if the Series time axis is evenly-spaced, within tolerance
Parameters
----------
tol : float
tolerance. If time increments are all within tolerance, the series
is declared evenly-spaced. default = 1e-3
Returns
-------
res : bool
'''
res = tsbase.is_evenly_spaced(self.time, tol)
return res
[docs] def filter(self, cutoff_freq=None, cutoff_scale=None, method='butterworth', keep_log= False, **kwargs):
''' Filtering methods for Series objects using four possible methods:
- `Butterworth <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html>`_
- `Lanczos <http://scitools.org.uk/iris/docs/v1.2/examples/graphics/SOI_filtering.html>`_
- `Finite Impulse Response <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.firwin.html>`_
- `Savitzky-Golay filter <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html>`_
By default, this method implements a lowpass filter, though it can easily
be turned into a bandpass or high-pass filter (see examples below).
Parameters
----------
method : str, {'savitzky-golay', 'butterworth', 'firwin', 'lanczos'}
the filtering method
- 'butterworth': a Butterworth filter (default = 3rd order)
- 'savitzky-golay': Savitzky-Golay filter
- 'firwin': finite impulse response filter design using the window method, with default window as Hamming
- 'lanczos': Lanczos zero-phase filter
cutoff_freq : float or list
The cutoff frequency only works with the Butterworth method.
If a float, it is interpreted as a low-frequency cutoff (lowpass).
If a list, it is interpreted as a frequency band (f1, f2), with f1 < f2 (bandpass).
Note that only the Butterworth option (default) currently supports bandpass filtering.
cutoff_scale : float or list
cutoff_freq = 1 / cutoff_scale
The cutoff scale only works with the Butterworth method and when cutoff_freq is None.
If a float, it is interpreted as a low-frequency (high-scale) cutoff (lowpass).
If a list, it is interpreted as a frequency band (f1, f2), with f1 < f2 (bandpass).
keep_log : Boolean
if True, adds this step and its parameters to the series log.
kwargs : dict
a dictionary of the keyword arguments for the filtering method,
see `pyleoclim.utils.filter.savitzky_golay`, `pyleoclim.utils.filter.butterworth`, `pyleoclim.utils.filter.lanczos` and `pyleoclim.utils.filter.firwin` for the details
Returns
-------
new : Series
See also
--------
pyleoclim.utils.filter.butterworth : Butterworth method
pyleoclim.utils.filter.savitzky_golay : Savitzky-Golay method
pyleoclim.utils.filter.firwin : FIR filter design using the window method
pyleoclim.utils.filter.lanczos : lowpass filter via Lanczos resampling
Examples
--------
In the example below, we generate a signal as the sum of two signals with frequency 10 Hz and 20 Hz, respectively.
Then we apply a low-pass filter with a cutoff frequency at 15 Hz, and compare the output to the signal of 10 Hz.
After that, we apply a band-pass filter with the band 15-25 Hz, and compare the outcome to the signal of 20 Hz.
- Generating the test data
.. jupyter-execute::
import pyleoclim as pyleo
import numpy as np
t = np.linspace(0, 1, 1000)
sig1 = np.sin(2*np.pi*10*t)
sig2 = np.sin(2*np.pi*20*t)
sig = sig1 + sig2
ts1 = pyleo.Series(time=t, value=sig1)
ts2 = pyleo.Series(time=t, value=sig2)
ts = pyleo.Series(time=t, value=sig)
fig, ax = ts.plot(label='mix')
ts1.plot(ax=ax, label='10 Hz')
ts2.plot(ax=ax, label='20 Hz')
ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3)
- Applying a low-pass filter
.. jupyter-execute::
fig, ax = ts.plot(label='mix')
ts.filter(cutoff_freq=15).plot(ax=ax, label='After 15 Hz low-pass filter')
ts1.plot(ax=ax, label='10 Hz')
ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3)
- Applying a band-pass filter
.. jupyter-execute::
fig, ax = ts.plot(label='mix')
ts.filter(cutoff_freq=[15, 25]).plot(ax=ax, label='After 15-25 Hz band-pass filter')
ts2.plot(ax=ax, label='20 Hz')
ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3)
Above is using the default Butterworth filtering. To use FIR filtering with a window like Hanning is also simple:
.. jupyter-execute::
fig, ax = ts.plot(label='mix')
ts.filter(cutoff_freq=[15, 25], method='firwin', window='hanning').plot(ax=ax, label='After 15-25 Hz band-pass filter')
ts2.plot(ax=ax, label='20 Hz')
ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3)
- Applying a high-pass filter
.. jupyter-execute::
fig, ax = ts.plot(label='mix')
ts_low = ts.filter(cutoff_freq=15)
ts_high = ts.copy()
ts_high.value = ts.value - ts_low.value # subtract low-pass filtered series from original one
ts_high.plot(label='High-pass filter @ 15Hz',ax=ax)
ax.legend(loc='upper left', bbox_to_anchor=(0, 1.1), ncol=3)
'''
if not self.is_evenly_spaced():
raise ValueError('This method assumes evenly-spaced timeseries, while the input is not. Use the ".interp()", ".bin()" or ".gkernel()" methods prior to ".filter()".')
new = self.copy()
mu = np.mean(self.value) # extract the mean
y = self.value - mu
fs = 1/np.mean(np.diff(self.time))
method_func = {
'savitzky-golay': filterutils.savitzky_golay,
'butterworth': filterutils.butterworth,
'firwin': filterutils.firwin,
'lanczos': filterutils.lanczos,
}
if method not in method_func.keys():
raise ValueError('Method value is not an appropriate method for filters')
args = {}
if method in ['butterworth', 'firwin', 'lanczos']:
if cutoff_freq is None:
if cutoff_scale is None:
raise ValueError('Please set the cutoff frequency or scale argument: "cutoff_freq" or "cutoff_scale".')
else:
if np.isscalar(cutoff_scale):
cutoff_freq = 1 / cutoff_scale
elif len(cutoff_scale) == 2 and method in ['butterworth', 'firwin']:
cutoff_scale = np.array(cutoff_scale)
cutoff_freq = np.sort(1 / cutoff_scale)
cutoff_freq = list(cutoff_freq)
elif len(cutoff_scale) > 1 and method == 'lanczos':
raise ValueError('Lanczos filter requires a scalar input as cutoff scale/frequency')
else:
raise ValueError('Wrong cutoff_scale; should be either one float value (lowpass) or a list two float values (bandpass).')
# assign optional arguments
args['butterworth'] = {'fc': cutoff_freq, 'fs': fs}
args['firwin'] = {'fc': cutoff_freq, 'fs': fs}
args['lanczos'] = {'fc': cutoff_freq, 'fs': fs}
else: # for Savitzky-Golay only
if cutoff_scale is None and cutoff_freq is None:
raise ValueError('No cutoff_scale or cutoff_freq argument provided')
elif cutoff_freq is not None:
cutoff_scale = 1 / cutoff_freq
window_length = int(cutoff_scale*fs)
if window_length % 2 == 0:
window_length += 1 # window length needs to be an odd integer
args['savitzky-golay'] = {'window_length': window_length}
args[method].update(kwargs)
new_val = method_func[method](y, **args[method])
new.value = new_val + mu # restore the mean
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log): 'filter','method': method, 'args': kwargs, 'fs': fs, 'cutoff_freq': cutoff_freq},)
return new
[docs] def histplot(self, figsize=[10, 4], title=None, savefig_settings=None,
ax=None, ylabel='KDE', vertical=False, edgecolor='w', **plot_kwargs):
''' Plot the distribution of the timeseries values
Parameters
----------
figsize : list
a list of two integers indicating the figure size
title : str
the title for the figure
savefig_settings : dict
the dictionary of arguments for plt.savefig(); some notes below:
- "path" must be specified; it can be any existed or non-existed path,
with or without a suffix; if the suffix is not given in "path", it will follow "format"
- "format" can be one of {"pdf", "eps", "png", "ps"}
ax : matplotlib.axis, optional
A matplotlib axis
ylabel : str
Label for the count axis
vertical : {True,False}
Whether to flip the plot vertically
edgecolor : matplotlib.color
The color of the edges of the bar
plot_kwargs : dict
Plotting arguments for seaborn histplot: https://seaborn.pydata.org/generated/seaborn.histplot.html
See also
--------
pyleoclim.utils.plotting.savefig : saving figure in Pyleoclim
Examples
--------
Distribution of the SOI record
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
fig, ax = ts.plot()
fig, ax = ts.histplot()
'''
savefig_settings = {} if savefig_settings is None else savefig_settings.copy()
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
#make the data into a dataframe so we can flip the figure
time_label, value_label = self.make_labels()
if vertical == True:
data=pd.DataFrame({'value':self.value})
ax = sns.histplot(data=data, y="value", ax=ax, kde=True, edgecolor=edgecolor, **plot_kwargs)
ax.set_ylabel(value_label)
ax.set_xlabel(ylabel)
else:
ax = sns.histplot(self.value, ax=ax, kde=True, edgecolor=edgecolor, **plot_kwargs)
ax.set_xlabel(value_label)
ax.set_ylabel(ylabel)
if title is not None:
ax.set_title(title)
if 'fig' in locals():
if 'path' in savefig_settings:
plotting.savefig(fig, settings=savefig_settings)
return fig, ax
else:
return ax
# def distplot(self, figsize=[10, 4], title=None, savefig_settings=None,
# ax=None, ylabel='KDE', vertical=False, edgecolor='w', **plot_kwargs):
# ''' Plot the distribution of the timeseries values
# [legacy only ; please use histplot() instead]
# Parameters
# ----------
# figsize : list
# a list of two integers indicating the figure size
# title : str
# the title for the figure
# savefig_settings : dict
# the dictionary of arguments for plt.savefig(); some notes below:
# - "path" must be specified; it can be any existed or non-existed path,
# with or without a suffix; if the suffix is not given in "path", it will follow "format"
# - "format" can be one of {"pdf", "eps", "png", "ps"}
# ax : matplotlib.axis, optional
# A matplotlib axis
# ylabel : str
# Label for the count axis
# vertical : {True,False}
# Whether to flip the plot vertically
# edgecolor : matplotlib.color
# The color of the edges of the bar
# plot_kwargs : dict
# Plotting arguments for seaborn histplot: https://seaborn.pydata.org/generated/seaborn.histplot.html
# See also
# --------
# pyleoclim.utils.plotting.savefig : saving figure in Pyleoclim
# Examples
# --------
# Distribution of the SOI record
# .. ipython:: python
# :okwarning:
# :okexcept:
# import pyleoclim as pyleo
# import pandas as pd
# data=pd.read_csv('https://raw.githubusercontent.com/LinkedEarth/Pyleoclim_util/Development/example_data/soi_data.csv',skiprows=0,header=1)
# time=data.iloc[:,1]
# value=data.iloc[:,2]
# ts=pyleo.Series(time=time,value=value,time_name='Year C.E', value_name='SOI', label='SOI')
# fig, ax = ts.plot()
# fig, ax = ts.distplot()
# '''
# warnings.warn(
# "Distplot is deprecated. Function has been renamed histplot in order to maintain consistency with seaborn terminology",
# DeprecationWarning,
# stacklevel=2)
# return self.histplot(figsize, title, savefig_settings, ax, ylabel, vertical, edgecolor, **plot_kwargs)
[docs] def summary_plot(self, psd, scalogram, figsize=[8, 10], title=None,
time_lim=None, value_lim=None, period_lim=None, psd_lim=None,
time_label=None, value_label=None, period_label=None, psd_label=None,
ts_plot_kwargs = None, wavelet_plot_kwargs = None,
psd_plot_kwargs = None, gridspec_kwargs = None, y_label_loc = None,
legend = None, savefig_settings=None):
''' Produce summary plot of timeseries.
Generate cohesive plot of timeseries alongside results of wavelet analysis and spectral analysis on said timeseries.
Requires wavelet and spectral analysis to be conducted outside of plotting function, psd and scalogram must be passed as arguments.
Parameters
----------
psd : PSD
the PSD object of a Series.
scalogram : Scalogram
the Scalogram object of a Series.
If the passed scalogram object contains stored signif_scals these will be plotted.
figsize : list
a list of two integers indicating the figure size
title : str
the title for the figure
time_lim : list or tuple
the limitation of the time axis. This is for display purposes only, the scalogram and psd will still be calculated using the full time series.
value_lim : list or tuple
the limitation of the value axis of the timeseries. This is for display purposes only, the scalogram and psd will still be calculated using the full time series.
period_lim : list or tuple
the limitation of the period axis
psd_lim : list or tuple
the limitation of the psd axis
time_label : str
the label for the time axis
value_label : str
the label for the value axis of the timeseries
period_label : str
the label for the period axis
psd_label : str
the label for the amplitude axis of PDS
legend : bool
if set to True, a legend will be added to the open space above the psd plot
ts_plot_kwargs : dict
arguments to be passed to the timeseries subplot, see Series.plot for details
wavelet_plot_kwargs : dict
arguments to be passed to the scalogram plot, see pyleoclim.Scalogram.plot for details
psd_plot_kwargs : dict
arguments to be passed to the psd plot, see PSD.plot for details
Certain psd plot settings are required by summary plot formatting. These include:
- ylabel
- legend
- tick parameters
These will be overriden by summary plot to prevent formatting errors
gridspec_kwargs : dict
arguments used to build the specifications for gridspec configuration
The plot is constructed with six slots:
- slot [0] contains a subgridspec containing the timeseries and scalogram (shared x axis)
- slot [1] contains a subgridspec containing an empty slot and the PSD plot (shared y axis with scalogram)
- slot [2] and slot [3] are empty to allow ample room for xlabels for the scalogram and PSD plots
- slot [4] contains the scalogram color bar
- slot [5] is empty
It is possible to tune the size and spacing of the various slots:
- 'width_ratios': list of two values describing the relative widths of the column containig the timeseries/scalogram/colorbar and the column containig the PSD plot (default: [6, 1])
- 'height_ratios': list of three values describing the relative heights of the three timeseries, scalogram and colorbar (default: [2, 7, .35])
- 'hspace': vertical space between timeseries and scalogram (default: 0, however if either the scalogram xlabel or the PSD xlabel contain '\\n', .05)
- 'wspace': lateral space between scalogram and psd plot (default: 0)
- 'cbspace': vertical space between the scalogram and colorbar
y_label_loc : float
Plot parameter to adjust horizontal location of y labels to avoid conflict with axis labels, default value is -0.15
savefig_settings : dict
the dictionary of arguments for plt.savefig(); some notes below:
- "path" must be specified; it can be any existed or non-existed path,
with or without a suffix; if the suffix is not given in "path", it will follow "format"
- "format" can be one of {"pdf", "eps", "png", "ps"}
See also
--------
pyleoclim.core.series.Series.spectral : Spectral analysis for a timeseries
pyleoclim.core.series.Series.wavelet : Wavelet analysis for a timeseries
pyleoclim.utils.plotting.savefig : saving figure in Pyleoclim
pyleoclim.core.psds.PSD : PSD object
pyleoclim.core.psds.MultiplePSD : Multiple PSD object
Examples
--------
Summary_plot with pre-generated psd and scalogram objects. Note that if the scalogram contains saved noise realizations these will be flexibly reused. See pyleo.Scalogram.signif_test() for details
.. jupyter-execute::
import pyleoclim as pyleo
series = pyleo.utils.load_dataset('SOI')
psd = series.spectral(freq_method = 'welch')
scalogram = series.wavelet(freq_method = 'welch')
fig, ax = series.summary_plot(psd = psd,scalogram = scalogram)
Summary_plot with pre-generated psd and scalogram objects from before and some plot modification arguments passed. Note that if the scalogram contains saved noise realizations these will be flexibly reused. See pyleo.Scalogram.signif_test() for details
.. jupyter-execute::
import pyleoclim as pyleo
series = pyleo.utils.load_dataset('SOI')
psd = series.spectral(freq_method = 'welch')
scalogram = series.wavelet(freq_method = 'welch')
fig, ax = series.summary_plot(psd = psd,scalogram = scalogram, period_lim = [5,0], ts_plot_kwargs = {'color':'red','linewidth':.5}, psd_plot_kwargs = {'color':'red','linewidth':.5})
'''
savefig_settings = {} if savefig_settings is None else savefig_settings.copy()
wavelet_plot_kwargs = {} if wavelet_plot_kwargs is None else wavelet_plot_kwargs.copy()
psd_plot_kwargs = {} if psd_plot_kwargs is None else psd_plot_kwargs.copy()
ts_plot_kwargs = {} if ts_plot_kwargs is None else ts_plot_kwargs.copy()
gridspec_kwargs = {} if gridspec_kwargs is None else gridspec_kwargs.copy()
# spacing
if (type(psd_label) == str and '\n' in psd_label) or (psd_label is None):
gridspec_kwargs_default = {'width_ratios': [6, 1],
# 'height_ratios': [8, 1, .35],
'height_ratios': [2,7,.35],
'hspace': 0.05, 'wspace': 0.05,
'cbspace':1}
else:
gridspec_kwargs_default = {'width_ratios': [6, 1],
# 'height_ratios': [8, 1, .35],
'height_ratios': [2,7,.35],
'hspace': 0, 'wspace': 0,
'cbspace':1}
for key in gridspec_kwargs_default:
if key not in gridspec_kwargs.keys():
gridspec_kwargs[key] = gridspec_kwargs_default[key]
ts_height = gridspec_kwargs['height_ratios'][0]
scal_height = gridspec_kwargs['height_ratios'][1]
cb_height = gridspec_kwargs['height_ratios'][2]
psd_width = gridspec_kwargs['width_ratios'][1]
scal_width = gridspec_kwargs['width_ratios'][0]
if 'cbspace' in gridspec_kwargs.keys():
cb_space = gridspec_kwargs['cbspace']
else:
cb_space = 1
gridspec_kwargs['height_ratios'] = [ts_height+scal_height, cb_space, cb_height]
del gridspec_kwargs['cbspace']
fig = plt.figure(constrained_layout=False, figsize=figsize)
gs = fig.add_gridspec(3, 2, **gridspec_kwargs)
# fig = plt.figure(figsize=figsize)
# gs = gridspec.GridSpec(6, 12)
# gs.update(wspace=0, hspace=0)
#
# gs0 = fig.add_gridspec(3, 2, width_ratios=[6, 1], height_ratios=[8, 1, .35],
# hspace=0, wspace=0.1)
# Subgridspecs
#Let's use the same hspace/wspace if given to a user
gs_d = {}
gs_d['ts_scal'] = gs[0].subgridspec(2, 1, height_ratios=[ts_height, scal_height], hspace=gridspec_kwargs['hspace'])
gs_d['psd'] = gs[1].subgridspec(2, 1, height_ratios=[ts_height, scal_height], hspace=gridspec_kwargs['hspace'])
# gs_d['ts_scal'] = gs[0].subgridspec(2, 1, height_ratios=[1, 4], hspace=gridspec_kwargs['hspace'])
# gs_d['psd'] = gs[1].subgridspec(2, 1, height_ratios=[1, 4], hspace=gridspec_kwargs['hspace'])
gs_d['cb'] = gs[4].subgridspec(1, 1)
ax = {}
### Time series
ax['ts'] = fig.add_subplot(gs_d['ts_scal'][0, 0])
ax['ts'] = self.plot(ax=ax['ts'], **ts_plot_kwargs)
if time_lim is not None:
ax['ts'].set_xlim(time_lim)
if 'xlim' in ts_plot_kwargs:
print(
'Xlim passed to time series plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
if value_lim is not None:
ax['ts'].set_ylim(value_lim)
if 'ylim' in ts_plot_kwargs:
print(
'Ylim passed to time series plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
if title is not None:
ax['ts'].set_title(title)
if 'title' in ts_plot_kwargs:
print(
'Title passed to time series plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
if value_label is not None:
# time_label, value_label = self.make_labels()
ax['ts'].set_ylabel(value_label)
if 'ylabel' in ts_plot_kwargs:
print(
'Ylabel passed to time series plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
ax['ts'].xaxis.label.set_visible(False)
ax['ts'].tick_params(axis='x', direction='in')#, labelleft=False)
# ax = {}
# ax['ts'] = plt.subplot(gs[0:1, :-3])
# ax['ts'] = self.plot(ax=ax['ts'], **ts_plot_kwargs)
# ax['ts'].xaxis.set_visible(False)
# ax['ts'].get_yaxis().set_label_coords(y_label_loc,0.5)
#
# if time_lim is not None:
# ax['ts'].set_xlim(time_lim)
# if 'xlim' in ts_plot_kwargs:
# print('Xlim passed to time series plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
#
# if value_lim is not None:
# ax['ts'].set_ylim(value_lim)
# if 'ylim' in ts_plot_kwargs:
# print('Ylim passed to time series plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
### Scalogram
ax['scal'] = fig.add_subplot(gs_d['ts_scal'][1, 0], sharex=ax['ts'])
# Need variable for plotting purposes
if 'variable' not in wavelet_plot_kwargs:
wavelet_plot_kwargs.update({'variable': 'amplitude'})
if 'title' not in wavelet_plot_kwargs:
wavelet_plot_kwargs.update({'title': None})
if 'cbar_style' not in wavelet_plot_kwargs:
wavelet_plot_kwargs.update({'cbar_style': {'orientation': 'horizontal', 'pad': 0.12,
'label': scalogram.wave_method + ' '+ wavelet_plot_kwargs['variable'].capitalize()}})
else:
orient = 'horizontal'
# I think padding is now the hspace
# if 'pad' in wavelet_plot_kwargs['cbar_style']:
# pad = wavelet_plot_kwargs['cbar_style']['pad']
# else:
# pad = 0.12
if 'label' in wavelet_plot_kwargs['cbar_style']:
label = wavelet_plot_kwargs['cbar_style']['label']
else:
label = wavelet_plot_kwargs['variable'].capitalize() + ' from ' + scalogram.wave_method
wavelet_plot_kwargs.update({'cbar_style': {'orientation': orient,
'label': label,
# 'pad': pad,
}})
wavelet_plot_kwargs['cbar_style']['drawedges'] = True
# Do not plot colorbar in scalogram
wavelet_plot_kwargs['plot_cb'] = False
# Plot scalogram
ax['scal'] = scalogram.plot(ax=ax['scal'], **wavelet_plot_kwargs)
if y_label_loc is not None:
ax['scal'].get_yaxis().set_label_coords(y_label_loc, 0.5)
if period_lim is not None:
ax['scal'].set_ylim(period_lim)
if 'ylim' in wavelet_plot_kwargs.keys():
print(
'Ylim passed to psd plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
if time_label is not None:
ax['scal'].set_xlabel(time_label)
if 'xlabel' in wavelet_plot_kwargs:
print(
'Xlabel passed to scalogram plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
if period_label is not None:
# period_unit = infer_period_unit_from_time_unit(self.time_unit)
# period_label = f'Period [{period_unit}]' if period_unit is not None else 'Period'
ax['scal'].set_ylabel(period_label)
if 'ylabel' in wavelet_plot_kwargs:
print(
'Ylabel passed to scalogram plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
ax['scal'].set_title(None)
xticks = ax['scal'].get_xticks()
midpoints = xticks[:-1] + np.diff(xticks) / 2
ax['scal'].set_xticks(midpoints[1:-1])
ax['scal'].tick_params(axis='x', pad=12) # which='major',
if 'ylims' in psd_plot_kwargs:
shared_y_lims = psd_plot_kwargs['ylims']
elif 'ylims' in wavelet_plot_kwargs:
shared_y_lims = wavelet_plot_kwargs['ylims']
else:
shared_y_lims = ax['scal'].get_ylim()
plt.setp(ax['ts'].get_xticklabels(), visible=False)
# ax['scal'].set_ylim([0.2,50])
# >>
# ax['scal'] = plt.subplot(gs[1:5, :-3], sharex=ax['ts'])
#
# #Need variable for plotting purposes
# if 'variable' not in wavelet_plot_kwargs:
# wavelet_plot_kwargs.update({'variable':'amplitude'})
#
# if 'title' not in wavelet_plot_kwargs:
# wavelet_plot_kwargs.update({'title':None})
#
# if 'cbar_style' not in wavelet_plot_kwargs:
# wavelet_plot_kwargs.update({'cbar_style':{'orientation': 'horizontal', 'pad': 0.12,
# 'label': wavelet_plot_kwargs['variable'].capitalize() + ' from ' + scalogram.wave_method}})
# else:
# if 'orientation' in wavelet_plot_kwargs['cbar_style']:
# orient = wavelet_plot_kwargs['cbar_style']['orientation']
# else:
# orient = 'horizontal'
# if 'pad' in wavelet_plot_kwargs['cbar_style']:
# pad = wavelet_plot_kwargs['cbar_style']['pad']
# else:
# pad = 0.12
# if 'label' in wavelet_plot_kwargs['cbar_style']:
# label = wavelet_plot_kwargs['cbar_style']['label']
# else:
# label = wavelet_plot_kwargs['variable'].capitalize() + ' from ' + scalogram.wave_method
# wavelet_plot_kwargs.update({'cbar_style':{'orientation': orient, 'pad': pad,
# 'label': label}})
#
# ax['scal'] = scalogram.plot(ax=ax['scal'], **wavelet_plot_kwargs)
# ax['scal'].get_yaxis().set_label_coords(y_label_loc,0.5)
#
# if period_lim is not None:
# ax['scal'].set_ylim(period_lim)
# if 'ylim' in wavelet_plot_kwargs:
# print('Ylim passed to psd plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
# ax['scal'].invert_yaxis()
### PSD
ax['psd'] = fig.add_subplot(gs_d['psd'][1, 0], sharey=ax['scal'])
ax['psd'] = psd.plot(ax=ax['psd'], transpose=True, ylabel=str(psd.spec_method) + ' PSD',
**psd_plot_kwargs)
if period_lim is not None:
ax['psd'].set_ylim(period_lim)
if 'ylim' in psd_plot_kwargs:
print(
'Ylim passed to psd plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
else:
ax['psd'].set_ylim(shared_y_lims)
ax['scal'].set_ylim(shared_y_lims)
if psd_lim is not None:
ax['psd'].set_xlim(psd_lim)
if 'xlim' in psd_plot_kwargs:
print(
'Xlim passed to psd plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument')
if psd_label is not None:
ax['psd'].set_xlabel(psd_label)
if 'xlabel' in psd_plot_kwargs:
print(
'Xlabel passed to psd plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
ax['psd'].invert_yaxis()
ax['psd'].set_ylabel(None)
ax['psd'].tick_params(axis='y', direction='in', labelleft=False, pad=12)
if legend is None:
for key in ['ts', 'psd']:
ax[key].legend().remove()
if legend == True:
leg_h, leg_l = [], []
for key in ['ts', 'psd']:
ax[key].legend()
_h, _l = ax[key].get_legend_handles_labels()
for ip, label in enumerate(_l):
if label not in leg_l:
if len(label.split(' ')) > 1:
if len(label) > 15:
label = label[:15] + label[15:].replace(' ', '\n', 1)
label = label.replace('simulations', 'sims')
if psd_width/scal_width < .25:
label = label.replace('threshold', 'C.L.')
leg_l.append(label)
leg_h.append(_h[ip])
ax[key].legend().remove()
ax['leg'] = fig.add_subplot(gs_d['psd'][0, 0])
ax['leg'].grid(False)
for side in ['top', 'bottom', 'left', 'right']:
ax['leg'].spines[side].set_visible(False)
ax['leg'].set_xticklabels([])
ax['leg'].set_yticklabels([])
ax['leg'].tick_params(axis='x', which='both', length=0)
ax['leg'].tick_params(axis='y', which='both', length=0)
x0, y0 = 1,1#0,0#-psd_width*3/4, -ts_height*3/4#, psd_width, ts_height
ax['leg'].legend(leg_h, leg_l, fontsize='small', loc='upper left')#, bbox_to_anchor=(x0, y0))# width, height))
ax['scal'].invert_yaxis() # not sure where this needs to be
# ax['leg'] = fig.add_subplot(gs_d['psd_leg'][0, 0])
# ax['leg'].legend(h, l)
# ax['psd'] = plt.subplot(gs[1:4, -3:], sharey=ax['scal'])
# ax['psd'] = psd.plot(ax=ax['psd'], transpose=True, ylabel = 'PSD from \n' + str(psd.spec_method), **psd_plot_kwargs)
#
# if period_lim is not None:
# ax['psd'].set_ylim(period_lim)
# if 'ylim' in psd_plot_kwargs:
# print('Ylim passed to psd plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
#
# ax['psd'].yaxis.set_visible(False)
# ax['psd'].invert_yaxis()
# ax['psd'].set_ylabel(None)
# ax['psd'].tick_params(axis='y', direction='in', labelleft=False)
# ax['psd'].legend().remove()
#
# if psd_lim is not None:
# ax['psd'].set_xlim(psd_lim)
# if 'xlim' in psd_plot_kwargs:
# print('Xlim passed to psd plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument')
#
# if title is not None:
# ax['ts'].set_title(title)
# if 'title' in ts_plot_kwargs:
# print('Title passed to time series plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
#
# if value_label is not None:
# #time_label, value_label = self.make_labels()
# ax['ts'].set_ylabel(value_label)
# if 'ylabel' in ts_plot_kwargs:
# print('Ylabel passed to time series plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
#
# if time_label is not None:
# #time_label, value_label = self.make_labels()
# ax['scal'].set_xlabel(time_label)
# if 'xlabel' in wavelet_plot_kwargs:
# print('Xlabel passed to scalogram plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
#
# if period_label is not None:
# #period_unit = infer_period_unit_from_time_unit(self.time_unit)
# #period_label = f'Period [{period_unit}]' if period_unit is not None else 'Period'
# ax['scal'].set_ylabel(period_label)
# if 'ylabel' in wavelet_plot_kwargs:
# print('Ylabel passed to scalogram plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
#
# if psd_label is not None:
# ax['psd'].set_xlabel(psd_label)
# if 'xlabel' in psd_plot_kwargs:
# print('Xlabel passed to psd plot through exposed argument and key word argument. The exposed argument takes precedence and will overwrite relevant key word argument.')
# plot color bar for scalogram using filled contour data
ax['cb'] = fig.add_subplot(gs_d['cb'][0, 0])
cb = mcb.Colorbar(ax=ax['cb'], mappable=scalogram.conf,
orientation=wavelet_plot_kwargs['cbar_style']['orientation'],
label=wavelet_plot_kwargs['cbar_style']['label'])#,
# pad=wavelet_plot_kwargs['cbar_style']['pad'])
#
# cb = mpl.colorbar.ColorbarBase(ax['cb'], orientation='horizontal',
# cmap=cbar_data['cmap'],
# norm=cbar_data['norm'], # vmax and vmin
# extend=cbar_data['extend'],
# boundaries=cbar_data['boundaries'], # ,
# label=wavelet_plot_kwargs['cbar_style']['label'],
# drawedges=cbar_data['drawedges']) # True)
# cb = mpl.colorbar.Colorbar(ax['cb'], mappable = cbar_data.mappable,
# orientation='horizontal',
# extend=cbar_data.extend,
# boundaries=cbar_data.boundaries, # ,
# label=wavelet_plot_kwargs['cbar_style']['label'],
# drawedges=cbar_data.drawedges) # True)
#
# ticks=[0, 3, 6, 9])
if 'path' in savefig_settings:
plotting.savefig(fig, settings=savefig_settings)
return fig, ax
[docs] def copy(self):
'''Make a copy of the Series object
Returns
-------
Series : Series
A copy of the Series object
'''
return deepcopy(self)
[docs] def clean(self, verbose=False, keep_log = False):
''' Clean up the timeseries by removing NaNs and sort with increasing time points
Parameters
----------
verbose : bool
If True, will print warning messages if there is any
keep_log : Boolean
if True, adds this step and its parameters to the series log.
Returns
-------
new : Series
Series object with removed NaNs and sorting
'''
new = self.copy()
v_mod, t_mod = tsbase.clean_ts(self.value, self.time, verbose=verbose)
new.time = t_mod
new.value = v_mod
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log):'clean', 'verbose': verbose},)
return new
[docs] def sort(self, verbose=False, ascending = True, keep_log = False):
''' Ensure timeseries is set to a monotonically increasing axis.
If the time axis is prograde to begin with, no transformation is applied.
Parameters
----------
verbose : bool
If True, will print warning messages if there is any
keep_log : Boolean
if True, adds this step and its parameter to the series log.
Returns
-------
new : Series
Series object with removed NaNs and sorting
'''
new = self.copy()
v_mod, t_mod = tsbase.sort_ts(self.value, self.time, ascending=ascending, verbose=verbose)
new.time = t_mod
new.value = v_mod
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log):'sort', 'ascending': ascending},)
return new
[docs] def gaussianize(self, keep_log = False):
''' Gaussianizes the timeseries (i.e. maps its values to a standard normal)
Returns
-------
new : Series
The Gaussianized series object
keep_log : Boolean
if True, adds this transformation to the series log.
References
----------
Emile-Geay, J., and M. Tingley (2016), Inferring climate variability from nonlinear proxies: application to palaeo-enso studies, Climate of the Past, 12 (1), 31–50, doi:10.5194/cp- 12-31-2016.
'''
new = self.copy()
v_mod = tsutils.gaussianize(self.value)
new.value = v_mod
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log):'gaussianize', 'applied': True},)
return new
[docs] def standardize(self, keep_log = False, scale=1):
"""Standardizes the series ((i.e. remove its estimated mean and divides by its estimated standard deviation)
Returns
-------
new : Series
The standardized series object
keep_log : Boolean
if True, adds the previous mean, standard deviation and method parameters to the series log.
"""
new = self.copy()
vs, mu, sig = tsutils.standardize(self.value, scale=scale)
new.value = vs
if keep_log == True:
if new.log is None:
new.log=()
method_dict = {len(new.log):'standardize', 'args': scale,
'previous_mean': mu, 'previous_std': sig}
new.log += (method_dict,)
return new
[docs] def center(self, timespan=None, keep_log=False):
''' Centers the series (i.e. renove its estimated mean)
Parameters
----------
timespan : tuple or list
The timespan over which the mean must be estimated.
In the form [a, b], where a, b are two points along the series' time axis.
keep_log : Boolean
if True, adds the previous mean and method parameters to the series log.
Returns
-------
new : Series
The centered series object
'''
new = self.copy()
if timespan is not None:
ts_mean = np.nanmean(self.slice(timespan).value)
else:
ts_mean = np.nanmean(self.value)
new.value = self.value - ts_mean
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log): 'center', 'args': timespan, 'previous_mean': ts_mean},)
return new
[docs] def segment(self, factor=10, verbose = False):
"""Gap detection
This function segments a timeseries into n number of parts following a gap
detection algorithm. The rule of gap detection is very simple:
we define the intervals between time points as dts, then if dts[i] is larger than factor * dts[i-1],
we think that the change of dts (or the gradient) is too large, and we regard it as a breaking point
and divide the time series into two segments here
Parameters
----------
factor : float
The factor that adjusts the threshold for gap detection
verbose : bool
If True, will print warning messages if there is any
Returns
-------
res : MultipleSeries or Series
If gaps were detected, returns the segments in a MultipleSeries object,
else, returns the original timeseries.
"""
seg_y, seg_t, n_segs = tsutils.ts2segments(self.value,self.time,factor=factor)
if len(seg_y)>1:
s_list=[]
for idx,s in enumerate(seg_y):
if self.label is not None:
s_lbl = self.label + ' segment ' + str(idx+1)
else:
s_lbl = 'segment ' + str(idx+1)
s_tmp=Series(time=seg_t[idx],value=s,time_name=self.time_name,
time_unit=self.time_unit, value_name=self.value_name,
value_unit=self.value_unit,label=s_lbl, verbose=verbose)
s_list.append(s_tmp)
res=MultipleSeries(series_list=s_list)
elif len(seg_y)==1:
res=self.copy()
else:
raise ValueError('No timeseries detected')
return res
[docs] def sel(self, value=None, time=None, tolerance=0):
"""
Slice Series based on 'value' or 'time'.
Parameters
----------
value : int, float, slice
If int/float, then the Series will be sliced so that `self.value` is
equal to `value` (+/- `tolerance`).
If slice, then the Series will be sliced so `self.value` is between
slice.start and slice.stop (+/- tolerance).
time : int, float, slice
If int/float, then the Series will be sliced so that `self.time` is
equal to `time`. (+/- `tolerance`)
If slice of int/float, then the Series will be sliced so that
`self.time` is between slice.start and slice.stop.
If slice of `datetime` (or str containing datetime, such as `'2020-01-01'`),
then the Series will be sliced so that `self.datetime_index` is
between `time.start` and `time.stop` (+/- `tolerance`, which needs to be
a `timedelta`).
tolerance : int, float, default 0.
Used by `value` and `time`, see above.
Returns
-------
Copy of `self`, sliced according to `value` and `time`.
Examples
--------
>>> ts = pyleo.Series(
... time=np.array([1, 1.1, 2, 3]), value=np.array([4, .9, 6, 1]), time_unit='years BP'
... )
>>> ts.sel(value=1)
{'log': ({0: 'clean_ts', 'applied': True, 'verbose': False},
{2: 'clean_ts', 'applied': True, 'verbose': False})}
None
time [years BP]
3.0 1.0
Name: value, dtype: float64
If you also want to include the value `3.9`, you could set `tolerance` to `.1`:
>>> ts.sel(value=1, tolerance=.1)
{'log': ({0: 'clean_ts', 'applied': True, 'verbose': False},
{2: 'clean_ts', 'applied': True, 'verbose': False})}
None
time [years BP]
1.1 0.9
3.0 1.0
Name: value, dtype: float64
You can also pass a `slice` to select a range of values:
>>> ts.sel(value=slice(4, 6))
{'log': ({0: 'clean_ts', 'applied': True, 'verbose': False},
{2: 'clean_ts', 'applied': True, 'verbose': False})}
None
time [years BP]
1.0 4.0
2.0 6.0
Name: value, dtype: float64
>>> ts.sel(value=slice(4, None))
{'log': ({0: 'clean_ts', 'applied': True, 'verbose': False},
{2: 'clean_ts', 'applied': True, 'verbose': False})}
None
time [years BP]
1.0 4.0
2.0 6.0
Name: value, dtype: float64
>>> ts.sel(value=slice(None, 4))
{'log': ({0: 'clean_ts', 'applied': True, 'verbose': False},
{2: 'clean_ts', 'applied': True, 'verbose': False})}
None
time [years BP]
1.0 4.0
1.1 0.9
3.0 1.0
Name: value, dtype: float64
Similarly, you filter using `time` instead of `value`.
"""
if value is not None and time is not None:
raise TypeError("Cannot pass both `value` and `time`")
if value is not None:
if isinstance(value, (int, float)):
return self.pandas_method(lambda x: x[x.between(value-tolerance, value+tolerance)])
if isinstance(value, slice):
if isinstance(value.start, (int, float)) and isinstance(value.stop, (int, float)):
return self.pandas_method(lambda x: x[x.between(value.start-tolerance, value.stop+tolerance)])
if isinstance(value.start, (int, float)) and value.stop is None:
return self.pandas_method(lambda x: x[x.ge(value.start-tolerance)])
if isinstance(value.stop, (int, float)) and value.start is None:
return self.pandas_method(lambda x: x[x.le(value.stop-tolerance)])
raise TypeError(f'Expected slice, int, or float, got: {type(value)}')
if time is not None:
if isinstance(time, (int, float)):
return self.slice([time-tolerance, time+tolerance])
if isinstance(time, dt.datetime):
if tolerance == 0:
tolerance = dt.timedelta(days=0)
if not isinstance(tolerance, dt.timedelta):
raise TypeError(
f"Invalid 'tolerance' passed. Expected timedelta, received: {type(tolerance)}"
)
return self.pandas_method(
lambda x: x[(x.index>=time-tolerance) & (x.index<=time+tolerance)]
)
if isinstance(time, str):
if tolerance == 0:
tolerance = dt.timedelta(days=0)
if not isinstance(tolerance, dt.timedelta):
raise TypeError(
f"Invalid 'tolerance' passed. Expected timedelta, received: {type(tolerance)}"
)
tolerance = np.timedelta64(tolerance, 's')
return self.pandas_method(
lambda x: x[
(x.index>=np.datetime64(time, 's')-tolerance)
& (x.index<=np.datetime64(time, 's')+tolerance)
]
)
if isinstance(time, slice):
if isinstance(time.start, (int, float)) and isinstance(time.stop, (int, float)):
return self.slice([time.start-tolerance, time.stop+tolerance])
if isinstance(time.start, (int, float)) and time.stop is None:
mask = self.time >= time.start-tolerance
new = self.copy()
new.time = new.time[mask]
new.value = new.value[mask]
return new
if isinstance(time.stop, (int, float)) and time.start is None:
mask = self.time <= time.stop+tolerance
new = self.copy()
new.time = new.time[mask]
new.value = new.value[mask]
return new
if isinstance(time.start, str) and isinstance(time.stop, str):
if tolerance == 0:
tolerance = dt.timedelta(days=0)
if not isinstance(tolerance, dt.timedelta):
raise TypeError(
f"Invalid 'tolerance' passed. Expected timedelta, received: {type(tolerance)}"
)
tolerance = np.timedelta64(tolerance, 's')
return self.pandas_method(
lambda x: x[
(x.index>=np.datetime64(time.start, 's')-tolerance)
& (x.index<=np.datetime64(time.stop, 's')+tolerance)
]
)
if isinstance(time.start, str) and time.stop is None:
if tolerance == 0:
tolerance = dt.timedelta(days=0)
if not isinstance(tolerance, dt.timedelta):
raise TypeError(
f"Invalid 'tolerance' passed. Expected timedelta, received: {type(tolerance)}"
)
tolerance = np.timedelta64(tolerance, 's')
return self.pandas_method(
lambda x: x[x.index>=np.datetime64(time.start, 's')-tolerance]
)
if isinstance(time.stop, str) and time.start is None:
if tolerance == 0:
tolerance = dt.timedelta(days=0)
if not isinstance(tolerance, dt.timedelta):
raise TypeError(f"Invalid 'tolerance' passed. Expected timedelta, received: {type(tolerance)}")
tolerance = np.timedelta64(tolerance, 's')
return self.pandas_method(
lambda x: x[x.index<=np.datetime64(time.stop, 's')+tolerance]
)
if isinstance(time.start, dt.datetime) and isinstance(time.stop, dt.datetime):
if tolerance == 0:
tolerance = dt.timedelta(days=0)
if not isinstance(tolerance, dt.timedelta):
raise TypeError(
f"Invalid 'tolerance' passed. Expected timedelta, received: {type(tolerance)}"
)
return self.pandas_method(
lambda x: x[(x.index>=time.start-tolerance) & (x.index<=time.stop+tolerance)]
)
if isinstance(time.start, dt.datetime) and time.stop is None:
if tolerance == 0:
tolerance = dt.timedelta(days=0)
if not isinstance(tolerance, dt.timedelta):
raise TypeError(
f"Invalid 'tolerance' passed. Expected timedelta, received: {type(tolerance)}"
)
return self.pandas_method(
lambda x: x[x.index>=time.start-tolerance]
)
if isinstance(time.stop, dt.datetime) and time.start is None:
if tolerance == 0:
tolerance = dt.timedelta(days=0)
if not isinstance(tolerance, dt.timedelta):
raise TypeError(
f"Invalid 'tolerance' passed. Expected timedelta, received: {type(tolerance)}"
)
return self.pandas_method(
lambda x: x[x.index<=time.stop+tolerance]
)
raise TypeError("Expected int or float, or slice of int/float/datetime/str.")
raise TypeError("Invalid combination of arguments received.")
[docs] def slice(self, timespan):
''' Slicing the timeseries with a timespan (tuple or list)
Parameters
----------
timespan : tuple or list
The list of time points for slicing, whose length must be even.
When there are n time points, the output Series includes n/2 segments.
For example, if timespan = [a, b], then the sliced output includes one segment [a, b];
if timespan = [a, b, c, d], then the sliced output includes segment [a, b] and segment [c, d].
Returns
-------
new : Series
The sliced Series object.
Examples
--------
slice the SOI from 1972 to 1998
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
ts_slice = ts.slice([1972, 1998])
print("New time bounds:",ts_slice.time.min(),ts_slice.time.max())
'''
new = self.copy()
n_elements = len(timespan)
if n_elements % 2 == 1:
raise ValueError('The number of elements in timespan must be even!')
n_segments = int(n_elements / 2)
mask = [False for i in range(np.size(self.time))]
for i in range(n_segments):
mask |= (self.time >= timespan[i*2]) & (self.time <= timespan[i*2+1])
new.time = self.time[mask]
new.value = self.value[mask]
return new
[docs] def fill_na(self, timespan=None, dt=1, keep_log=False):
''' Fill NaNs into the timespan
Parameters
----------
timespan : tuple or list
The list of time points for slicing, whose length must be 2.
For example, if timespan = [a, b], then the sliced output includes one segment [a, b].
If None, will use the start point and end point of the original timeseries
dt : float
The time spacing to fill the NaNs; default is 1.
keep_log : Boolean
if True, adds this step and its parameters to the series log.
Returns
-------
new : Series
The sliced Series object.
'''
new = self.copy()
if timespan is None:
start = np.min(self.time)
end = np.max(self.time)
else:
start = timespan[0]
end = timespan[-1]
new_time = np.arange(start, end+dt, dt)
new_value = np.empty(np.size(new_time))
for i, t in enumerate(new_time):
if t in self.time:
loc = list(self.time).index(t)
new_value[i] = self.value[loc]
else:
new_value[i] = np.nan
new.time = new_time
new.value = new_value
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log):'fill_na', 'applied': True, 'dt': dt, 'timespan': timespan},)
return new
[docs] def detrend(self, method='emd', keep_log=False, preserve_mean = False, **kwargs):
'''Detrend Series object
Parameters
----------
method : str, optional
The method for detrending. The default is 'emd'.
Options include:
* "linear": the result of a n ordinary least-squares stright line fit to y is subtracted.
* "constant": only the mean of data is subtracted.
* "savitzky-golay", y is filtered using the Savitzky-Golay filters and the resulting filtered series is subtracted from y.
* "emd" (default): Empirical mode decomposition. The last mode is assumed to be the trend and removed from the series
keep_log : boolean
if True, adds the removed trend and method parameters to the series log.
preserve_mean : boolean
if True, ensures that the mean of the series is preserved despite the detrending
kwargs : dict
Relevant arguments for each of the methods.
Returns
-------
new : Series
Detrended Series object in "value", with new field "trend" added
See also
--------
pyleoclim.utils.tsutils.detrend : detrending wrapper functions
Examples
--------
.. jupyter-execute::
lr04 = pyleo.utils.load_dataset('LR04')
fig, ax = lr04.plot(invert_yaxis=True)
ts_emd = lr04.detrend(method='emd',preserve_mean=True)
ts_emd.plot(label=lr04.label+', EMD detrend',ax=ax)
'''
new = self.copy()
v_mod, trend = tsutils.detrend(self.value, x=self.time, method=method,
preserve_mean=preserve_mean, **kwargs)
new.value = v_mod
#new.label = self.label +' (' + method +' detrended)'
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log): 'detrend','method': method, 'args': kwargs, 'previous_trend': trend},)
return new
[docs] def spectral(self, method='lomb_scargle', freq_method='log', freq_kwargs=None, settings=None, label=None, scalogram=None, verbose=False):
''' Perform spectral analysis on the timeseries
Parameters
----------
method : str;
{'wwz', 'mtm', 'lomb_scargle', 'welch', 'periodogram', 'cwt'}
freq_method : str
{'log','scale', 'nfft', 'lomb_scargle', 'welch'}
freq_kwargs : dict
Arguments for frequency vector
settings : dict
Arguments for the specific spectral method
label : str
Label for the PSD object
scalogram : pyleoclim.core.series.Series.Scalogram
The return of the wavelet analysis; effective only when the method is 'wwz' or 'cwt'
verbose : bool
If True, will print warning messages if there is any
Returns
-------
psd : PSD
A PSD object
See also
--------
pyleoclim.utils.spectral.mtm : Spectral analysis using the Multitaper approach
pyleoclim.utils.spectral.lomb_scargle : Spectral analysis using the Lomb-Scargle method
pyleoclim.utils.spectral.welch: Spectral analysis using the Welch segement approach
pyleoclim.utils.spectral.periodogram: Spectral anaysis using the basic Fourier transform
pyleoclim.utils.spectral.wwz_psd : Spectral analysis using the Wavelet Weighted Z transform
pyleoclim.utils.spectral.cwt_psd : Spectral analysis using the continuous Wavelet Transform as implemented by Torrence and Compo
pyleoclim.utils.spectral.make_freq_vector : Functions to create the frequency vector
pyleoclim.utils.tsutils.detrend : Detrending function
pyleoclim.core.psds.PSD : PSD object
pyleoclim.core.psds.MultiplePSD : Multiple PSD object
Examples
--------
Calculate the spectrum of SOI using the various methods and compute significance
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
ts_std = ts.standardize()
- Lomb-Scargle
.. jupyter-execute::
psd_ls = ts_std.spectral(method='lomb_scargle')
psd_ls_signif = psd_ls.signif_test(number=20) #in practice, need more AR1 simulations
fig, ax = psd_ls_signif.plot(title='PSD using Lomb-Scargle method')
We may pass in method-specific arguments via "settings", which is a dictionary.
For instance, to adjust the number of overlapping segment for Lomb-Scargle, we may specify the method-specific argument "n50";
to adjust the frequency vector, we may modify the "freq_method" or modify the method-specific argument "freq".
.. jupyter-execute::
import numpy as np
psd_LS_n50 = ts_std.spectral(method='lomb_scargle', settings={'n50': 4}) # c=1e-2 yields lower frequency resolution
psd_LS_freq = ts_std.spectral(method='lomb_scargle', settings={'freq': np.linspace(1/20, 1/0.2, 51)})
psd_LS_LS = ts_std.spectral(method='lomb_scargle', freq_method='lomb_scargle') # with frequency vector generated using REDFIT method
fig, ax = psd_LS_n50.plot(
title='PSD using Lomb-Scargle method with 4 overlapping segments',
label='settings={"n50": 4}')
psd_ls.plot(ax=ax, label='settings={"n50": 3}', marker='o')
fig, ax = psd_LS_freq.plot(
title='PSD using Lomb-Scargle method with different frequency vectors',
label='freq=np.linspace(1/20, 1/0.2, 51)', marker='o')
psd_ls.plot(ax=ax, label='freq_method="log"', marker='o')
You may notice the differences in the PSD curves regarding smoothness and the locations of the analyzed period points.
For other method-specific arguments, please look up the specific methods in the "See also" section.
- WWZ
.. jupyter-execute::
psd_wwz = ts_std.spectral(method='wwz') # wwz is the default method
psd_wwz_signif = psd_wwz.signif_test(number=1) # significance test; for real work, should use number=200 or even larger
fig, ax = psd_wwz_signif.plot(title='PSD using WWZ method')
We may take advantage of a pre-calculated scalogram using WWZ to accelerate the spectral analysis
(although note that the default parameters for spectral and wavelet analysis using WWZ are different):
.. jupyter-execute::
scal_wwz = ts_std.wavelet(method='wwz') # wwz is the default method
psd_wwz_fast = ts_std.spectral(method='wwz', scalogram=scal_wwz)
fig, ax = psd_wwz_fast.plot(title='PSD using WWZ method w/ pre-calculated scalogram')
- Periodogram
.. jupyter-execute::
ts_interp = ts_std.interp()
psd_perio = ts_interp.spectral(method='periodogram')
psd_perio_signif = psd_perio.signif_test(number=20, method='ar1sim') #in practice, need more AR1 simulations
fig, ax = psd_perio_signif.plot(title='PSD using Periodogram method')
- Welch
.. jupyter-execute::
psd_welch = ts_interp.spectral(method='welch')
psd_welch_signif = psd_welch.signif_test(number=20, method='ar1sim') #in practice, need more AR1 simulations
fig, ax = psd_welch_signif.plot(title='PSD using Welch method')
- MTM
.. jupyter-execute::
psd_mtm = ts_interp.spectral(method='mtm', label='MTM, NW=4')
psd_mtm_signif = psd_mtm.signif_test(number=20, method='ar1sim') #in practice, need more AR1 simulations
fig, ax = psd_mtm_signif.plot(title='PSD using the multitaper method')
By default, MTM uses a half-bandwidth of 4 times the fundamental (Rayleigh) frequency, i.e. NW = 4, which is the most conservative choice.
NW runs from 2 to 4 in multiples of 1/2, and can be adjusted like so (note the sharper peaks and higher overall variance, which may not be desirable):
.. jupyter-execute::
psd_mtm2 = ts_interp.spectral(method='mtm', settings={'NW':2}, label='MTM, NW=2')
fig, ax = psd_mtm2.plot(title='MTM with NW=2')
- Continuous Wavelet Transform
.. jupyter-execute::
ts_interp = ts_std.interp()
psd_cwt = ts_interp.spectral(method='cwt')
psd_cwt_signif = psd_cwt.signif_test(number=20)
fig, ax = psd_cwt_signif.plot(title='PSD using the CWT method')
'''
if not verbose:
warnings.simplefilter('ignore')
settings = {} if settings is None else settings.copy()
spec_func = {
'wwz': specutils.wwz_psd,
'mtm': specutils.mtm,
'lomb_scargle': specutils.lomb_scargle,
'welch': specutils.welch,
'periodogram': specutils.periodogram,
'cwt': specutils.cwt_psd
}
args = {}
freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy()
freq = specutils.make_freq_vector(self.time, method=freq_method, **freq_kwargs)
args['wwz'] = {'freq': freq}
args['cwt'] = {'freq': freq}
args['mtm'] = {}
args['lomb_scargle'] = {'freq': freq}
args['welch'] = {}
args['periodogram'] = {}
args[method].update(settings)
if method == 'wwz' and scalogram is not None:
args['wwz'].update(
{
'wwa': scalogram.amplitude,
'wwz_Neffs': scalogram.wwz_Neffs,
'wwz_freq': scalogram.frequency,
}
)
if method == 'cwt' and scalogram is not None:
Results = collections.namedtuple('Results', ['amplitude', 'coi', 'freq', 'time', 'scale', 'mother','param'])
res = Results(amplitude=scalogram.amplitude, coi=scalogram.coi,
freq=scalogram.frequency, time=scalogram.time,
scale=scalogram.wave_args['scale'],
mother=scalogram.wave_args['mother'],
param=scalogram.wave_args['param'])
args['cwt'].update({'cwt_res':res})
spec_res = spec_func[method](self.value, self.time, **args[method])
if type(spec_res) is dict:
spec_res = dict2namedtuple(spec_res)
if label is None:
label = self.label
if method == 'wwz' and scalogram is not None:
args['wwz'].pop('wwa')
args['wwz'].pop('wwz_Neffs')
args['wwz'].pop('wwz_freq')
if method == 'cwt':
args['cwt'].update({'scale':spec_res.scale,'mother':spec_res.mother,'param':spec_res.param})
if scalogram is not None:
args['cwt'].pop('cwt_res')
psd = PSD(
frequency=spec_res.freq,
amplitude=spec_res.psd,
label=label,
timeseries=self,
spec_method=method,
spec_args=args[method]
)
return psd
[docs] def wavelet(self, method='cwt', settings=None, freq_method='log', freq_kwargs=None, verbose=False):
''' Perform wavelet analysis on a timeseries
Parameters
----------
method : str {wwz, cwt}
cwt - the continuous wavelet transform [1]
is appropriate for evenly-spaced series.
wwz - the weighted wavelet Z-transform [2]
is appropriate for unevenly-spaced series.
Default is cwt, returning an error if the Series is unevenly-spaced.
freq_method : str
{'log', 'scale', 'nfft', 'lomb_scargle', 'welch'}
freq_kwargs : dict
Arguments for the frequency vector
settings : dict
Arguments for the specific wavelet method
verbose : bool
If True, will print warning messages if there are any
Returns
-------
scal : Scalogram object
See also
--------
pyleoclim.utils.wavelet.wwz : wwz function
pyleoclim.utils.wavelet.cwt : cwt function
pyleoclim.utils.spectral.make_freq_vector : Functions to create the frequency vector
pyleoclim.utils.tsutils.detrend : Detrending function
pyleoclim.core.series.Series.spectral : spectral analysis tools
pyleoclim.core.scalograms.Scalogram : Scalogram object
pyleoclim.core.scalograms.MultipleScalogram : Multiple Scalogram object
References
----------
[1] Torrence, C. and G. P. Compo, 1998: A Practical Guide to Wavelet Analysis. Bull. Amer. Meteor. Soc., 79, 61-78.
Python routines available at http://paos.colorado.edu/research/wavelets/
[2] Foster, G., 1996: Wavelets for period analysis of unevenly sampled time series. The Astronomical Journal, 112, 1709.
Examples
--------
Wavelet analysis on the evenly-spaced SOI record. The CWT method will be applied by default.
.. jupyter-execute::
import pyleoclim as pyleo
ts = pyleo.utils.load_dataset('SOI')
scal1 = ts.wavelet()
scal_signif = scal1.signif_test(number=20) # for research-grade work, use number=200 or larger
fig, ax = scal_signif.plot()
If you wanted to invoke the WWZ method instead (here with no significance testing, to lower computational cost):
.. jupyter-execute::
scal2 = ts.wavelet(method='wwz')
fig, ax = scal2.plot()
Notice that the two scalograms have different amplitudes, which are relative. Method-specific arguments
may be passed via `settings`. For instance, if you wanted to change the default mother wavelet
('MORLET') to a derivative of a Gaussian (DOG), with degree 2 by default ("Mexican Hat wavelet"):
.. jupyter-execute::
scal3 = ts.wavelet(settings = {'mother':'DOG'})
fig, ax = scal3.plot(title='CWT scalogram with DOG mother wavelet')
As for WWZ, note that, for computational efficiency, the time axis is coarse-grained
by default to 50 time points, which explains in part the difference with the CWT scalogram.
If you need a custom axis, it (and other method-specific parameters) can also be passed
via the `settings` dictionary:
.. jupyter-execute::
tau = np.linspace(np.min(ts.time), np.max(ts.time), 60)
scal4 = ts.wavelet(method='wwz', settings={'tau':tau})
fig, ax = scal4.plot(title='WWZ scalogram with finer time axis')
'''
if not verbose:
warnings.simplefilter('ignore')
# Assign method
if method == 'cwt' and not(self.is_evenly_spaced()):
raise ValueError("The chosen method is cwt but the series is unevenly spaced. You can either interpolate/bin or set method='wwz'.")
wave_func = {'wwz': waveutils.wwz,
'cwt': waveutils.cwt
}
# Process options
settings = {} if settings is None else settings.copy()
freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy()
freq = specutils.make_freq_vector(self.time, method=freq_method, **freq_kwargs)
args = {}
args['wwz'] = {'freq': freq}
args['cwt'] = {'freq': freq}
if method == 'wwz':
if 'ntau' in settings.keys():
ntau = settings['ntau']
else:
ntau = np.min([np.size(self.time), 50])
tau = np.linspace(np.min(self.time), np.max(self.time), ntau)
settings.update({'tau': tau})
args[method].update(settings)
# Apply wavelet method
wave_res = wave_func[method](self.value, self.time, **args[method])
# Export result
if method == 'wwz':
wwz_Neffs = wave_res.Neffs
elif method=='cwt':
wwz_Neffs = None
args[method].update({'scale':wave_res.scale,'mother':wave_res.mother,'param':wave_res.param,
'standardize':wave_res.standardize, 'gaussianize':wave_res.gaussianize})
scal = Scalogram(
frequency=wave_res.freq,
scale = wave_res.scale,
time=wave_res.time,
amplitude=wave_res.amplitude,
coi=wave_res.coi,
label=self.label,
timeseries=self,
wave_method=method,
freq_method=freq_method,
freq_kwargs=freq_kwargs,
wave_args=args[method],
wwz_Neffs=wwz_Neffs,
)
return scal
[docs] def wavelet_coherence(self, target_series, method='cwt', settings=None,
freq_method='log', freq_kwargs=None, verbose=False,
common_time_kwargs=None):
''' Performs wavelet coherence analysis with the target timeseries
Parameters
----------
target_series : Series
A pyleoclim Series object on which to perform the coherence analysis
method : str
Possible methods {'wwz','cwt'}. Default is 'cwt', which only works
if the series share the same evenly-spaced time axis.
'wwz' is designed for unevenly-spaced data, but is far slower.
freq_method : str
{'log','scale', 'nfft', 'lomb_scargle', 'welch'}
freq_kwargs : dict
Arguments for frequency vector
common_time_kwargs : dict
Parameters for the method `MultipleSeries.common_time()`. Will use interpolation by default.
settings : dict
Arguments for the specific wavelet method (e.g. decay constant for WWZ, mother wavelet for CWT)
and common properties like standardize, detrend, gaussianize, pad, etc.
verbose : bool
If True, will print warning messages, if any
Returns
-------
coh : pyleoclim.core.coherence.Coherence
References
----------
Grinsted, A., Moore, J. C. & Jevrejeva, S. Application of the cross wavelet transform and
wavelet coherence to geophysical time series. Nonlin. Processes Geophys. 11, 561–566 (2004).
See also
--------
pyleoclim.utils.spectral.make_freq_vector : Functions to create the frequency vector
pyleoclim.utils.tsutils.detrend : Detrending function
pyleoclim.core.multipleseries.MultipleSeries.common_time : put timeseries on common time axis
pyleoclim.core.series.Series.wavelet : wavelet analysis
pyleoclim.utils.wavelet.wwz_coherence : coherence using the wwz method
pyleoclim.utils.wavelet.cwt_coherence : coherence using the cwt method
Examples
--------
Calculate the wavelet coherence of NINO3 and All India Rainfall with default arguments:
.. jupyter-execute::
import pyleoclim as pyleo
ts_air = pyleo.utils.load_dataset('AIR')
ts_nino = pyleo.utils.load_dataset('NINO3')
coh = ts_air.wavelet_coherence(ts_nino)
coh.plot()
Note that in this example both timeseries area already on a common,
evenly-spaced time axis. If they are not (either because the data are unevenly spaced,
or because the time axes are different in some other way), an error will be raised.
To circumvent this error, you can either put the series
on a common time axis (e.g. using common_time()) prior to applying CWT, or you
can use the Weighted Wavelet Z-transform (WWZ) instead, as it is designed for
unevenly-spaced data. However, it is usually far slower:
.. jupyter-execute::
coh_wwz = ts_air.wavelet_coherence(ts_nino, method = 'wwz')
coh_wwz.plot()
As with wavelet analysis, both CWT and WWZ admit optional arguments through `settings`.
For instance, one can adjust the resolution of the time axis on which coherence is evaluated:
.. jupyter-execute::
coh_wwz = ts_air.wavelet_coherence(ts_nino, method = 'wwz', settings = {'ntau':20})
coh_wwz.plot()
The frequency (scale) axis can also be customized, e.g. to focus on scales from 1 to 20y, with 24 scales:
.. jupyter-execute::
coh = ts_air.wavelet_coherence(ts_nino, freq_kwargs={'fmin':1/20,'fmax':1,'nf':24})
coh.plot()
Significance is assessed similarly to PSD or Scalogram objects:
.. jupyter-execute::
cwt_sig = coh.signif_test(number=20, qs=[.9,.95]) # specifiying 2 significance thresholds does not take any more time.
# by default, the plot function will look for the closest quantile to 0.95, but it is easy to adjust:
cwt_sig.plot(signif_thresh = 0.9)
Another plotting option, `dashboard`, allows to visualize both
timeseries as well as the wavelet transform coherency (WTC), which quantifies where
two timeseries exhibit similar behavior in time-frequency space, and the cross-wavelet
transform (XWT), which indicates regions of high common power.
.. jupyter-execute::
cwt_sig.dashboard()
Note: this design balances many considerations, and is not easily customizable.
'''
if not verbose:
warnings.simplefilter('ignore')
settings = {} if settings is None else settings.copy()
wtc_func = {
'wwz': waveutils.wwz_coherence,
'cwt': waveutils.cwt_coherence
}
# Process options
settings = {} if settings is None else settings.copy()
freq_kwargs = {} if freq_kwargs is None else freq_kwargs.copy()
freq = specutils.make_freq_vector(self.time, method=freq_method, **freq_kwargs)
args = {}
args['wwz'] = {'freq': freq, 'verbose': verbose}
args['cwt'] = {'freq': freq}
# put on same time axes if necessary
if method == 'cwt' and not np.array_equal(self.time, target_series.time):
warnings.warn("Series have different time axes. Applying common_time().")
ms = MultipleSeries([self, target_series])
common_time_kwargs = {} if common_time_kwargs is None else common_time_kwargs.copy()
ct_args = {'method': 'interp'}
ct_args.update(common_time_kwargs)
ms = ms.common_time(**ct_args)
ts1 = ms.series_list[0]
ts2 = ms.series_list[1]
elif method == 'cwt' and (not self.is_evenly_spaced() or not target_series.is_evenly_spaced()):
raise ValueError("The chosen method is cwt but at least one the series is unevenly spaced. You can either apply common_time() or use 'wwz'.")
else:
ts1 = self
ts2 = target_series
if method == 'wwz':
if 'ntau' in settings.keys():
ntau = settings['ntau']
settings.pop('ntau')
else:
ntau = np.min([np.size(ts1.time), np.size(ts2.time), 50])
if 'tau' in settings.keys():
tau = settings['tau']
else:
lb1, ub1 = np.min(ts1.time), np.max(ts1.time)
lb2, ub2 = np.min(ts2.time), np.max(ts2.time)
lb = np.max([lb1, lb2])
ub = np.min([ub1, ub2])
tau = np.linspace(lb, ub, ntau)
settings.update({'tau': tau})
args[method].update(settings)
# Apply WTC method
wtc_res = wtc_func[method](ts1.value, ts1.time, ts2.value, ts2.time, **args[method])
# Export result
coh = Coherence(
frequency=wtc_res.freq,
scale = wtc_res.scale,
time=wtc_res.time,
wtc=wtc_res.xw_coherence,
xwt=wtc_res.xw_amplitude,
phase=wtc_res.xw_phase,
coi=wtc_res.coi,
timeseries1= ts1,
timeseries2= ts2,
wave_method = method,
wave_args = args[method],
freq_method=freq_method,
freq_kwargs=freq_kwargs,
)
return coh
[docs] def correlation(self, target_series, alpha=0.05, statistic='pearsonr', method = 'phaseran', timespan=None,
settings=None, common_time_kwargs=None, seed=None, mute_pbar=False):
''' Estimates the correlation and its associated significance between two time series (not ncessarily IID).
The significance of the correlation is assessed using one of the following methods:
1) 'ttest': T-test adjusted for effective sample size.
2) 'isopersistent': AR(1) modeling of x and y.
3) 'isospectral': phase randomization of original inputs. (default)
The T-test is a parametric test, hence computationally cheap, but can only be performed in ideal circumstances.
The others are non-parametric, but their computational requirements scale with the number of simulations.
The choise of significance test and associated number of Monte-Carlo simulations are passed through the `settings` parameter.
Parameters
----------
target_series : Series
A pyleoclim Series object
alpha : float
The significance level (default: 0.05)
statistic : str
statistic being evaluated. Can use any of the SciPy-supported ones:
https://docs.scipy.org/doc/scipy/reference/stats.html#association-correlation-tests
Currently supported: ['pearsonr','spearmanr','pointbiserialr','kendalltau','weightedtau']
Default: 'pearsonr'.
method : str, {'ttest','built-in','ar1sim','phaseran'}
method for significance testing. Default is 'phaseran'
'ttest' implements the T-test with degrees of freedom adjusted for autocorrelation, as done in [1]
'built-in' uses the p-value that ships with the statistic.
The old options 'isopersistent' and 'isospectral' still work, but trigger a deprecation warning.
Note that 'weightedtau' does not have a known distribution, so the 'built-in' method returns an error in that case.
timespan : tuple
The time interval over which to perform the calculation
settings : dict
Parameters for the correlation function, including:
nsim : int
the number of simulations (default: 1000)
method : str, {'ttest','ar1sim','phaseran' (default)}
method for significance testing
surr_settings : dict
Parameters for surrogate generator. See individual methods for details.
common_time_kwargs : dict
Parameters for the method `MultipleSeries.common_time()`. Will use interpolation by default.
seed : float or int
random seed for isopersistent and isospectral methods
mute_pbar : bool, optional
Mute the progress bar. The default is False.
Returns
-------
corr : pyleoclim.Corr
the result object, containing
- r : float
correlation coefficient
- p : float
the p-value
- signif : bool
true if significant; false otherwise
Note that signif = True if and only if p <= alpha.
- alpha : float
the significance level
See also
--------
pyleoclim.utils.correlation.corr_sig : Correlation function (marked for deprecation)
pyleoclim.utils.correlation.association : SciPy measures of association between variables
pyleoclim.series.surrogates : parametric and non-parametric surrogates of any Series object
pyleoclim.multipleseries.common_time : Aligning time axes
References
----------
[1] Hu, J., J. Emile-Geay, and J. Partin (2017), Correlation-based interpretations of paleoclimate data – where statistics meet past climates, Earth and Planetary Science Letters, 459, 362–371, doi:10.1016/j.epsl.2016.11.048.
Examples
--------
Correlation between the Nino3.4 index and the Deasonalized All Indian Rainfall Index
.. jupyter-execute::
import pyleoclim as pyleo
ts_air = pyleo.utils.load_dataset('AIR')
ts_nino = pyleo.utils.load_dataset('NINO3')
# with `nsim=20` and default `method='phaseran'`
# set an arbitrary random seed to fix the result
corr_res = ts_nino.correlation(ts_air, settings={'nsim': 20}, seed=2333)
print(corr_res)
# changing the statistic
corr_res = ts_nino.correlation(ts_air, statistic='kendalltau')
print(corr_res)
# using a simple t-test with DOFs adjusted for autocorrelation
# set an arbitrary random seed to fix the result
corr_res = ts_nino.correlation(ts_air, method='ttest')
print(corr_res)
# using "isopersistent" surrogates (AR(1) simulation)
# set an arbitrary random seed to fix the result
corr_res = ts_nino.correlation(ts_air, method = 'ar1sim', settings={'nsim': 20}, seed=2333)
print(corr_res)
'''
if method == 'isospectral':
warnings.warn("isospectral is deprecated and was replaced by 'phaseran'",
DeprecationWarning, stacklevel=2)
method = 'phaseran'
elif method == 'isopersistent':
warnings.warn("isopersistent is deprecated and was replaced by 'ar1sim'",
DeprecationWarning, stacklevel=2)
method = 'ar1sim'
settings = {} if settings is None else settings.copy()
corr_args = {'alpha': alpha, 'method': method}
corr_args.update(settings)
ms = MultipleSeries([self, target_series])
if list(self.time) != list(target_series.time):
common_time_kwargs = {} if common_time_kwargs is None else common_time_kwargs.copy()
ct_args = {'method': 'interp'}
ct_args.update(common_time_kwargs)
ms = ms.common_time(**ct_args)
if timespan is None:
ts0 = ms.series_list[0]
ts1 = ms.series_list[1]
else:
ts0 = ms.series_list[0].slice(timespan)
ts1 = ms.series_list[1].slice(timespan)
if seed is not None:
np.random.seed(seed)
if method == 'ttest':
stat, signf, pval = corrutils.corr_ttest(ts0.value, ts1.value, alpha=alpha)
signif = bool(signf)
elif method == 'built-in':
if statistic == 'weightedtau':
raise ValueError('The null distribution of this statistic is unknown; please use a non-parametric method to obtain the p-value')
else:
res = corrutils.association(ts0.value,ts1.value,statistic)
stat = res[0]
pval = res.pvalue if len(res) > 1 else np.nan
signif = pval <= alpha
elif method in supported_surrogates:
number = corr_args['nsim'] if 'nsim' in corr_args.keys() else 1000
seed = corr_args['seed'] if 'seed' in corr_args.keys() else None
method = corr_args['method'] if 'method' in corr_args.keys() else None
surr_settings = corr_args['surr_settings'] if 'surr_settings' in corr_args.keys() else None
# compute correlation statistic
stat = corrutils.association(ts0.value,ts1.value,statistic)[0]
ts0_surr = ts0.surrogates(number=number, seed=seed,
method=method, settings=surr_settings)
ts1_surr = ts1.surrogates(number=number, seed=seed,
method=method, settings=surr_settings)
stat_surr = np.empty((number))
for i in tqdm(range(number), desc='Evaluating association on surrogate pairs', total=number, disable=mute_pbar):
stat_surr[i] = corrutils.association(ts0_surr.series_list[i].value,
ts1_surr.series_list[i].value,statistic)[0]
# obtain p-value
pval = np.sum(np.abs(stat_surr) >= np.abs(stat)) / number
# establish significance
signif = pval <= alpha
else:
raise ValueError(f'Unknown method: {method}. Look up documentation for a wiser choice.')
corr = Corr(stat, pval, signif, alpha) # assemble Correlation result object
return corr
[docs] def causality(self, target_series, method='liang', timespan=None, settings=None, common_time_kwargs=None):
''' Perform causality analysis with the target timeseries. Specifically, whether there is information in the target series that influenced the original series.
If the two series have different time axes, they are first placed on a common timescale (in ascending order).
Parameters
----------
target_series : Series
A pyleoclim Series object on which to compute causality
method : {'liang', 'granger'}
The causality method to use.
timespan : tuple
The time interval over which to perform the calculation
settings : dict
Parameters associated with the causality methods. Note that each method has different parameters. See individual methods for details
common_time_kwargs : dict
Parameters for the method `MultipleSeries.common_time()`. Will use interpolation by default.
Returns
-------
res : dict
Dictionary containing the results of the the causality analysis. See indivudal methods for details
See also
--------
pyleoclim.utils.causality.liang_causality : Liang causality
pyleoclim.utils.causality.granger_causality : Granger causality
Examples
--------
Liang causality
.. jupyter-execute::
import pyleoclim as pyleo
ts_nino=pyleo.utils.load_dataset('NINO3')
ts_air=pyleo.utils.load_dataset('AIR')
We use the specific params below to lighten computations; you may drop `settings` for real work
.. jupyter-execute::
liang_N2A = ts_air.causality(ts_nino, settings={'nsim': 20, 'signif_test': 'isopersist'})
print(liang_N2A)
liang_A2N = ts_nino.causality(ts_air, settings={'nsim': 20, 'signif_test': 'isopersist'})
print(liang_A2N)
liang_N2A['T21']/liang_A2N['T21']
Both information flows (T21) are positive, but the flow from NINO3 to AIR is about 3x as large as the other way around, suggesting that NINO3 influences AIR much more than the other way around, which conforms to physical intuition.
To implement Granger causality, simply specfiy the method:
.. jupyter-execute::
granger_A2N = ts_nino.causality(ts_air, method='granger')
granger_N2A = ts_air.causality(ts_nino, method='granger')
Note that the output is fundamentally different for the two methods. Granger causality cannot discriminate between NINO3 -> AIR or AIR -> NINO3, in this case. This is not unusual, and one reason why it is no longer in wide use.
'''
# ensure prograde time
ms = MultipleSeries([self.sort(), target_series.sort()])
# Put on common axis if necessary
if list(self.time) != list(target_series.time):
common_time_kwargs = {} if common_time_kwargs is None else common_time_kwargs.copy()
ct_args = {'method': 'interp'}
ct_args.update(common_time_kwargs)
ms = ms.common_time(**ct_args)
if timespan is None:
value1 = ms.series_list[0].value
value2 = ms.series_list[1].value
else:
value1 = ms.series_list[0].slice(timespan).value
value2 = ms.series_list[1].slice(timespan).value
settings = {} if settings is None else settings.copy()
spec_func={
'liang':causalutils.liang_causality,
'granger':causalutils.granger_causality}
args = {}
args['liang'] = {}
args['granger'] = {}
args[method].update(settings)
causal_res = spec_func[method](value1, value2, **args[method])
return causal_res
[docs] def surrogates(self, method='ar1sim', number=1, length=None, seed=None, settings=None):
''' Generate surrogates of the Series object according to "method"
For now, assumes uniform spacing and increasing time axis
Parameters
----------
method : {ar1sim, phaseran}
The method used to generate surrogates of the timeseries
Note that phaseran assumes an odd number of samples. If the series
has even length, the last point is dropped to satisfy this requirement
number : int
The number of surrogates to generate
length : int
Length of the series
seed : int
Control seed option for reproducibility
settings : dict
Parameters for surrogate generator. See individual methods for details.
Returns
-------
surr : SurrogateSeries
See also
--------
pyleoclim.utils.tsmodel.ar1_sim : AR(1) simulator
pyleoclim.utils.tsutils.phaseran : phase randomization
'''
settings = {} if settings is None else settings.copy()
args = {}
time = self.time
args['ar1sim'] = {'t': time}
args['phaseran'] = {}
args[method].update(settings)
if seed is not None:
np.random.seed(seed)
if method == 'ar1sim':
surr_res = tsmodel.ar1_sim(self.value, number, **args[method])
elif method == 'phaseran':
if self.is_evenly_spaced():
surr_res = tsutils.phaseran2(self.value, number, **args[method])
else:
raise ValueError("Phase-randomization presently requires evenly-spaced series.")
# elif method == 'uar1':
# # TODO : implement Lionel's ML method
# elif method == 'power-law':
# # TODO : implement Stochastic
# elif method == 'fBm':
# # TODO : implement Stochastic
if len(np.shape(surr_res)) == 1:
surr_res = surr_res[:, np.newaxis]
s_list = []
for i, s in enumerate(surr_res.T):
s_tmp = Series(time=time, value=s, # will need reformation after uar1 pull
time_name=self.time_name,
time_unit=self.time_unit,
value_name=self.value_name,
value_unit=self.value_unit,
label = str(self.label or '') + " surr #" + str(i+1),
verbose=False, auto_time_params=True)
s_list.append(s_tmp)
surr = SurrogateSeries(series_list=s_list,
label = self.label,
surrogate_method=method,
surrogate_args=args[method])
return surr
[docs] def outliers(self,method='kmeans',remove=True, settings=None,
fig_outliers=True, figsize_outliers=[10,4], plotoutliers_kwargs=None, savefigoutliers_settings=None,
fig_clusters=True,figsize_clusters=[10,4], plotclusters_kwargs=None,savefigclusters_settings=None, keep_log=False):
"""
Remove outliers from timeseries data. The method employs clustering to identify clusters in the data, using the k-means and DBSCAN algorithms from scikit-learn. Points falling a certain distance from the cluster (either away from the centroid for k-means or in a area of low density for DBSCAN) are considered outliers.
The silhouette score is used to optimize parameter values.
A tutorial explaining how to use this method and set the parameters is available at https://github.com/LinkedEarth/PyleoTutorials/blob/main/notebooks/L2_outliers_detection.ipynb.
Parameters
----------
method : str, {'kmeans','DBSCAN'}, optional
The clustering method to use. The default is 'kmeans'.
remove : bool, optional
If True, removes the outliers. The default is True.
settings : dict, optional
Specific arguments for the clustering functions. The default is None.
fig_outliers : bool, optional
Whether to display the timeseries showing the outliers. The default is True.
figsize_outliers : list, optional
The dimensions of the outliers figure. The default is [10,4].
plotoutliers_kwargs : dict, optional
Arguments for the plot displaying the outliers. The default is None.
savefigoutliers_settings : dict, optional
Saving options for the outlier plot. The default is None.
- "path" must be specified; it can be any existed or non-existed path,
with or without a suffix; if the suffix is not given in "path", it will follow "format"
- "format" can be one of {"pdf", "eps", "png", "ps"}
fig_clusters : bool, optional
Whether to display the clusters. The default is True.
figsize_clusters : list, optional
The dimensions of the cluster figures. The default is [10,4].
plotclusters_kwargs : dict, optional
Arguments for the cluster plot. The default is None.
savefigclusters_settings : dict, optional
Saving options for the cluster plot. The default is None.
- "path" must be specified; it can be any existed or non-existed path,
with or without a suffix; if the suffix is not given in "path", it will follow "format"
- "format" can be one of {"pdf", "eps", "png", "ps"}
keep_log : Boolean
if True, adds the previous method parameters to the series log.
Returns
-------
ts: Series
A new Series object without outliers if remove is True. Otherwise, returns the original timeseries
See also
--------
pyleoclim.utils.tsutils.detect_outliers_DBSCAN : Outlier detection using the DBSCAN method
pyleoclim.utils.tsutils.detect_outliers_kmeans : Outlier detection using the kmeans method
pyleoclim.utils.tsutils.remove_outliers : Remove outliers from the series
Examples
--------
.. jupyter-execute::
import pyleoclim as pyleo
LR04 = pyleo.utils.load_dataset('LR04')
LR_out = LR04.detrend().standardize().outliers(method='kmeans')
To set the number of clusters:
.. jupyter-execute::
LR_out = LR04.detrend().standardize().outliers(method='kmeans', settings={'nbr_clusters':2})
The log contains diagnostic information, to access it, set the keep_log parameter to True:
.. jupyter-execute::
LR_out = LR04.detrend().standardize().outliers(method='kmeans', settings={'nbr_clusters':2}, keep_log=True)
"""
if method not in ['kmeans','DBSCAN']:
raise ValueError('method should either be "kmeans" or "DBSCAN"')
# run the algorithm
settings = {} if settings is None else settings.copy()
spec_func={
'kmeans':tsutils.detect_outliers_kmeans,
'DBSCAN':tsutils.detect_outliers_DBSCAN}
args = {}
args['kmeans'] = {}
args['DBSCAN'] = {}
args[method].update(settings)
indices, res = spec_func[method](self.value,**args[method])
# Create the new Series object
new=self.copy()
if remove==True:
if len(indices)>=1:
ys,ts=tsutils.remove_outliers(self.time,self.value,indices)
new.value=ys
new.time=ts
# Figures
# Optional parameters
savefigoutliers_settings = {} if savefigoutliers_settings is None else savefigoutliers_settings.copy()
savefigclusters_settings = {} if savefigclusters_settings is None else savefigclusters_settings.copy()
plotoutliers_kwargs = {} if plotoutliers_kwargs is None else plotoutliers_kwargs.copy()
plotclusters_kwargs = {} if plotclusters_kwargs is None else plotclusters_kwargs.copy()
# Figure showing the outliers
if fig_outliers == True:
fig,ax = plt.subplots(figsize=figsize_outliers)
time_label, value_label = self.make_labels()
if 'xlabel' not in plotoutliers_kwargs.keys():
xlabel = time_label
else:
xlabel = plotoutliers_kwargs['xlabel']
plotoutliers_kwargs.pop('xlabel')
if 'ylabel' not in plotoutliers_kwargs.keys():
ylabel = value_label
else:
ylabel = plotoutliers_kwargs['ylabel']
plotoutliers_kwargs.pop('ylabel')
if 'title' not in plotoutliers_kwargs.keys():
title = None
else:
title = plotoutliers_kwargs['title']
plotoutliers_kwargs.pop('title')
if 'xlim' not in plotoutliers_kwargs.keys():
xlim = None
else:
xlim = plotoutliers_kwargs['xlim']
plotoutliers_kwargs.pop('xlim')
if 'ylim' not in plotoutliers_kwargs.keys():
ylim = None
else:
ylim = plotoutliers_kwargs['ylim']
plotoutliers_kwargs.pop('ylim')
if 'legend' not in plotoutliers_kwargs.keys():
legend = True
else:
legend = plotoutliers_kwargs['legend']
plotoutliers_kwargs.pop('legend')
if len(indices)>=1:
plotting.plot_scatter_xy(self.time,self.value,self.time[indices],self.value[indices],
xlabel=xlabel,ylabel=ylabel,
title = title, xlim=xlim, ylim=ylim, legend=legend,
plot_kwargs=plotoutliers_kwargs,ax=ax)
else:
plotting.plot_xy(self.time,self.value,
xlabel=xlabel,ylabel=ylabel,
title = title, xlim=xlim, ylim=ylim, legend=legend,
plot_kwargs=plotoutliers_kwargs,ax=ax)
#Saving options
if 'path' in savefigoutliers_settings:
plotting.savefig(fig,settings=savefigoutliers_settings)
if fig_clusters == True:
fig,ax = plt.subplots(figsize=figsize_clusters)
# dealt with plot options
time_label, value_label = self.make_labels()
if 'xlabel' not in plotclusters_kwargs.keys():
xlabel = time_label
else:
xlabel = plotclusters_kwargs['xlabel']
plotclusters_kwargs.pop('xlabel')
if 'ylabel' not in plotclusters_kwargs.keys():
ylabel = value_label
else:
ylabel = plotclusters_kwargs['ylabel']
plotclusters_kwargs.pop('ylabel')
if 'title' not in plotclusters_kwargs.keys():
title = None
else:
title = plotclusters_kwargs['title']
plotclusters_kwargs.pop('title')
if 'xlim' not in plotclusters_kwargs.keys():
xlim = None
else:
xlim = plotclusters_kwargs['xlim']
plotclusters_kwargs.pop('xlim')
if 'ylim' not in plotclusters_kwargs.keys():
ylim = None
else:
ylim = plotclusters_kwargs['ylim']
plotclusters_kwargs.pop('ylim')
if 'legend' not in plotclusters_kwargs.keys():
legend = True
else:
legend = plotclusters_kwargs['legend']
plotclusters_kwargs.pop('legend')
clusters = np.array(res.loc[res['silhouette score']==np.max(res['silhouette score'])]['clusters'])[0]
if 'c' not in plotclusters_kwargs.keys():
color_list = list(mcolors.CSS4_COLORS.keys())
color_list.remove('red')
random.Random(9).shuffle(color_list)
colors = color_list[0:len(np.unique(clusters))]
vectorizer = np.vectorize(lambda x: colors[x % len(colors)])
c = vectorizer(clusters)
else:
c = plotclusters_kwargs['c']
plotclusters_kwargs.pop('c')
plotting.scatter_xy(self.time,self.value,c = c, xlabel=xlabel,ylabel=ylabel,
title = title, xlim=xlim, ylim=ylim, legend=legend,
plot_kwargs = plotclusters_kwargs, ax=ax)
#plot
if np.size(indices) != 0:
plotting.scatter_xy(self.time[indices],self.value[indices],c='red',ax=ax)
if 'path' in savefigclusters_settings:
plotting.savefig(fig,settings=savefigclusters_settings)
#return the log if asked
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log): 'outliers','method': method,
'args': settings,
'results': res},)
return new
[docs] def interp(self, method='linear', keep_log= False, **kwargs):
'''Interpolate a Series object onto a new time axis
Parameters
----------
method : {‘linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, ‘next’}
where ‘zero’, ‘slinear’, ‘quadratic’ and ‘cubic’ refer to a spline interpolation of zeroth, first, second or third order; ‘previous’ and ‘next’ simply return the previous or next value of the point) or as an integer specifying the order of the spline interpolator to use. Default is ‘linear’.
keep_log : Boolean
if True, adds the method name and its parameters to the series log.
kwargs :
Arguments specific to each interpolation function. See pyleoclim.utils.tsutils.interp for details
Returns
-------
new : Series
An interpolated Series object
See also
--------
pyleoclim.utils.tsutils.interp : interpolation function
'''
new = self.copy()
ti, vi = tsutils.interp(self.time,self.value,interp_type=method,**kwargs)
new.time = ti
new.value = vi
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log):'interp', 'method': method, 'args': kwargs},)
return new
[docs] def gkernel(self, step_style='max', keep_log = False, step_type=None, **kwargs):
''' Coarse-grain a Series object via a Gaussian kernel.
Like .bin() this technique is conservative and uses the max space between points
as the default spacing. Unlike .bin(), gkernel() uses a gaussian kernel to
calculate the weighted average of the time series over these intervals.
Note that if the series being examined has very low resolution sections with few points,
you may need to tune the parameter for the kernel e-folding scale (h).
Parameters
----------
step_style : str
type of timestep: 'mean', 'median', or 'max' of the time increments
keep_log : Boolean
if True, adds the step type and its keyword arguments to the series log.
kwargs :
Arguments for kernel function. See pyleoclim.utils.tsutils.gkernel for details
Returns
-------
new : Series
The coarse-grained Series object
See also
--------
pyleoclim.utils.tsutils.gkernel : application of a Gaussian kernel
'''
if step_type is not None:
warnings.warn("step_type is deprecated. Please use step_style instead",
DeprecationWarning, stacklevel=2)
new=self.copy()
ti, vi = tsutils.gkernel(self.time, self.value, step_style=step_style, **kwargs) # apply kernel
new.time = ti
new.value = vi
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log):'gkernel', 'step_type': step_type, 'args': kwargs},)
return new
[docs] def bin(self, keep_log = False, **kwargs):
'''Bin values in a time series
Parameters
----------
keep_log : Boolean
if True, adds this step and its parameters to the series log.
kwargs :
Arguments for binning function. See pyleoclim.utils.tsutils.bin for details
Returns
-------
new : Series
An binned Series object
See also
--------
pyleoclim.utils.tsutils.bin : bin the series values into evenly-spaced time bins
'''
new=self.copy()
res_dict = tsutils.bin(self.time,self.value,**kwargs)
new.time = res_dict['bins']
new.value = res_dict['binned_values']
if keep_log == True:
if new.log is None:
new.log=()
new.log += ({len(new.log):'bin', 'args': kwargs},)
return new
[docs] def resample(self, rule, keep_log = False, **kwargs):
"""
Run analogue to pandas.Series.resample.
This is a convenience method: doing
ser.resample('AS').mean()
will do the same thing as
ser.pandas_method(lambda x: x.resample('AS').mean())
but will also accept some extra resampling rules, such as `'Ga'` (see below).
Parameters
----------
rule : str
The offset string or object representing target conversion.
Can also accept pyleoclim units, such as 'ka' (1000 years),
'Ma' (1 million years), and 'Ga' (1 billion years).
Check the [pandas resample docs](https://pandas.pydata.org/docs/dev/reference/api/pandas.DataFrame.resample.html)
for more details.
kwargs : dict
Any other arguments which will be passed to pandas.Series.resample.
Returns
-------
SeriesResampler
Resampler object, not meant to be used to directly. Instead,
an aggregation should be called on it, see examples below.
Examples
--------
.. jupyter-execute::
ts = pyleo.utils.load_dataset('LR04')
ts5k = ts.resample('5ka').mean()
fig, ax = ts.plot(invert_yaxis='True',xlim=[0, 1000])
ts5k.plot(ax=ax,color='C1')
"""
search = re.search(r'(\d*)([a-zA-Z]+)', rule)
if search is None:
raise ValueError(f"Invalid rule provided, got: {rule}")
md = self.metadata
if md['label'] is not None:
md['label'] = md['label'] + ' (' + rule + ' resampling)'
multiplier = search.group(1)
if multiplier == '':
multiplier = 1
else:
multiplier = int(multiplier)
unit = search.group(2)
if unit.lower() in tsbase.MATCH_A:
rule = f'{multiplier}YS'
elif unit.lower() in tsbase.MATCH_KA:
rule = f'{1_000*multiplier}YS'
elif unit.lower() in tsbase.MATCH_MA:
rule = f'{1_000_000*multiplier}YS'
elif unit.lower() in tsbase.MATCH_GA:
rule = f'{1_000_000_000*multiplier}YS'
ser = self.to_pandas()
return SeriesResampler(rule, ser, md, keep_log, kwargs)
[docs] def resolution(self):
"""Generate a resolution object
Increments are assigned to the preceding time value.
E.g. for time_axis = [0,1,3], resolution.resolution = [1,2] resolution.time = [0,1]
Returns
-------
resolution : Resolution
Resolution object
See Also
--------
pyleoclim.core.resolutions.Resolution
Examples
--------
To create a resolution object, apply the .resolution() method to a Series object
.. jupyter-execute::
ts = pyleo.utils.load_dataset('EDC-dD')
resolution = ts.resolution()
Several methods are then available:
Summary statistics can be obtained via .describe()
.. jupyter-execute::
resolution.describe()
A simple plot can be created using .plot()
.. jupyter-execute::
resolution.plot()
The distribution of resolution
.. jupyter-execute::
resolution.histplot()
Or a dashboard combining plot() and histplot() side by side:
.. jupyter-execute::
resolution.dashboard()
"""
copy = self.copy()
x = copy.time
v = copy.value
if any(np.isnan(v)):
v,x = tsbase.dropna(ts=x,ys=v)
copy.time = x
copy.value = v
res,_,_ = tsbase.resolution(x=x)
resolution = Resolution(
resolution = res,
time = x[1:],
resolution_unit= self.time_unit,
label= self.label,
timeseries = copy
)
return resolution
class SeriesResampler:
"""
This is only meant to be used internally, and is not meant to
be public-facing or to be used directly by users.
If users call
ts.resample('1Y').mean()
then they will get back a pyleoclim.Series, and `SeriesResampler`
will only be used in an intermediate step. Think of it as an
implementation detail.
"""
def __init__(self, rule, series, metadata, keep_log, kwargs):
self.rule = rule
self.series = series
self.metadata = metadata
self.keep_log = keep_log
self.kwargs = kwargs
def __getattr__(self, attr):
attr = getattr(self.series.resample(self.rule, **self.kwargs), attr)
def func(*args, **kwargs):
series = attr(*args, **kwargs)
series.index = series.index + (series.index[1] - series.index[0])/2 # sample midpoints
_, __, direction = tsbase.time_unit_to_datum_exp_dir(self.metadata['time_unit'], self.metadata['time_name'])
if direction == 'prograde':
from_pandas = Series.from_pandas(series, metadata=self.metadata)
else:
from_pandas = Series.from_pandas(series.sort_index(ascending=False), metadata=self.metadata)
if self.keep_log == True:
if from_pandas.log is None:
from_pandas.log=()
from_pandas.log += ({len(from_pandas.log): 'resample','rule': self.rule},)
return from_pandas
return func