# -*- coding: utf-8 -*-
"""
Utilities to manipulate LiPD files and automate data transformation whenever possible.
These functions are used throughout Pyleoclim but are not meant for direct interaction by users.
Also handles integration with the LinkedEarth wiki and the LinkedEarth Ontology.
"""
import numpy as np
import os
import json
import requests
import wget
from bs4 import BeautifulSoup
import string
[docs]class CaseInsensitiveDict(dict):
def __setitem__(self, key, value):
super().__setitem__(key.lower().replace(" ", ""), value)
def __getitem__(self, key):
return super().__getitem__(key.lower().replace(" ", ""))
# Old one:
# PLOT_DEFAULT = {'GroundIce': ['#86CDFA', 'h'],
# 'Borehole': ['#00008b', 'h'],
# 'Coral': ['#FF8B00', 'o'],
# 'Documents': ['#f8d568', 'p'],
# 'GlacierIce': ['#86CDFA', 'd'],
# 'Hybrid': ['#808000', '*'],
# 'LakeSediment': ['#8A4513', '^'],
# 'MarineSediment': ['#8A4513', 's'],
# 'Sclerosponge': ['r', 'o'],
# 'Speleothem': ['#FF1492', 'd'],
# 'Wood': ['#32CC32', '^'],
# 'MolluskShell': ['#FFD600', 'h'],
# 'Peat': ['#2F4F4F', '*'],
# 'Midden': ['#824E2B', 'o'],
# 'FluvialSediment': ['#4169E0','o'],
# 'TerrestrialSediment': ['#8A4513','o'],
# 'Shoreline': ['#add8e6','o'],
# 'Instrumental' : ['#8f21d8', '*'],
# 'Model' : ['#b4a7d6', "d"],
# 'Other': ['k', 'o']
# }
PLOT_DEFAULT = {'GlacierIce': ['deepskyblue', '*'],
'GroundIce': ['slategray', '*'],
'Borehole': ['#FFD600', 's'],
'Coral': ['#FF8B00', 'v'],
'Sclerosponge': ['r', 'v'],
'Documents': ['#f8d568', 'p'],
'Hybrid': ['#808000', 'H'],
'LakeSediment': ['#1170aa', 'o'],
'MarineSediment': ['#8A4513', 'o'],
'FluvialSediment': ['#5fa2ce','o'],
'TerrestrialSediment': ['#57606c','o'],
'Speleothem': ['#FF1492', 'd'],
'Wood': ['#8CD17D', '^'],
'MolluskShell': ['#f8d568', 'h'],
'Peat': ['#8A9A5B', 'X'],
'Midden': ['#824E2B', 'X'],
'Shoreline': ['#40826D','o'],
'Instrumental' : ['#B07AA1', 'D'],
'Model' : ['#E15759', "D"],
'Other': ['k', 'o']
}
# as per lipd convention, communicated by David Edge, 06.25.2024
# var colorPal = {"Borehole":"#FFD600","MolluskShell":"#7b03fc","GlacierIce":"#86CDFA","GroundIce":"#ff6db6","Coral":"#FF8B00","FluvialSediment":"#4169E0","LakeSediment":"#8f8fa1","MarineSediment":"#8A4513","Speleothem":"#FF1492","Midden":"#824E2B","Peat":"#8A9A5B","Sclerosponge":"#D2042D","Shoreline":"#40826D","Wood":"#32CC32","TerrestrialSediment":"#d2b48c"}
# var shapePal ={"Borehole":"square","MolluskShell":"triangle","GlacierIce":"snowflake","GroundIce":"snowflake","Coral":"triangle-down","FluvialSediment":"circle","LakeSediment":"circle","MarineSediment":"circle","Speleothem":"square","Midden":"diamond","Peat":"triangle-down","Sclerosponge":"triangle","Shoreline":"diamond","Wood":"triangle","TerrestrialSediment":"circle"}
"""
The followng functions handle web scrapping to grab information regarding the controlled vocabulary
"""
[docs]def get_archive_type():
'''
Scrape the LiPDverse website to obtain the list of possible archives and associated synonyms
Returns
-------
res : Dictionary
Keys correspond to the preferred terms and values represent known synonyms
'''
url = "https://lipdverse.org/vocabulary/archivetype/"
response = requests.get(url)
if response.status_code == 200:
# Parse the content of the request with BeautifulSoup
soup = BeautifulSoup(response.content, 'html.parser')
# Get the names of the archiveTypes
h3_tags = soup.find_all('h3')
archiveName = []
for item in h3_tags:
archiveName.append(item.get_text())
# Get the known synonyms
h4_tags = soup.find_all('h4', string="Known synonyms")
synonyms=[]
for h4_tag in h4_tags:
next_element = h4_tag.find_next_sibling()
found_p = False
while next_element and next_element.name != 'div':
if next_element.name == 'p':
synonyms_text = next_element.get_text()
words = [word.strip() for word in synonyms_text.split(',')]
synonyms.append(words)
found_p = True
break
next_element = next_element.find_next_sibling()
# If a <p> tag was not found, insert an empty string
if not found_p:
synonyms.append([])
#create a dictionary for the results
res= {}
for idx,item in enumerate(archiveName):
res[item]=synonyms[idx]
else:
print("failed to retrieve the webpage; returning static list, which may be out of date")
res = ["Borehole",
"Coral",
"FluvialSediment",
"GlacierIce",
"GroundIce",
"LakeSediment",
"MarineSediment",
"Midden",
"MolluskShell",
"Peat",
"Scelorosponge",
"Shoreline",
"Spleleothem",
"TerrestrialSediment",
"Wood"]
return res
"""
The following functions handle the LiPD files
"""
[docs]def enumerateLipds(lipds):
"""Enumerate the LiPD files loaded in the workspace
Parameters
----------
lipds : dict
A dictionary of LiPD files.
"""
print("Below are the available records")
lipds_list = [val for val in lipds.keys()]
for idx, val in enumerate(lipds_list):
print(idx,': ',val)
[docs]def getLipd(lipds):
"""Prompt for a LiPD file
Ask the user to select a LiPD file from a list
Use this function in conjunction with enumerateLipds()
Parameters
----------
lipds : dict
A dictionary of LiPD files. Can be obtained from
pyleoclim.readLipd()
Returns
-------
select_lipd : int
The index of the LiPD file
"""
enumerateLipds(lipds)
lipds_list = [val for val in lipds.keys()]
choice = int(input("Enter the number of the file: "))
lipd_name = lipds_list[choice]
select_lipd = lipds[lipd_name]
return select_lipd
"""
The following functions work at the variables level
"""
[docs]def promptForVariable():
"""Prompt for a specific variable
Ask the user to select the variable they are interested in.
Use this function in conjunction with readHeaders() or getTSO()
Returns
-------
select_var : int
The index of the variable
"""
select_var = int(input("Enter the number of the variable you wish to use: "))
return select_var
[docs]def xAxisTs(timeseries):
""" Get the x-axis for the timeseries.
Parameters
----------
timeseries : dict
a timeseries object
Returns
-------
x_axis : array
the values for the x-axis representation
label : string
returns either "age", "year", or "depth"
"""
if "depth" in timeseries.keys() and "age" in timeseries.keys() or\
"depth" in timeseries.keys() and "year" in timeseries.keys():
print("Both time and depth information available, selecting time")
if "age" in timeseries.keys() and "year" in timeseries.keys():
print("Both age and year representation available, selecting age")
x_axis = timeseries["age"]
label = "age"
elif "year" in timeseries.keys():
x_axis = timeseries["year"]
label = "year"
elif "age" in timeseries.keys():
x_axis = timeseries["age"]
elif "depth" in timeseries.keys():
x_axis = timeseries["depth"]
label = "depth"
elif "age" in timeseries.keys():
x_axis = timeseries["age"]
label = "age"
elif "year" in timeseries.keys():
x_axis = timeseries["year"]
label = "year"
else:
raise KeyError("No age or depth information available")
return x_axis, label
[docs]def checkXaxis(timeseries, x_axis= None):
"""Check that a x-axis is present for the timeseries
Parameters
----------
timeseries : dict
a timeseries
x_axis : string
the x-axis representation, either depth, age or year
Returns
-------
x : array
the values for the x-axis representation
label : string
returns either "age", "year", or "depth"
"""
if x_axis is None:
x, label = xAxisTs(timeseries)
x = np.array(x, dtype = 'float64')
elif x_axis == "depth":
if not "depth" in timeseries.keys():
raise ValueError("Depth not available for this record")
else:
x = np.array(timeseries['depth'], dtype = 'float64')
label = "depth"
elif x_axis == "age":
if not "age" in timeseries.keys():
raise ValueError("Age not available for this record")
else:
x = np.array(timeseries['age'], dtype = 'float64')
label = "age"
elif x_axis == "year":
if not "year" in timeseries.keys():
raise ValueError("Year not available for this record")
else:
x = np.array(timeseries['year'], dtype = 'float64')
label = "year"
else:
raise KeyError("enter either 'depth','age',or 'year'")
return x, label
[docs]def checkTimeAxis(timeseries, x_axis = None):
""" This function makes sure that time is available for the timeseries
Parameters
----------
timeseries : dict
A LiPD timeseries object
Returns
-------
x : array
the time values for the timeseries
label : string
the time representation for the timeseries
"""
if x_axis is None:
if not 'age' in timeseries.keys() and not 'year' in timeseries.keys():
raise KeyError("No time information available")
elif 'age' in timeseries.keys() and 'year' in timeseries.keys():
print("Both age and year information are available, using age")
label = 'age'
elif 'age' in timeseries.keys():
label = 'age'
elif 'year' in timeseries.keys():
label = 'year'
elif x_axis == 'age':
if not 'age' in timeseries.keys():
raise KeyError('Age is not available for this record')
else:
label = 'age'
elif x_axis == 'year':
if not 'year' in timeseries.keys():
raise KeyError('Year is not available for this record')
else:
label='year'
else:
raise KeyError('Only None, year and age are valid entries for x_axis parameter')
x = np.array(timeseries[label], dtype = 'float64')
return x, label
[docs]def searchVar(timeseries_list, key, exact = True, override = True):
""" This function searched for keywords (exact match) for a variable
Parameters
----------
timeseries_list : list
A list of available series
key : list
A list of keys to search
exact : bool
if True, looks for an exact match.
override : bool
if True, override the exact match if no match is found
Returns
-------
match : list
A list of keys for the timeseries that match the selection
criteria.
"""
# Make sure thaat the keys are contained in a list
if type(key) is not list:
if type(key) is str:
key = [key]
else:
raise TypeError("Key terms should be entered as a list")
match = []
if exact == True:
#Search for exact match with the key
for keyVal in key:
for val in timeseries_list.keys():
ts_temp = timeseries_list[val]
if "variableName" in ts_temp.keys():
name = ts_temp["variableName"]
if keyVal.lower() == name.lower():
match.append(val)
elif "paleoData_variableName" in ts_temp.keys():
name = ts_temp["paleoData_variableName"]
if keyVal.lower() == name.lower():
match.append(val)
elif "chronData_variableName" in ts_temp.keys():
name = ts_temp["chronData_variableName"]
if keyVal.lower() == name.lower():
match.append(val)
elif "ProxyObservationType" in ts_temp.keys():
name = ts_temp["ProxyObservationType"]
if keyVal.lower() == name.lower():
match.append(val)
elif "paleoData_proxyObservationType" in ts_temp.keys():
name = ts_temp["paleoData_proxyObservationType"]
if keyVal.lower() == name.lower():
match.append(val)
elif "chronData_proxyObservationType" in ts_temp.keys():
name = ts_temp["chronData_proxyObservationType"]
if keyVal.lower() == name.lower():
match.append(val)
elif "InferredVariableType" in ts_temp.keys():
name = ts_temp["InferredVariableType"]
if keyVal.lower() == name.lower():
match.append(val)
elif "paleoData_inferredVariableType" in ts_temp.keys():
name = ts_temp["paleoData_inferredVariableType"]
if keyVal.lower() == name.lower():
match.append(val)
elif "chronData_inferredVariableType" in ts_temp.keys():
name = ts_temp["chronData_inferredVariableType"]
if keyVal.lower() == name.lower():
match.append(val)
else:
# Search for the word in the ley
for keyVal in key:
for val in timeseries_list.keys():
ts_temp = timeseries_list[val]
if "variableName" in ts_temp.keys():
name = ts_temp["variableName"]
if keyVal.lower() in name.lower():
match.append(val)
elif "paleoData_variableName" in ts_temp.keys():
name = ts_temp["paleoData_variableName"]
if keyVal.lower() in name.lower():
match.append(val)
elif "chronData_variableName" in ts_temp.keys():
name = ts_temp["chronData_variableName"]
if keyVal.lower() in name.lower():
match.append(val)
elif "ProxyObservationType" in ts_temp.keys():
name = ts_temp["ProxyObservationType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "paleoData_proxyObservationType" in ts_temp.keys():
name = ts_temp["paleoData_proxyObservationType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "chronData_proxyObservationType" in ts_temp.keys():
name = ts_temp["chronData_proxyObservationType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "InferredVariableType" in ts_temp.keys():
name = ts_temp["InferredVariableType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "paleoData_inferredVariableType" in ts_temp.keys():
name = ts_temp["paleoData_inferredVariableType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "chronData_inferredVariableType" in ts_temp.keys():
name = ts_temp["chronData_inferredVariableType"]
if keyVal.lower() in name.lower():
match.append(val)
# Expand the search if asked
if not match and exact == True and override == True:
print("No match found on exact search, running partial match")
for keyVal in key:
for val in timeseries_list.keys():
ts_temp = timeseries_list[val]
if "variableName" in ts_temp.keys():
name = ts_temp["variableName"]
if keyVal.lower() in name.lower():
match.append(val)
elif "paleoData_variableName" in ts_temp.keys():
name = ts_temp["paleoData_variableName"]
if keyVal.lower() in name.lower():
match.append(val)
elif "chronData_variableName" in ts_temp.keys():
name = ts_temp["chronData_variableName"]
if keyVal.lower() in name.lower():
match.append(val)
elif "ProxyObservationType" in ts_temp.keys():
name = ts_temp["ProxyObservationType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "paleoData_proxyObservationType" in ts_temp.keys():
name = ts_temp["paleoData_proxyObservationType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "chronData_proxyObservationType" in ts_temp.keys():
name = ts_temp["chronData_proxyObservationType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "InferredVariableType" in ts_temp.keys():
name = ts_temp["InferredVariableType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "paleoData_inferredVariableType" in ts_temp.keys():
name = ts_temp["paleoData_inferredVariableType"]
if keyVal.lower() in name.lower():
match.append(val)
elif "chronData_inferredVariableType" in ts_temp.keys():
name = ts_temp["chronData_inferredVariableType"]
if keyVal.lower() in name.lower():
match.append(val)
# Get the unique entries
match = list(set(match))
# Narrow down if more than one match is found by asking the user
if len(match) > 1:
print("More than one series match your search criteria")
for idx, val in enumerate(match):
print(idx,": ", val)
choice = int(input("Enter the number for the variable: "))
match = match[choice]
elif not match:
print("No match found.")
print("Here are the available variables: ")
v = list(timeseries_list.keys())
for idx, val in enumerate(v):
print(idx,": ",val)
choice = input("Please select the variable you'd like to use or enter to continue: ")
if not choice:
match =""
else:
choice = int(choice)
match = v[choice]
else:
match = match[0]
return match
"""
The following functions handle the time series objects
"""
[docs]def enumerateTs(timeseries_list):
"""Enumerate the available time series objects
Parameters
----------
timeseries_list : list
a list of available timeseries objects.
"""
available_y = []
dataSetName =[]
at =[]
for item in timeseries_list:
if 'dataSetName' in item.keys():
dataSetName.append(item['dataSetName'])
else:
dataSetName.append('NA')
if 'paleoData_variableName' in item.keys():
available_y.append(item['paleoData_variableName'])
elif 'chronData_variableName' in item.keys():
available_y.append(item['chronData_variableName'])
else:
available_y.append('NA')
if 'archiveType' in item.keys():
at.append(item['archiveType'])
else:
at.append('NA')
for idx,val in enumerate(available_y):
print(idx,': ',dataSetName[idx], ': ',at[idx],': ', val)
"""
Functions to handle data on the LinkedEarth Platform
"""
[docs]def LipdToOntology(archiveType):
""" standardize archiveType
Transform the archiveType from their LiPD name to their ontology counterpart
Parameters
----------
archiveType : string
name of the archiveType from the LiPD file
Returns
-------
archiveType : string
archiveType according to the ontology
"""
#Align with the ontology
if archiveType != None:
if archiveType.lower().replace(" ", "") == "icecore":
archiveType = 'GlacierIce'
elif archiveType.lower().replace(" ", "") == "ice-other":
archiveType = 'GlacierIce'
elif archiveType.lower().replace(" ", "") == 'tree':
archiveType = 'Wood'
elif archiveType.lower().replace(" ","") not in [key.lower() for key in PLOT_DEFAULT.keys()]:
archiveType='Other'
return archiveType
[docs]def timeUnitsCheck(units):
""" This function attempts to make sense of the time units by checking for equivalence
Parameters
----------
units : string
The units string for the timeseries
Returns
-------
unit_group : string
Whether the units belongs to age_units, kage_units, year_units, ma_units, or undefined
"""
age_units = ['year B.P.','yr B.P.','yr BP','BP','yrs BP','years B.P.',\
'yr. BP','yr. B.P.', 'cal. BP', 'cal B.P.', \
'year BP','years BP']
kage_units = ['kyr BP','kaBP','ka BP','ky','kyr','kyr B.P.', 'ka B.P.', 'ky BP',\
'kyrs BP','ky B.P.', 'kyrs B.P.', 'kyBP', 'kyrBP']
year_units = ['AD','CE','year C.E.','year A.D.', 'year CE','year AD',\
'years C.E.','years A.D.','yr CE','yr AD','yr C.E.'\
'yr A.D.', 'yrs C.E.', 'yrs A.D.', 'yrs CE', 'yrs AD']
mage_units = ['my BP', 'myr BP', 'myrs BP', 'ma BP', 'ma',\
'my B.P.', 'myr B.P.', 'myrs B.P.', 'ma B.P.']
undefined = ['years', 'yr','year','yrs']
if units in age_units:
unit_group = 'age_units'
elif units in kage_units:
unit_group = 'kage_units'
elif units in year_units:
unit_group = 'year_units'
elif units in undefined:
unit_group = 'undefined'
elif units in mage_units:
unit_group = 'ma_units'
else:
unit_group = 'unknown'
return unit_group
[docs]def whatArchives(print_response=True):
""" Get the names for ArchiveType from LinkedEarth Ontology
Parameters
----------
print_response : bool
Whether to print the results on the console. Default is True
Returns
-------
res : JSON-object with the request from LinkedEarth wiki api
"""
url = "http://wiki.linked.earth/store/ds/query"
query = """PREFIX core: <http://linked.earth/ontology#>
PREFIX wiki: <http://wiki.linked.earth/Special:URIResolver/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT distinct ?a
WHERE {
{
?dataset wiki:Property-3AArchiveType ?a.
}UNION
{
?w core:proxyArchiveType ?t.
?t rdfs:label ?a
}
}"""
response = requests.post(url, data = {'query': query})
res_i = json.loads(response.text)
if print_response == True:
print("The following archive types are available on the wiki:")
for item in res_i['results']['bindings']:
print ("*" + item['a']['value'])
res=[]
for item in res_i['results']['bindings']:
res.append(item['a']['value'])
return res
[docs]def whatProxyObservations(print_response=True):
""" Get the names for ProxyObservations from LinkedEarth Ontology
Parameters
----------
print_response : bool
Whether to print the results on the console. Default is True
Returns
-------
res : JSON-object with the request from LinkedEarth wiki api
"""
url = "http://wiki.linked.earth/store/ds/query"
query = """PREFIX core: <http://linked.earth/ontology#>
PREFIX wiki: <http://wiki.linked.earth/Special:URIResolver/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT distinct ?a
WHERE
{
?w core:proxyObservationType ?t.
?t rdfs:label ?a
}"""
response = requests.post(url, data = {'query': query})
res_i = json.loads(response.text)
if print_response==True:
print("The following proxy observation types are available on the wiki: ")
for item in res_i['results']['bindings']:
print (item['a']['value'])
res=[]
for item in res_i['results']['bindings']:
res.append(item['a']['value'])
return res
[docs]def whatProxySensors(print_response=True):
""" Get the names for ProxySensors from LinkedEarth Ontology
Parameters
----------
print_response : bool
Whether to print the results on the console. Default is True
Returns
-------
res : JSON-object with the request from LinkedEarth wiki api
"""
url = "http://wiki.linked.earth/store/ds/query"
query = """PREFIX core: <http://linked.earth/ontology#>
PREFIX wiki: <http://wiki.linked.earth/Special:URIResolver/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT distinct ?a ?b
WHERE
{
?w core:sensorGenus ?a.
?w core:sensorSpecies ?b .
}"""
response = requests.post(url, data = {'query': query})
res_i = json.loads(response.text)
if print_response == True:
print("The available sensor genus/species are: ")
for item in res_i['results']['bindings']:
print ("*" + 'Genus: '+item['a']['value']+' Species: ' +item['b']['value'])
res=[]
for item in res_i['results']['bindings']:
res.append(item['a']['value'])
return res
[docs]def whatInferredVariables(print_response=True):
""" Get the names for InferredVariables from LinkedEarth Ontology
Parameters
----------
print_response : bool
Whether to print the results on the console. Default is True
Returns
-------
res : JSON-object with the request from LinkedEarth wiki api
"""
url = "http://wiki.linked.earth/store/ds/query"
query = """PREFIX core: <http://linked.earth/ontology#>
PREFIX wiki: <http://wiki.linked.earth/Special:URIResolver/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT distinct ?a
WHERE
{
?w core:inferredVariableType ?t.
?t rdfs:label ?a
}"""
response = requests.post(url, data = {'query': query})
res_i = json.loads(response.text)
if print_response == True:
print("The following Inferred Variable types are available on the wiki: ")
for item in res_i['results']['bindings']:
print ("*" + item['a']['value'])
res=[]
for item in res_i['results']['bindings']:
res.append(item['a']['value'])
return res
[docs]def whatInterpretations(print_response=True):
""" Get the names for interpretations from LinkedEarth Ontology
Parameters
----------
print_response : bool
Whether to print the results on the console. Default is True
Returns
-------
res : JSON-object with the request from LinkedEarth wiki api
"""
url = "http://wiki.linked.earth/store/ds/query"
query = """PREFIX core: <http://linked.earth/ontology#>
PREFIX wiki: <http://wiki.linked.earth/Special:URIResolver/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT distinct ?a ?b
WHERE
{
?w core:name ?a.
?w core:detail ?b .
}"""
response = requests.post(url, data = {'query': query})
res_i = json.loads(response.text)
if print_response==True:
print("The following interpretation are available on the wiki: ")
for item in res_i['results']['bindings']:
print ("*" + 'Name: '+item['a']['value']+' Detail: ' +item['b']['value'])
res=[]
for item in res_i['results']['bindings']:
res.append(item['a']['value'])
return res
[docs]def pre_process_list(list_str):
""" Pre-process a series of strings for capitalized letters, space, and punctuation
Parameters
----------
list_str : list
A list of strings from which to strip capitals, spaces, and other characters
Returns
-------
res : list
A list of strings with capitalization, spaces, and punctuation removed
"""
res=[]
for item in list_str:
res.append(pre_process_str(item))
return res
[docs]def similar_string(list_str, search):
""" Returns a list of indices for strings with similar values
Parameters
----------
list_str : list
A list of strings
search : str
A keyword search
Returns
-------
indices: list
A list of indices with similar value as the keyword
"""
#exact matches
indices = [i for i, x in enumerate(list_str) if x == search]
# proximity matches
return indices
[docs]def pre_process_str(word):
"""Pre-process a string for capitalized letters, space, and punctuation
Parameters
----------
string : str
A string from which to strip capitals, spaces, and other characters
Returns
-------
res : str
A string with capitalization, spaces, and punctuation removed
"""
d=word.replace(" ","").lower()
stopset=list(string.punctuation)
res="".join([i for i in d if i not in stopset])
return res
"""
Deal with models
"""
[docs]def isModel(csvName, lipd):
"""Check for the presence of a model in the same object as the measurement table
Parameters
----------
csvName : string
The name of the csv file corresponding to the measurement table
lipd : dict
A LiPD object
Returns
-------
model : list
List of models already available
dataObject : string
The name of the paleoData or ChronData
object in which the model(s) are stored
"""
csvNameSplit = csvName.split('.')
for val in csvNameSplit:
if "chron" in val or "paleo" in val:
tableName = val
if tableName[0] == 'c':
objectName = 'chron'+tableName.split('chron')[1][0]
dataObject = lipd["chronData"][objectName]
elif tableName[0] == 'p':
objectName = 'paleo'+tableName.split('paleo')[1][0]
dataObject = lipd["paleoData"][objectName]
else:
raise KeyError("Key name should only include 'chron' or 'paleo'")
if "model" in dataObject.keys():
model_list = dataObject["model"]
model = list(model_list.keys())
else:
model=[]
return model, objectName
[docs]def modelNumber(model):
"""Assign a new or existing model number
Parameters
----------
model : list
List of possible model number. Obtained from isModel
Returns
-------
modelNum : int
The number of the model
"""
if model:
print("There is " + str(len(model)) + " model(s) already available.")
print("creating a new model...")
modelNum = len(model)
print("Your new model number is "+ str(modelNum))
else:
print("No previous model available. Creating a new model...")
modelNum = 0
print("Your model number is "+ str(modelNum))
return modelNum
"""
Get entire tables
"""
[docs]def isMeasurement(csv_dict):
""" Check whether measurement tables are available
Parameters
----------
csv_dict : dict
Dictionary of available csv
Returns
-------
paleoMeasurementTables : list
List of available paleoMeasurementTables
chronMeasurementTables : list
List of available chronMeasurementTables
"""
chronMeasurementTables = []
paleoMeasurementTables =[]
for val in csv_dict.keys():
if "measurement" in val and "chron" in val:
chronMeasurementTables.append(val)
if "measurement" in val and "paleo" in val:
paleoMeasurementTables.append(val)
return chronMeasurementTables, paleoMeasurementTables
[docs]def whichMeasurement(measurementTableList):
"""Select a measurement table from a list
Use in conjunction with the function isMeasurement
Parameters
----------
measurementTableList : list
List of measurement tables contained in the LiPD file. Output from the isMeasurement function
Returns
-------
csvName : string
the name of the csv file
"""
if len(measurementTableList)>1:
print("More than one table is available.")
for idx, val in enumerate(measurementTableList):
print(idx, ": ", val)
csvName = measurementTableList[int(input("Which one would you like to use? "))]
else:
csvName = measurementTableList[0]
return csvName
[docs]def getMeasurement(csvName, lipd):
"""Extract the dictionary corresponding to the measurement table
Parameters
----------
csvName : string
The name of the csv file
lipd : dict
The LiPD object from which to extract the data
Returns
-------
ts_list : dict
A dictionary containing data and metadata for each column in the
csv file.
"""
csvNameSplit = csvName.split('.')
for val in csvNameSplit:
if "chron" in val or "paleo" in val:
tableName = val
if tableName[0] == 'c':
objectName = 'chron'+tableName.split('chron')[1][0]
ts_list = lipd["chronData"][objectName]["measurementTable"][tableName]["columns"]
elif tableName[0] == 'p':
objectName = 'paleo'+tableName.split('paleo')[1][0]
ts_list = lipd["paleoData"][objectName]["measurementTable"][tableName]["columns"]
else:
raise KeyError("Key name should only include 'chron' or 'paleo'")
return ts_list
"""
Deal with ensembles
"""
[docs]def isEnsemble(csv_dict):
""" Check whether ensembles are available
Parameters
----------
csv_dict : dict
Dictionary of available csv
Returns
-------
paleoEnsembleTables : list
List of available paleoEnsembleTables
chronEnsembleTables : list
List of availale chronEnsemble Tables
"""
chronEnsembleTables =[]
paleoEnsembleTables =[]
for val in csv_dict.keys():
if "ensemble" in val and "chron" in val:
chronEnsembleTables.append(val)
elif "ensemble" in val and "paleo" in val:
paleoEnsembleTables.append(val)
return chronEnsembleTables, paleoEnsembleTables
[docs]def whichEnsemble(ensembleTableList):
"""Select an ensemble table from a list
Use in conjunction with the function isMeasurement
Parameters
----------
measurementTableList : list
List of measurement tables contained in the LiPD file. Output from the isMeasurement function
csv_list : list
Dictionary of available csv
Returns
-------
csvName : string
the name of the csv file
"""
if len(ensembleTableList)>1:
print("More than one table is available.")
for idx, val in enumerate(ensembleTableList):
print(idx, ": ", val)
csvName = ensembleTableList[int(input("Which one would you like to use? "))]
else:
csvName = ensembleTableList[0]
return csvName
[docs]def getEnsemble(csv_dict, csvName):
""" Extracts the ensemble values and depth vector from the dictionary and
returns them into two numpy arrays.
Parameters
----------
csv_dict : dict
dictionary containing the availableTables
csvName : str
Name of the csv
Returns
-------
depth : array
Vector of depth
ensembleValues : array
The matrix of Ensemble values
"""
ensemble_dict=csv_dict[csvName]
ensembleValues=[]
for val in ensemble_dict.keys():
if 'depth' in val:
depth = ensemble_dict[val]["values"]
else:
ensembleValues.append(ensemble_dict[val]["values"])
ensembleValues= np.transpose(np.array(ensembleValues))
return depth, ensembleValues
[docs]def mapAgeEnsembleToPaleoData(ensembleValues, depthEnsemble, depthMapping,extrapolate=True):
""" Map the depth for the ensemble age values to the paleo depth
Parameters
----------
ensembleValues : array
A matrix of possible age models. Realizations should be stored in columns
depthEnsemble : array
A vector of depth. The vector should have the same length as the number of rows in the ensembleValues
depthMapping : array
A vector corresponding to the depth that will be the ensemble should be mapped onto
extrapolate : bool
Whether to extrapolate the ensemble values to the paleo depth. Default is True
Returns
-------
ensembleValuesToPaleo : array
A matrix of age ensemble on the PaleoData scale
"""
if len(depthEnsemble)!=np.shape(ensembleValues)[0]:
raise ValueError("Depth and ensemble need to have the same length")
#Make sure that numpy arrays were given
ensembleValues=np.array(ensembleValues)
depthEnsemble=np.array(depthEnsemble)
depthMapping = np.array(depthMapping)
#Interpolate
ensembleValuesMapped = np.zeros((len(depthMapping),np.shape(ensembleValues)[1])) #placeholder
if extrapolate is True:
for i in np.arange(0,np.shape(ensembleValues)[1]):
ensembleValuesMapped[:,i]=np.interp(depthMapping,depthEnsemble,ensembleValues[:,i])
elif extrapolate is False:
for i in np.arange(0,np.shape(ensembleValues)[1]):
ensembleValuesMapped[:,i]=np.interp(depthMapping,depthEnsemble,ensembleValues[:,i],left=np.nan,right=np.nan)
return ensembleValuesMapped