Intake III - work with two catalogs and merge them#
Overview
🎯 objectives: Learn how to integrate intake-esm
in your workflow
⌛ time_estimation: "60min"
☑️ requirements:
© contributors: k204210
⚖ license:
Agenda
Based on DKRZ’s CMIP6 and CORDEX catalogs and Pangeo’s CMIP6 catalog, you will learn in this part
how to merge catalogs by combining their databases with differently formatted assets.
This includes
Loading catalogs with a user-defined set of attributes
Comparing metadata to check for compatibility
Merging the databases via merge or concat
Configuring a catalog description for accessing datasets across projects
Making ALL data accessible and consolidating the aggregation
Saving the new catalog
Questions
How can you find out how compatible the catalogs are? Would you have to sanitize the column names?
What is the overlap? Which of the roughly 1000 Pangeo datasets are included in DKRZ’s?
This tutorial highlights two use cases:
Merging two projects (CMIP6 and CORDEX, both from DKRZ)
Merging two data bases for the same project (CMIP6 DKRZ and CMIP6 Pangeo)
For each, the ultimate goal of this tutorial is to create a merged catalog which also enables data access to the datasets of both catalogs.
Case 1: Merge two projects CMIP6 and CORDEX in one catalog#
import intake
import pandas as pd
#dkrz_catalog=intake.open_catalog(["https://dkrz.de/s/intake"])
#only for generating the web page we need to take the original link:
dkrz_catalog=intake.open_catalog(["https://gitlab.dkrz.de/data-infrastructure-services/intake-esm/-/raw/master/esm-collections/cloud-access/dkrz_catalog.yaml"])
print([entry for entry in list(dkrz_catalog) if "disk" in entry and ("cordex" in entry or "cmip6" in entry)])
['dkrz_cmip6_disk', 'dkrz_cordex_disk']
Load catalogs with default + common columns#
Most of the DKRZ catalogs include the so-called cataloonie attributes. This simplifies the merging because you can already merge the catalogs over these columns. The usable columns of each catalog are stored in the main catalog’s metadata and can be displayed and retrieved:
dkrz_catalog.metadata
{'parameters': {'additional_cmip6_disk_columns': {'default': ['units',
'path',
'opendap_url',
'long_name'],
'type': 'list[str]'},
'additional_era5_disk_columns': {'default': ['path',
'units',
'long_name',
'short_name'],
'type': 'list[str]'},
'cataloonie_columns': {'default': ['project',
'institution_id',
'source_id',
'experiment_id',
'simulation_id',
'realm',
'frequency',
'time_reduction',
'grid_label',
'grid_id',
'level_type',
'time_min',
'time_max',
'time_range',
'format',
'uri',
'variable_id'],
'type': 'list[str]'}}}
overall_columns=dkrz_catalog.metadata["parameters"]["cataloonie_columns"]["default"]
print(overall_columns)
['project', 'institution_id', 'source_id', 'experiment_id', 'simulation_id', 'realm', 'frequency', 'time_reduction', 'grid_label', 'grid_id', 'level_type', 'time_min', 'time_max', 'time_range', 'format', 'uri', 'variable_id']
However, these attributes are not sufficient for finding individual assets in CORDEX and CMIP6. We need additional columns:
cordex_columns=dkrz_catalog._entries["dkrz_cordex_disk"]._open_args["read_csv_kwargs"]["usecols"]
print(cordex_columns)
cmip6_columns=dkrz_catalog._entries["dkrz_cmip6_disk"]._open_args["read_csv_kwargs"]["usecols"]
print(cmip6_columns)
['project', 'product_id', 'CORDEX_domain', 'institute_id', 'driving_model_id', 'experiment_id', 'member', 'model_id', 'rcm_version_id', 'frequency', 'variable_id', 'version', 'time_range', 'uri', 'format']
['project', 'activity_id', 'source_id', 'institution_id', 'experiment_id', 'member_id', 'dcpp_init_year', 'table_id', 'variable_id', 'grid_label', 'version', 'time_range', 'uri', 'format']
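The project-specific columns are exactly those that are not part of the common cataloonie set. The following short sketch (not part of the original workflow) makes this explicit and uses only the lists retrieved above; the same set differences are used again below when aligning the two DataFrames:
#columns that only exist in one of the two projects
print(sorted(set(cmip6_columns)-set(overall_columns)))
print(sorted(set(cordex_columns)-set(overall_columns)))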
We open both catalogs with the columns that we have found:
cmip6_cat=dkrz_catalog.dkrz_cmip6_disk(read_csv_kwargs=dict(usecols=cmip6_columns+overall_columns))
/envs/lib/python3.11/site-packages/intake_esm/cat.py:264: DtypeWarning: Columns (21,22,23) have mixed types. Specify dtype option on import or set low_memory=False.
df = pd.read_csv(
cordex_cat=dkrz_catalog.dkrz_cordex_disk(read_csv_kwargs=dict(usecols=cordex_columns+overall_columns))
/envs/lib/python3.11/site-packages/intake_esm/cat.py:264: DtypeWarning: Columns (22) have mixed types. Specify dtype option on import or set low_memory=False.
df = pd.read_csv(
We assume that we are interested in the variable tas, which is the near-surface air temperature:
cmip6_cat=cmip6_cat.search(variable_id="tas")
cordex_cat=cordex_cat.search(variable_id="tas")
Merge both catalogs#
The underlying DataFrames have different columns. We add CMIP6 columns to the CORDEX catalog and vice versa so that we can merge:
for cordex_col in list(set(cordex_columns)-set(overall_columns)):
    cmip6_cat.df.loc[:,cordex_col]="None"
for cmip6_col in list(set(cmip6_columns)-set(overall_columns)):
    cordex_cat.df.loc[:,cmip6_col]="None"
for column in overall_columns+cmip6_columns+cordex_columns:
    cmip6_cat.df[column]=cmip6_cat.df[column].astype(str)
    cordex_cat.df[column]=cordex_cat.df[column].astype(str)
overall_df=pd.merge(cmip6_cat.df, cordex_cat.df, on=overall_columns+cmip6_columns+cordex_columns, how="outer")
overall_df
activity_id | institution_id | source_id | experiment_id | member_id | table_id | variable_id | grid_label | dcpp_init_year | version | ... | time_max | format | uri | member | model_id | institute_id | rcm_version_id | driving_model_id | product_id | CORDEX_domain | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | AerChemMIP | BCC | BCC-ESM1 | hist-piAer | r1i1p1f1 | Amon | tas | gn | nan | None | ... | 201412 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | None | None | None | None | None | None | None |
1 | AerChemMIP | BCC | BCC-ESM1 | hist-piAer | r2i1p1f1 | Amon | tas | gn | nan | None | ... | 201412 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | None | None | None | None | None | None | None |
2 | AerChemMIP | BCC | BCC-ESM1 | hist-piAer | r2i1p1f1 | day | tas | gn | nan | None | ... | 20141231 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | None | None | None | None | None | None | None |
3 | AerChemMIP | BCC | BCC-ESM1 | hist-piAer | r3i1p1f1 | Amon | tas | gn | nan | None | ... | 201412 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | None | None | None | None | None | None | None |
4 | AerChemMIP | BCC | BCC-ESM1 | hist-piAer | r3i1p1f1 | day | tas | gn | nan | None | ... | 20141231 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | None | None | None | None | None | None | None |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
211137 | None | MPI-CSC | MPI-CSC-REMO2009 | rcp85 | None | None | tas | nan | None | None | ... | 206011.0 | netcdf | /work/kd0956/CORDEX/data/cordex/output/WAS-44i... | r1i1p1 | MPI-CSC-REMO2009 | MPI-CSC | v1 | MPI-M-MPI-ESM-LR | output | WAS-44i |
211138 | None | MPI-CSC | MPI-CSC-REMO2009 | rcp85 | None | None | tas | nan | None | None | ... | 207011.0 | netcdf | /work/kd0956/CORDEX/data/cordex/output/WAS-44i... | r1i1p1 | MPI-CSC-REMO2009 | MPI-CSC | v1 | MPI-M-MPI-ESM-LR | output | WAS-44i |
211139 | None | MPI-CSC | MPI-CSC-REMO2009 | rcp85 | None | None | tas | nan | None | None | ... | 208011.0 | netcdf | /work/kd0956/CORDEX/data/cordex/output/WAS-44i... | r1i1p1 | MPI-CSC-REMO2009 | MPI-CSC | v1 | MPI-M-MPI-ESM-LR | output | WAS-44i |
211140 | None | MPI-CSC | MPI-CSC-REMO2009 | rcp85 | None | None | tas | nan | None | None | ... | 209011.0 | netcdf | /work/kd0956/CORDEX/data/cordex/output/WAS-44i... | r1i1p1 | MPI-CSC-REMO2009 | MPI-CSC | v1 | MPI-M-MPI-ESM-LR | output | WAS-44i |
211141 | None | MPI-CSC | MPI-CSC-REMO2009 | rcp85 | None | None | tas | nan | None | None | ... | 210011.0 | netcdf | /work/kd0956/CORDEX/data/cordex/output/WAS-44i... | r1i1p1 | MPI-CSC-REMO2009 | MPI-CSC | v1 | MPI-M-MPI-ESM-LR | output | WAS-44i |
211142 rows × 29 columns
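Because we merged on every column with an outer join, the row count of the merged database should equal the sum of both inputs as long as neither DataFrame contains fully duplicated rows. A quick sketch (not part of the original tutorial) to check this assumption:
#compare the expected and the actual number of rows after the outer merge
print(len(cmip6_cat.df)+len(cordex_cat.df), len(overall_df))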
Redefine catalog description#
We copy the entire .json description file so that we can edit it.
mixed_esmcol_data=dict(cmip6_cat.esmcat).copy()
mixed_esmcol_data["aggregation_control"]=dict(mixed_esmcol_data["aggregation_control"]).copy()
mixed_esmcol_data
{'esmcat_version': '0.1.0',
'attributes': [Attribute(column_name='project', vocabulary=''),
Attribute(column_name='activity_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'),
Attribute(column_name='source_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'),
Attribute(column_name='institution_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'),
Attribute(column_name='experiment_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'),
Attribute(column_name='member_id', vocabulary=''),
Attribute(column_name='table_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'),
Attribute(column_name='variable_id', vocabulary=''),
Attribute(column_name='grid_label', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'),
Attribute(column_name='version', vocabulary=''),
Attribute(column_name='dcpp_init_year', vocabulary=''),
Attribute(column_name='time_range', vocabulary=''),
Attribute(column_name='uri', vocabulary=''),
Attribute(column_name='simulation_id', vocabulary=''),
Attribute(column_name='realm', vocabulary=''),
Attribute(column_name='frequency', vocabulary=''),
Attribute(column_name='time_reduction', vocabulary=''),
Attribute(column_name='grid_label', vocabulary=''),
Attribute(column_name='grid_id', vocabulary=''),
Attribute(column_name='level_type', vocabulary=''),
Attribute(column_name='time_min', vocabulary=''),
Attribute(column_name='time_max', vocabulary=''),
Attribute(column_name='opendap_url', vocabulary=''),
Attribute(column_name='path', vocabulary=''),
Attribute(column_name='units', vocabulary=''),
Attribute(column_name='long_name', vocabulary='')],
'assets': Assets(column_name='uri', format=None, format_column_name='format'),
'aggregation_control': {'variable_column_name': 'variable_id',
'groupby_attrs': ['activity_id',
'source_id',
'experiment_id',
'table_id',
'grid_label'],
'aggregations': [Aggregation(type=<AggregationType.union: 'union'>, attribute_name='variable_id', options={}),
Aggregation(type=<AggregationType.join_existing: 'join_existing'>, attribute_name='time_range', options={'compat': 'override', 'coords': 'minimal', 'dim': 'time'}),
Aggregation(type=<AggregationType.join_new: 'join_new'>, attribute_name='member_id', options={'compat': 'override', 'coords': 'minimal'}),
Aggregation(type=<AggregationType.join_new: 'join_new'>, attribute_name='dcpp_init_year', options={'compat': 'override', 'coords': 'minimal'})]},
'id': '/work/ik1017/Catalogs/dkrz_cmip6_disk',
'catalog_dict': None,
'catalog_file': None,
'description': "This is a ESM-collection for CMIP6 data on DKRZ's disk storage system which will be loaded from a source file which is in the cloud (see catalog_file)",
'title': None,
'last_updated': None}
Let’s have a look at these entries. We can subdivide them into two groups:
Required to be manually changed:
groupby_attrs: We will change it such that both CMIP6 and CORDEX datasets can be created.
attributes and default_columns: The CORDEX ones need to be added.
aggregation_control: Must be revised, but we can do that afterwards. For now, we will just delete all entries but the one for variable_id (see the short inspection sketch below).
Other attributes:
assets: Stays the same as there is no difference between the original catalogs.
catalog_file: Will be automatically overwritten by Intake when the final catalog is written.
description, esmcat_version, id: Arbitrary.
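Before editing, it can be useful to inspect the entries we are about to change. This is a small sketch (not part of the original workflow) that only reads from the dictionary we just copied:
#current grouping attributes and aggregation rules inherited from the CMIP6 catalog
print(mixed_esmcol_data["aggregation_control"]["groupby_attrs"])
print([dict(a)["attribute_name"] for a in mixed_esmcol_data["aggregation_control"]["aggregations"]])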
We will start with adding missing attributes:
columns_already=[dict(k)["column_name"] for k in mixed_esmcol_data["attributes"]]
columns_already
['project',
'activity_id',
'source_id',
'institution_id',
'experiment_id',
'member_id',
'table_id',
'variable_id',
'grid_label',
'version',
'dcpp_init_year',
'time_range',
'uri',
'simulation_id',
'realm',
'frequency',
'time_reduction',
'grid_label',
'grid_id',
'level_type',
'time_min',
'time_max',
'opendap_url',
'path',
'units',
'long_name']
for k in dict(cordex_cat.esmcat)["attributes"]:
    if dict(k)["column_name"] not in columns_already:
        mixed_esmcol_data["attributes"].append(k)
groupby_attrs:
The attributes used to build an index for a dataset are defined by the order of attributes in the list groupby_attrs. The aggregation methods for CMIP6 datasets and CORDEX datasets differ, so we have to redefine this list. Think about the best order and arrangement of attributes.
mixed_esmcol_data["aggregation_control"]["groupby_attrs"]=[
"CORDEX_domain",
"driving_model_id",
"activity_id",
"institute_id",
"model_id",
"experiment_id",
"frequency",
"table_id",
"grid_label"
]
aggregation_control:
For now, drop all aggregation attributes besides variable_id to enable a quick save of the catalog. Note that the grouping only works if there is at least one entry left in the mixed_esmcol_data["aggregation_control"]["aggregations"] list.
#iterate over a copy of the list so that removing entries does not skip elements
for entry in list(mixed_esmcol_data["aggregation_control"]["aggregations"]):
    if dict(entry)["attribute_name"] != "variable_id":
        mixed_esmcol_data["aggregation_control"]["aggregations"].remove(entry)
mixed_esmcol_data
{'esmcat_version': '0.1.0',
'attributes': [Attribute(column_name='project', vocabulary=''),
Attribute(column_name='activity_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'),
Attribute(column_name='source_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'),
Attribute(column_name='institution_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'),
Attribute(column_name='experiment_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'),
Attribute(column_name='member_id', vocabulary=''),
Attribute(column_name='table_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'),
Attribute(column_name='variable_id', vocabulary=''),
Attribute(column_name='grid_label', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'),
Attribute(column_name='version', vocabulary=''),
Attribute(column_name='dcpp_init_year', vocabulary=''),
Attribute(column_name='time_range', vocabulary=''),
Attribute(column_name='uri', vocabulary=''),
Attribute(column_name='simulation_id', vocabulary=''),
Attribute(column_name='realm', vocabulary=''),
Attribute(column_name='frequency', vocabulary=''),
Attribute(column_name='time_reduction', vocabulary=''),
Attribute(column_name='grid_label', vocabulary=''),
Attribute(column_name='grid_id', vocabulary=''),
Attribute(column_name='level_type', vocabulary=''),
Attribute(column_name='time_min', vocabulary=''),
Attribute(column_name='time_max', vocabulary=''),
Attribute(column_name='opendap_url', vocabulary=''),
Attribute(column_name='path', vocabulary=''),
Attribute(column_name='units', vocabulary=''),
Attribute(column_name='long_name', vocabulary=''),
Attribute(column_name='CORDEX_domain', vocabulary=''),
Attribute(column_name='product_id', vocabulary=''),
Attribute(column_name='institute_id', vocabulary=''),
Attribute(column_name='driving_model_id', vocabulary=''),
Attribute(column_name='member', vocabulary=''),
Attribute(column_name='model_id', vocabulary=''),
Attribute(column_name='rcm_version_id', vocabulary=''),
Attribute(column_name='format', vocabulary='')],
'assets': Assets(column_name='uri', format=None, format_column_name='format'),
'aggregation_control': {'variable_column_name': 'variable_id',
'groupby_attrs': ['CORDEX_domain',
'driving_model_id',
'activity_id',
'institute_id',
'model_id',
'experiment_id',
'frequency',
'table_id',
'grid_label'],
'aggregations': [Aggregation(type=<AggregationType.union: 'union'>, attribute_name='variable_id', options={}),
Aggregation(type=<AggregationType.join_new: 'join_new'>, attribute_name='member_id', options={'compat': 'override', 'coords': 'minimal'})]},
'id': '/work/ik1017/Catalogs/dkrz_cmip6_disk',
'catalog_dict': None,
'catalog_file': None,
'description': "This is a ESM-collection for CMIP6 data on DKRZ's disk storage system which will be loaded from a source file which is in the cloud (see catalog_file)",
'title': None,
'last_updated': None}
NaNs cause trouble in the DataFrame, so we set them to notset:
for k in mixed_esmcol_data["aggregation_control"]["groupby_attrs"]:
    #replace missing values and the placeholder strings introduced above
    overall_df.loc[overall_df[k].isna(), k]="notset"
    overall_df[k]=overall_df[k].str.replace("None","notset")
    overall_df[k]=overall_df[k].str.replace("nan","notset")
Now, let us open the combined intake catalog:
cmip6andcordex=intake.open_esm_datastore(
obj=dict(
esmcat=mixed_esmcol_data,
df=overall_df
)
)
We write the new catalog to disk via:
cmip6andcordex.serialize("test", catalog_type="file")
Successfully wrote ESM catalog json file to: file:///builds/data-infrastructure-services/tutorials-and-use-cases/docs/source/test.json
We can test if our configuration works by directly opening it:
intake.open_esm_datastore("test.json").search(experiment_id="historical",
source_id="MPI*",
simulation_id="r1i1*")#.to_dataset_dict(cdf_kwargs=dict(chunks=dict(time=1)))
/envs/lib/python3.11/site-packages/intake_esm/cat.py:264: DtypeWarning: Columns (4,13,15,16,22,25,27) have mixed types. Specify dtype option on import or set low_memory=False.
df = pd.read_csv(
test catalog with 27 dataset(s) from 633 asset(s):
unique | |
---|---|
activity_id | 2 |
institution_id | 3 |
source_id | 4 |
experiment_id | 1 |
member_id | 2 |
table_id | 7 |
variable_id | 1 |
grid_label | 2 |
dcpp_init_year | 0 |
version | 0 |
time_range | 399 |
project | 2 |
simulation_id | 3 |
grid_id | 1 |
frequency | 7 |
time_reduction | 2 |
realm | 1 |
level_type | 0 |
time_min | 324 |
time_max | 323 |
format | 1 |
uri | 633 |
member | 1 |
model_id | 2 |
institute_id | 2 |
rcm_version_id | 1 |
driving_model_id | 2 |
product_id | 1 |
CORDEX_domain | 8 |
derived_variable_id | 0 |
Case 2: Merge two data bases for CMIP6#
Assume you are interested in the variable tas from the table Amon from both catalogs. You would start like this:
pangeo=intake.open_catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/master.yaml")
#
print(list(pangeo.climate))
cmip6_pangeo=intake.open_esm_datastore("https://storage.googleapis.com/cmip6/pangeo-cmip6.json")
cmip6_cat=dkrz_catalog.dkrz_cmip6_disk
#
esm_dkrz_tas=cmip6_cat.search(
variable_id="tas",
table_id="Amon"
)
esm_pangeo_tas=cmip6_pangeo.search(
variable_id="tas",
table_id="Amon"
)
['cmip6_gcs', 'cmip6_s3', 'GFDL_CM2_6', 'GFDL_CM2_6_s3', 'tracmip', 'tracmip_s3']
print(esm_dkrz_tas)
</work/ik1017/Catalogs/dkrz_cmip6_disk catalog with 1362 dataset(s) from 66069 asset(s)>
print(esm_pangeo_tas)
<pangeo-cmip6 catalog with 975 dataset(s) from 15427 asset(s)>
Compare the Metadata#
Both catalogs follow the esmcat-specs which can be seen from the following entry:
print(dict(esm_dkrz_tas.esmcat)["esmcat_version"])
print(dict(esm_pangeo_tas.esmcat)["esmcat_version"])
0.1.0
0.1.0
As both catalogs follow the esmcat standard, they each provide a list of attributes which we can compare. In the following, we use pandas DataFrames for a better display:
import pandas as pd
esm_dkrz_atts_df=pd.DataFrame([dict(k) for k in dict(esm_dkrz_tas.esmcat)["attributes"]])
esm_pangeo_atts_df=pd.DataFrame([dict(k) for k in dict(esm_pangeo_tas.esmcat)["attributes"]])
esm_dkrz_atts_df
column_name | vocabulary | |
---|---|---|
0 | project | |
1 | activity_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
2 | source_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
3 | institution_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
4 | experiment_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
5 | member_id | |
6 | table_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
7 | variable_id | |
8 | grid_label | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
9 | version | |
10 | dcpp_init_year | |
11 | time_range | |
12 | uri | |
13 | simulation_id | |
14 | realm | |
15 | frequency | |
16 | time_reduction | |
17 | grid_label | |
18 | grid_id | |
19 | level_type | |
20 | time_min | |
21 | time_max | |
22 | opendap_url | |
23 | path | |
24 | units | |
25 | long_name |
esm_pangeo_atts_df
column_name | vocabulary | |
---|---|---|
0 | activity_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
1 | source_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
2 | institution_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
3 | experiment_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
4 | member_id | |
5 | table_id | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
6 | variable_id | |
7 | grid_label | https://raw.githubusercontent.com/WCRP-CMIP/CM... |
8 | version | |
9 | dcpp_start_year |
esm_pangeo_atts_df.equals(esm_dkrz_atts_df)
False
When working with both catalogs, you would notice that Pangeo’s catalog does not use the prefix character ‘v’ for the values of version, whereas DKRZ’s does. We fix that with:
esm_pangeo_tas.df["version"]= "v" + esm_pangeo_tas.df["version"].astype(str)
The data format: The Pangeo catalog contains zarr datasets stored in Google Cloud Storage, while DKRZ’s catalog allows different formats by providing a column named format. When we combine these catalogs, we have to take the different formats into account:
esm_pangeo_tas.esmcat.assets.format
<DataFormat.zarr: 'zarr'>
print(
esm_dkrz_tas.df["format"].unique(),
esm_dkrz_tas.esmcat.assets.format_column_name
)
['netcdf'] format
Combine the databases with the underlying DataFrames#
This is a workflow for creating a merged database:
1. Find all common column names/keys that are in both databases.
2. Create a filtered catalog: set the common columns as index in both catalogs and throw out the indices of one catalog that are also in the other.
3. Concat the filtered catalog with the reference catalog.
Let us start with 1.:
keys = [key
for key in esm_dkrz_tas.df.columns.values
if key in esm_pangeo_tas.df.columns.values
]
keys
['activity_id',
'institution_id',
'source_id',
'experiment_id',
'member_id',
'table_id',
'variable_id',
'grid_label',
'dcpp_init_year',
'version']
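As a small aside (a sketch, not from the original tutorial): the only Pangeo column that is not shared with the DKRZ catalog is its asset column zstore:
#the asset column is the only one not common to both catalogs
print(set(esm_pangeo_tas.df.columns)-set(keys))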
We continue with 2.: Create a filtered catalog.
We create a multi-index with all common keys using set_index and save it in the new variables i1 and i2. These can be used as a filter. The ~ sign reverses the condition in the filter:
i1 = esm_pangeo_tas.df.set_index(keys).index
i2 = esm_dkrz_tas.df.set_index(keys).index
esm_pangeo_tas_filtered=esm_pangeo_tas.df[~i1.isin(i2)]
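The difference between the two lengths tells us how many Pangeo assets are already covered by DKRZ’s holdings, which relates to the overlap question from the introduction. A small sketch (not part of the original workflow):
#how many Pangeo assets have no counterpart in the DKRZ catalog?
print(len(esm_pangeo_tas.df), "Pangeo assets in total")
print(len(esm_pangeo_tas_filtered), "Pangeo assets without a DKRZ counterpart")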
And finally, 3.: Concat the filtered catalog with the reference catalog.
We use the pandas concat function and ignore the indices of both catalogs.
esm_merged=pd.concat([esm_dkrz_tas.df, esm_pangeo_tas_filtered],
ignore_index=True)
esm_merged
activity_id | institution_id | source_id | experiment_id | member_id | table_id | variable_id | grid_label | dcpp_init_year | version | time_range | project | format | uri | zstore | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | AerChemMIP | BCC | BCC-ESM1 | hist-piAer | r1i1p1f1 | Amon | tas | gn | NaN | v20200428 | 185001-201412 | CMIP6 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | NaN |
1 | AerChemMIP | BCC | BCC-ESM1 | hist-piAer | r2i1p1f1 | Amon | tas | gn | NaN | v20200430 | 185001-201412 | CMIP6 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | NaN |
2 | AerChemMIP | BCC | BCC-ESM1 | hist-piAer | r3i1p1f1 | Amon | tas | gn | NaN | v20200430 | 185001-201412 | CMIP6 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | NaN |
3 | AerChemMIP | BCC | BCC-ESM1 | hist-piNTCF | r1i1p1f1 | Amon | tas | gn | NaN | v20190621 | 185001-201412 | CMIP6 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | NaN |
4 | AerChemMIP | BCC | BCC-ESM1 | hist-piNTCF | r2i1p1f1 | Amon | tas | gn | NaN | v20190621 | 185001-201412 | CMIP6 | netcdf | /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
72250 | ScenarioMIP | MIROC | MIROC6 | ssp245 | r50i1p1f1 | Amon | tas | gn | NaN | v20210917 | NaN | NaN | NaN | NaN | gs://cmip6/CMIP6/ScenarioMIP/MIROC/MIROC6/ssp2... |
72251 | ScenarioMIP | MIROC | MIROC6 | ssp245 | r29i1p1f1 | Amon | tas | gn | NaN | v20210917 | NaN | NaN | NaN | NaN | gs://cmip6/CMIP6/ScenarioMIP/MIROC/MIROC6/ssp2... |
72252 | CMIP | MOHC | HadGEM3-GC31-LL | piControl | r1i1p1f1 | Amon | tas | gn | NaN | v20211103 | NaN | NaN | NaN | NaN | gs://cmip6/CMIP6/CMIP/MOHC/HadGEM3-GC31-LL/piC... |
72253 | CMIP | CMCC | CMCC-CM2-SR5 | historical | r3i1p2f1 | Amon | tas | gn | NaN | v20211108 | NaN | NaN | NaN | NaN | gs://cmip6/CMIP6/CMIP/CMCC/CMCC-CM2-SR5/histor... |
72254 | CMIP | CMCC | CMCC-CM2-SR5 | historical | r2i1p2f1 | Amon | tas | gn | NaN | v20211109 | NaN | NaN | NaN | NaN | gs://cmip6/CMIP6/CMIP/CMCC/CMCC-CM2-SR5/histor... |
72255 rows × 15 columns
Make ALL data accessible and consolidate aggregation#
Intake enables loading assets of different formats. For that,
the database must have a column which describes the format of each asset,
only one column may carry the access information, so the zstore column and the uri column need to be merged into one common column, which we name uri,
the assets entry in the catalog description needs to be adapted to the new configuration.
We start with creating a format column which is zarr if there is no entry in uri and netcdf in all other cases.
esm_merged["format"]="netcdf"
esm_merged.loc[pd.isna(esm_merged["uri"]),"format"]="zarr"
esm_merged.loc[pd.isna(esm_merged["time_range"]),"time_range"]="*"
We merge the zstore and uri columns into a new column uri by filling the missing uri values with the corresponding zstore values.
Note
Whenever you can, you should avoid using iterrows because it is rather slow; a vectorized assignment like the one below is preferable.
esm_merged.loc[pd.isna(esm_merged["uri"]),"uri"]=esm_merged["zstore"]
del esm_merged["zstore"]
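After the merge, every row should have a usable uri. A minimal check (a sketch, not in the original tutorial):
#no asset should be left without an access path
assert not esm_merged["uri"].isna().any()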
We now create a new description.
It will be based on the DKRZ catalog because that one has the aggregation over time which we want to maintain.
The assets entry now does not contain a direct description of the format but instead specifies a format_column_name. Also, the column_name is uri instead of path:
new_cat_json=dict(esm_dkrz_tas.esmcat).copy()
new_cat_json["assets"]={
"column_name":"uri",
"format_column_name":"format"
}
new_cat_json["id"]="Merged dkrz-pangeo cmip6 subset catalog"
In order to make zarr stores compatible with the aggregation over time, we have to fill in a dummy value in time_range:
esm_merged.loc[pd.isna(esm_merged["time_range"]),"time_range"]="*"
Save the new catalog#
Let us test the new catalog first. We can open the new catalog by providing two arguments to open_esm_datastore:
the database esm_merged
the catalog description new_cat_json
Afterwards, we search for a subset which is available in both catalogs:
esm_merged_cat=intake.open_esm_datastore(
dict(
esmcat=new_cat_json,
df=esm_merged
)
)
esm_merged_cat_test=esm_merged_cat.search(activity_id="ScenarioMIP",
member_id="r1i1p1f1",
grid_label="gn",
source_id=["MPI-ESM1-2-HR","CAS-ESM2-0"])
Since we have two different formats in the catalog, we have to provide keyword arguments for both formats to the to_dataset_dict function:
zarr_kwargs={"consolidated":True} is needed because Pangeo’s zarr assets have consolidated metadata
cdf_kwargs={"chunks":{"time":1}} configures dask to use small chunks along the time dimension instead of very large arrays
test_dsets=esm_merged_cat_test.to_dataset_dict(
zarr_kwargs={"consolidated":True},
cdf_kwargs={"chunks":{"time":1}}
)
--> The keys in the returned dictionary of datasets are constructed as follows:
'activity_id.source_id.experiment_id.table_id.grid_label'
/tmp/ipykernel_711/38849239.py:1: DeprecationWarning: cdf_kwargs and zarr_kwargs are deprecated and will be removed in a future version. Please use xarray_open_kwargs instead.
test_dsets=esm_merged_cat_test.to_dataset_dict(
/envs/lib/python3.11/site-packages/xarray/core/dataset.py:265: UserWarning: The specified chunks separate the stored chunks along dimension "time" starting at index 1. This could degrade performance. Instead, consider rechunking after loading.
warnings.warn(
/envs/lib/python3.11/site-packages/xarray/core/dataset.py:265: UserWarning: The specified chunks separate the stored chunks along dimension "time" starting at index 1. This could degrade performance. Instead, consider rechunking after loading.
warnings.warn(
/envs/lib/python3.11/site-packages/xarray/core/dataset.py:265: UserWarning: The specified chunks separate the stored chunks along dimension "time" starting at index 1. This could degrade performance. Instead, consider rechunking after loading.
warnings.warn(
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File /envs/lib/python3.11/site-packages/intake_esm/source.py:244, in ESMDataSource._open_dataset(self)
223 datasets = [
224 _open_dataset(
225 record[self.path_column_name],
(...)
241 for _, record in self.df.iterrows()
242 ]
--> 244 datasets = dask.compute(*datasets)
245 if len(datasets) == 1:
File /envs/lib/python3.11/site-packages/dask/base.py:666, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
664 postcomputes.append(x.__dask_postcompute__())
--> 666 results = schedule(dsk, keys, **kwargs)
667 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /envs/lib/python3.11/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
87 pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
90 pool.submit,
91 pool._max_workers,
92 dsk,
93 keys,
94 cache=cache,
95 get_id=_thread_get_id,
96 pack_exception=pack_exception,
97 **kwargs,
98 )
100 # Cleanup pools associated to dead threads
File /envs/lib/python3.11/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
510 else:
--> 511 raise_exception(exc, tb)
512 res, worker_id = loads(res_info)
File /envs/lib/python3.11/site-packages/dask/local.py:319, in reraise(exc, tb)
318 raise exc.with_traceback(tb)
--> 319 raise exc
File /envs/lib/python3.11/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
File /envs/lib/python3.11/site-packages/dask/core.py:121, in _execute_task(arg, cache, dsk)
118 # Note: Don't assign the subtask results to a variable. numpy detects
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
File /envs/lib/python3.11/site-packages/dask/utils.py:73, in apply(func, args, kwargs)
72 if kwargs:
---> 73 return func(*args, **kwargs)
74 else:
File /envs/lib/python3.11/site-packages/intake_esm/source.py:77, in _open_dataset(urlpath, varname, xarray_open_kwargs, preprocess, requested_variables, additional_attrs, expand_dims, data_format, storage_options)
76 else:
---> 77 ds = xr.open_dataset(url, **xarray_open_kwargs)
78 if preprocess is not None:
File /envs/lib/python3.11/site-packages/xarray/backends/api.py:566, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
565 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 566 backend_ds = backend.open_dataset(
567 filename_or_obj,
568 drop_variables=drop_variables,
569 **decoders,
570 **kwargs,
571 )
572 ds = _dataset_from_backend_dataset(
573 backend_ds,
574 filename_or_obj,
(...)
584 **kwargs,
585 )
TypeError: NetCDF4BackendEntrypoint.open_dataset() got an unexpected keyword argument 'consolidated'
The above exception was the direct cause of the following exception:
ESMDataSourceError Traceback (most recent call last)
Cell In[47], line 1
----> 1 test_dsets=esm_merged_cat_test.to_dataset_dict(
2 zarr_kwargs={"consolidated":True},
3 cdf_kwargs={"chunks":{"time":1}}
4 )
File /envs/lib/python3.11/site-packages/pydantic/decorator.py:40, in pydantic.decorator.validate_arguments.validate.wrapper_function()
File /envs/lib/python3.11/site-packages/pydantic/decorator.py:134, in pydantic.decorator.ValidatedFunction.call()
File /envs/lib/python3.11/site-packages/pydantic/decorator.py:206, in pydantic.decorator.ValidatedFunction.execute()
File /envs/lib/python3.11/site-packages/intake_esm/core.py:682, in esm_datastore.to_dataset_dict(self, xarray_open_kwargs, xarray_combine_by_coords_kwargs, preprocess, storage_options, progressbar, aggregate, skip_on_error, **kwargs)
680 except Exception as exc:
681 if not skip_on_error:
--> 682 raise exc
683 self.datasets = self._create_derived_variables(datasets, skip_on_error)
684 return self.datasets
File /envs/lib/python3.11/site-packages/intake_esm/core.py:678, in esm_datastore.to_dataset_dict(self, xarray_open_kwargs, xarray_combine_by_coords_kwargs, preprocess, storage_options, progressbar, aggregate, skip_on_error, **kwargs)
676 for task in gen:
677 try:
--> 678 key, ds = task.result()
679 datasets[key] = ds
680 except Exception as exc:
File /envs/lib/python3.11/concurrent/futures/_base.py:449, in Future.result(self, timeout)
447 raise CancelledError()
448 elif self._state == FINISHED:
--> 449 return self.__get_result()
451 self._condition.wait(timeout)
453 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
File /envs/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
399 if self._exception:
400 try:
--> 401 raise self._exception
402 finally:
403 # Break a reference cycle with the exception in self._exception
404 self = None
File /envs/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
55 return
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
File /envs/lib/python3.11/site-packages/intake_esm/core.py:820, in _load_source(key, source)
819 def _load_source(key, source):
--> 820 return key, source.to_dask()
File /envs/lib/python3.11/site-packages/intake_esm/source.py:277, in ESMDataSource.to_dask(self)
275 def to_dask(self):
276 """Return xarray object (which will have chunks)"""
--> 277 self._load_metadata()
278 return self._ds
File /envs/lib/python3.11/site-packages/intake/source/base.py:283, in DataSourceBase._load_metadata(self)
281 """load metadata only if needed"""
282 if self._schema is None:
--> 283 self._schema = self._get_schema()
284 self.dtype = self._schema.dtype
285 self.shape = self._schema.shape
File /envs/lib/python3.11/site-packages/intake_esm/source.py:208, in ESMDataSource._get_schema(self)
206 def _get_schema(self) -> Schema:
207 if self._ds is None:
--> 208 self._open_dataset()
209 metadata = {'dims': {}, 'data_vars': {}, 'coords': ()}
210 self._schema = Schema(
211 datashape=None,
212 dtype=None,
(...)
215 extra_metadata=metadata,
216 )
File /envs/lib/python3.11/site-packages/intake_esm/source.py:269, in ESMDataSource._open_dataset(self)
266 self._ds.attrs[OPTIONS['dataset_key']] = self.key
268 except Exception as exc:
--> 269 raise ESMDataSourceError(
270 f"""Failed to load dataset with key='{self.key}'
271 You can use `cat['{self.key}'].df` to inspect the assets/files for this key.
272 """
273 ) from exc
ESMDataSourceError: Failed to load dataset with key='ScenarioMIP.MPI-ESM1-2-HR.ssp370.Amon.gn'
You can use `cat['ScenarioMIP.MPI-ESM1-2-HR.ssp370.Amon.gn'].df` to inspect the assets/files for this key.
As the traceback shows, recent intake-esm versions deprecate zarr_kwargs and cdf_kwargs in favour of xarray_open_kwargs, and the zarr-specific consolidated option ends up being passed to the netCDF backend, which makes the call fail here; the loading call therefore needs to be adapted for newer intake-esm versions. Independently of the data loading, we can save the catalog with serialize. We will separate the catalog into two files, the database .csv.gz file and the descriptor .json file. We can do that by passing the catalog_type keyword argument:
esm_merged_cat.serialize(name="our_catalog", catalog_type="file")
Successfully wrote ESM catalog json file to: file:///builds/data-infrastructure-services/tutorials-and-use-cases/docs/source/our_catalog.json
See also
This tutorial is part of a series on intake:
You can also do another CMIP6 tutorial from the official intake page.