Intake V - create intake-esm catalog from scratch#
Overview
🎯 objectives: Learn how to create intake-esm ESM-collections
⌛ time_estimation: "60min"
☑️ requirements: intake_esm.__version__ == 2023.4.*, at least 10GB memory.
© contributors: k204210
⚖ license:
Agenda
In this part, you learn
- When to build intake-esm collections
- How to create a standardized intake-esm catalog from scratch
- How to equip the catalog with attributes and configurations for assets and aggregation
- How to add the collection of assets to the catalog
- How to validate and save the newly created catalog
- How to configure the catalog to process multivariable assets
Intake is a cataloging tool for data repositories. It opens catalogs with drivers, which can be plug-ins like intake-esm.
This tutorial gives insight into the creation of intake-esm catalogs. We recommend this specific driver for intake when working with ESM data because the plugin allows loading the data with the widely used and accepted tool xarray.
Note
This tutorial creates a catalog from scratch. If you work based on another catalog, it might be sufficient to look into intake II - save subset.
1. When should I create an intake-esm catalog?#
Cataloging your data set with a static catalog for easy access is beneficial if
- the data set is stable 🏔 such that you do not have to update the content of the catalog to make it usable at all
- the data set is very large 🗃 such that browsing and accessing data via the file system is less performant
- the data set should be shared 🔀 with many people such that you cannot use a database format
2. Create an intake-esm catalog which complies with the esmcat specs#
In order to create a well-defined, helpful catalog, you have to answer the following questions:
- What should be the search facets of the catalog?
- How are assets of the catalog combined into a dataset?
- How should xarray open the data set?
For intake-esm catalogs, an early standard has been developed to ensure compatibility across different intake-esm catalogs. We will follow those specs in this tutorial.
In the code example, we use a Python dictionary, but you could also write directly into a file with your favorite editor. We start with a catalog dictionary intake_esm_catalog and add the required basic metadata:
intake_esm_catalog={
# we follow the esmcat specs version 0.1.0:
'esmcat_version': '0.1.0',
'id': 'Intake-esmI',
'description': "This is an intake catalog created for the intake tutorial"
}
2.1. Create the description#
The description contains all the metadata necessary to understand the catalog, which makes the catalog self-descriptive. It also includes the configuration that tells intake how to load the assets of the data set(s) with the specified driver.
Define attributes of your catalog#
The catalog's collection uses attributes to describe the assets. These attributes are defined in the description via Python dictionaries and given as a list in the intake-esm catalog .json file, e.g.:
"attributes": [
{
"column_name": "activity_id",
"vocabulary": "https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json"
},
]
and will be accessed by users from the loaded catalog variable catalog
via:
catalog.esmcat.attributes
Catalog's attributes should allow users to
- effectively browse: The in-memory representation and the visualization tool for the catalog is a Pandas DataFrame. By specifying a column_name, the columns of the DataFrame are generated from the attributes of the catalog. Additionally, the column_name of the catalog's attributes can be used as search facets - they will be keyword arguments of the catalog.search() function.
- understand the content: You can provide information about the attributes, e.g. by specifying a vocabulary for all available (or allowed) values of the attribute.
➡ The collection must have values for all defined attributes (see 3.2.)
➡ In other words: if assets should be integrated into the catalog, they have to be described with these attributes.
Best Practice
The best configuration is reached if all datasets can be uniquely identified, i.e., if users fill out all search facets, they end up with exactly one dataset.
Do not overdo it with additional columns: users may be confused when many search fields have similar meanings. Also, the display of the DataFrame should fit into the window width.
Use case: Catalog for project data on a file system
Given a directory tree with more than one level, ensure that:
- All files are on the same and deepest directory level.
- Each directory level has the same meaning across the project data, e.g. the deepest directory level can have the meaning version.
This can easily be done by creating a directory structure template and checking the directories against its definitions.
If that is fulfilled, each directory level can be used as a catalog attribute.
attributes=[]
directory_structure_template="mip_era/activity_id/institution_id/source_id/experiment_id/member_id/table_id/variable_id/grid_label/version"
for att in directory_structure_template.split('/'):
attributes.append(
dict(column_name=att,
vocabulary=f"https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_{att}.json"
)
)
intake_esm_catalog["attributes"]=attributes
print(intake_esm_catalog)
{'esmcat_version': '0.1.0', 'id': 'Intake-esmI', 'description': 'This is an intake catalog created for the intake tutorial', 'attributes': [{'column_name': 'mip_era', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_mip_era.json'}, {'column_name': 'activity_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'}, {'column_name': 'institution_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'}, {'column_name': 'source_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'}, {'column_name': 'experiment_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'}, {'column_name': 'member_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_member_id.json'}, {'column_name': 'table_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'}, {'column_name': 'variable_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_variable_id.json'}, {'column_name': 'grid_label', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'}, {'column_name': 'version', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_version.json'}]}
Note
For data management purposes in general, we highly recommend defining a path_template and a filename_template for a clear directory structure before storing any data.
You can add more attributes from files or by parsing filenames, as sketched below.
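For example, a minimal sketch of parsing an additional attribute from a filename; the example filename and the split logic are illustrative assumptions, not part of the esmcat specs:
# Hypothetical CMIP6-style filename; the time range is the last "_"-separated part
filename = "tas_Amon_MPI-ESM1-2-HR_ssp370_r1i1p1f1_gn_201501-201912.nc"
# Strip the ".nc" suffix and take the last token as a time_range value
time_range = filename.rsplit(".", 1)[0].split("_")[-1]
print(time_range)  # 201501-201912
# Such a parsed value can fill an additional collection column, e.g. "time_range"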
Define the assets column of your catalog#
The assets entry is a Python dictionary in the catalog, similar to an attribute, e.g.:
"assets": {
"column_name": "path",
"format": "netcdf"
},
The assets of a catalog refer to the data sources that can be loaded by intake. Assets are essential for connecting intake's browsing function with its data access function. The assets entry contains
- a column_name which is associated with the corresponding keyword in the collection. The value of column_name in the collection points at the asset which can be loaded by xarray.
- a format entry which specifies the data format of the assets.
Note
If you have assets of mixed types, you can substitute format with format_column_name so that the format information for each asset is also taken from the collection, as sketched below.
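A minimal sketch of such an assets entry; the name of the format column is an assumption and must match a column in your collection that holds values like "netcdf" or "zarr":
# Instead of one fixed format, point intake-esm at a collection column
# that stores the format of each individual asset:
assets_mixed = {
    "column_name": "path",
    "format_column_name": "format"  # hypothetical column name in the collection
}
In this tutorial, however, all assets are netCDF files, so we use a fixed format entry: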
assets={
"column_name": "path",
"format": "netcdf"
}
intake_esm_catalog["assets"]=assets
print(intake_esm_catalog)
{'esmcat_version': '0.1.0', 'id': 'Intake-esmI', 'description': 'This is an intake catalog created for the intake tutorial', 'attributes': [{'column_name': 'mip_era', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_mip_era.json'}, {'column_name': 'activity_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'}, {'column_name': 'institution_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'}, {'column_name': 'source_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'}, {'column_name': 'experiment_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'}, {'column_name': 'member_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_member_id.json'}, {'column_name': 'table_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'}, {'column_name': 'variable_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_variable_id.json'}, {'column_name': 'grid_label', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'}, {'column_name': 'version', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_version.json'}], 'assets': {'column_name': 'path', 'format': 'netcdf'}}
Optional: Define aggregation control for your data sets#
Note
If aggregation_control is not defined, intake opens one xarray dataset per asset.
One goal of a catalog is to make access to the data as analysis-ready as possible. Therefore, intake-esm features aggregating multiple assets into a larger single data set. If aggregation_control is defined in the catalog and users run the catalog's to_dataset_dict() function, a Python dictionary of aggregated xarray datasets is created. The logic for merging and/or concatenating the catalog into datasets has to be configured under aggregation_control.
The implementation works such that the variable's dimensions are either enhanced by a new dimension or an existing dimension is extended with the new data included in the additional assets.
aggregation_control is a dictionary in the catalog. If it is set, three keywords have to be configured:
- variable_column_name: In the collection, the variable name is specified under that column. Intake-esm will only aggregate assets with the same variable name. Thus, all assets to be combined into a dataset have to include at least one unique variable. If your assets contain more than one data variable and users should be able to subset with intake, check multivariable assets (section 4).
- groupby_attrs: Assets attributed with different values of the groupby_attrs should not be aggregated into one xarray dataset. E.g., if you have data for different ESMs in one catalog, you do not want users to merge them into one dataset. The groupby_attrs will be combined into the key of the aggregated dataset in the dictionary returned by to_dataset_dict().
- aggregations: Specification of how xarray should combine assets with the same values of these groupby_attrs.
E.g.:
"aggregation_control": {
"variable_column_name": "variable_id",
"groupby_attrs": [
"activity_id",
"institution_id"
],
"aggregations": [
{
"type": "union",
"attribute_name": "variable_id"
}
]
}
Let's start with defining variable_column_name and groupby_attrs:
aggregation_control=dict(
variable_column_name="variable_id",
groupby_attrs=[
"activity_id",
"institution_id"
]
)
Best Practice
A well-defined aggregation control covers all defined attributes, i.e. each attribute appears either in groupby_attrs or as the attribute_name of an aggregation.
Aggregations:
aggregations is an optional list of dictionaries, each of which configures, for one attribute/column of the catalog given as attribute_name,
- on which dimension of the variable the assets should be aggregated
- optionally: which keyword arguments should be passed to xarray's concat() and merge() functions
A dictionary of the aggregations list is named an aggregation object and has to include these specifications:
- attribute_name: the column name which is not a groupby_attr and should be used for aggregating a single variable over a dimension
- type: can either be join_new, join_existing or union
- optional: options: keyword arguments for xarray
The following defines that variable_id will be taken for a unique dataset:
aggregation_control["aggregations"]=[dict(
attribute_name="variable_id",
type="union"
)]
Now, we configure intake to use the time_range attribute for extending the existing dimension time. Therefore, we have to add options with "dim": "time" as a keyword argument for xarray:
aggregation_control["aggregations"].append(
dict(
attribute_name="time_range",
type="join_existing",
options={ "dim": "time", "coords": "minimal", "compat": "override" }
)
)
We can also, kind of retrospectively, combine all members of an ensemble along a new dimension of a variable:
aggregation_control["aggregations"].append(
dict(
attribute_name= "member_id",
type= "join_new",
options={ "coords": "minimal", "compat": "override" }
)
)
Note
It is not possible to pre-configure dask options for xarray in the catalog. Be sure that users of your catalog know if and how to set chunks, e.g. as sketched below.
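For instance, users can pass chunking options themselves when loading the datasets. A minimal sketch, assuming catalog is an opened esm_datastore and using an arbitrary example chunk size:
# Users set dask chunks at load time via xarray_open_kwargs;
# the chunk size is only an example value.
datasets = catalog.to_dataset_dict(
    xarray_open_kwargs={"chunks": {"time": 120}}
)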
intake_esm_catalog["aggregation_control"]=aggregation_control
print(intake_esm_catalog)
{'esmcat_version': '0.1.0', 'id': 'Intake-esmI', 'description': 'This is an intake catalog created for the intake tutorial', 'attributes': [{'column_name': 'mip_era', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_mip_era.json'}, {'column_name': 'activity_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'}, {'column_name': 'institution_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'}, {'column_name': 'source_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'}, {'column_name': 'experiment_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'}, {'column_name': 'member_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_member_id.json'}, {'column_name': 'table_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'}, {'column_name': 'variable_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_variable_id.json'}, {'column_name': 'grid_label', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'}, {'column_name': 'version', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_version.json'}], 'assets': {'column_name': 'path', 'format': 'netcdf'}, 'aggregation_control': {'variable_column_name': 'variable_id', 'groupby_attrs': ['activity_id', 'institution_id'], 'aggregations': [{'attribute_name': 'variable_id', 'type': 'union'}, {'attribute_name': 'time_range', 'type': 'join_existing', 'options': {'dim': 'time', 'coords': 'minimal', 'compat': 'override'}}, {'attribute_name': 'member_id', 'type': 'join_new', 'options': {'coords': 'minimal', 'compat': 'override'}}]}}
2.2. Create the database for the catalog#
The collection of assets can be specified either
under catalog_dict as a list of dictionaries inside the catalog. One asset including all attribute specifications is saved as an individual dictionary, e.g.:
"catalog_dict": [
{
"filename": "/work/mh0287/m221078/prj/switch/icon-oes/experiments/khwX155/outdata/khwX155_atm_mon_18500101.nc",
"variable": "tas_gmean"
}
]
or under catalog_file which refers to a separate .csv file, e.g.
"catalog_file": "dkrz_cmip6_disk_netcdf.csv.gz"
Option A: Catalog_dict implementation#
Assuming we would like to create a catalog for all files in /work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/tas/gn/v20190710/, we can parse the path with our directory_structure_template:
trunk="/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/tas/gn/v20190710/"
trunkdict={}
# Map the template attributes onto the path levels, counting from the right
# (the trailing "/" adds one empty element to trunk.split('/')):
for i,item in enumerate(directory_structure_template.split('/')):
    trunkdict[item]=trunk.split('/')[-(len(directory_structure_template.split('/'))-i+1)]
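For this particular trunk, the resulting mapping reads the values directly off the path levels:
print(trunkdict)
# {'mip_era': 'CMIP6', 'activity_id': 'ScenarioMIP', 'institution_id': 'DKRZ',
#  'source_id': 'MPI-ESM1-2-HR', 'experiment_id': 'ssp370', 'member_id': 'r1i1p1f1',
#  'table_id': 'Amon', 'variable_id': 'tas', 'grid_label': 'gn', 'version': 'v20190710'}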
Afterwards, we can associate all files in that directory with these attributes and the additional time_range and path attributes:
import os
# In a Jupyter notebook, "!ls" runs a shell command;
# the plain-Python equivalent would be sorted(os.listdir(trunk)).
filelist=!ls {trunk}
catalog_dict=[]
for asset in filelist:
    assetdict={}
    # the time range is the last "_"-separated part of the filename (without suffix)
    assetdict["time_range"]=asset.split('.')[0].split('_')[-1]
    assetdict["path"]=trunk+asset
    # add the attributes parsed from the directory levels
    assetdict.update(trunkdict)
    catalog_dict.append(assetdict)
Then, we put that list of asset dictionaries into the catalog:
intake_esm_catalog["catalog_dict"]=catalog_dict
Option B: Catalog_file implementation#
The catalog_file format needs to comply with the following rules:
- all file types that can be opened by pandas are allowed to be set as catalog_file
- the .csv file needs a header which includes all catalog attributes
An example would be:
filename,variable
/work/mh0287/m221078/prj/switch/icon-oes/experiments/khwX155/outdata/khwX155_atm_mon_18500101.nc,tas_gmean
Note
The catalog_file can also live in the cloud, i.e. be a URL. You can host both the collection and the catalog in the cloud, as DKRZ does.
Best practice
To keep a clear overview, it is best to use the same prefix name for both the catalog and the catalog_file.
import pandas as pd
catalog_dict_df=pd.DataFrame(catalog_dict)
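If you prefer the catalog_file variant, a minimal sketch (the filename is an arbitrary example) is to save this DataFrame as a compressed .csv and reference it in the catalog instead of catalog_dict:
# Save the collection as a compressed CSV (arbitrary example filename) ...
catalog_dict_df.to_csv("intake_esm_tutorial_catalog.csv.gz", compression="gzip", index=False)
# ... and point the catalog at it instead of using "catalog_dict"
intake_esm_catalog["catalog_file"] = "intake_esm_tutorial_catalog.csv.gz"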
Saving a separate database for assets or using a dictionary in the catalog?#
Use case: Updating the collection for a living project on the file system#
Solution: Write a builder script and run it as a cronjob (automatically and regularly).
A typical builder for a community project contains the following sequence (a minimal sketch follows below):
- Create one or more lists of files based on a find shell command on the project's data directory. This type of job is also named a crawler as it crawls through the file system.
- Read the lists of files and create a pandas DataFrame for these files.
- Parse the file names and file paths and fill the column values. That can easily be done by deconstructing file paths and file names into their parts, assuming you defined a mandatory path and filename template beforehand. File names that cannot be parsed should be sorted out.
- The DataFrame is saved as the final catalog in a .csv file. You can also compress it to .csv.gz.
At DKRZ, we run such builder scripts for project data on disk repeatedly in cronjobs to keep the catalogs updated.
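A minimal builder sketch following this sequence; the project root, the reuse of directory_structure_template, and the output filename are assumptions for illustration, while the builder tools referenced below are far more complete:
import os
import pandas as pd

project_root = "/path/to/project/data"  # hypothetical data directory
template = directory_structure_template.split('/')
records = []
# Crawl the file system (equivalent to a `find` shell command)
for dirpath, _, filenames in os.walk(project_root):
    levels = dirpath[len(project_root):].strip('/').split('/')
    # Sort out paths that do not match the expected directory depth
    if len(levels) != len(template):
        continue
    for filename in filenames:
        record = dict(zip(template, levels))  # parse directory levels into columns
        record["path"] = os.path.join(dirpath, filename)
        records.append(record)
# Save the DataFrame as the collection file of the catalog
pd.DataFrame(records).to_csv("project_catalog.csv.gz", compression="gzip", index=False)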
Builder tool examples#
The NCAR builder tool for community projects like CMIP6 and CMIP5.
DKRZ builder notebooks (based on NCAR tools) like this Era5 notebook
3. Validate and save the catalog#
If we open the defined catalog with open_esm_datastore() and try to_dataset_dict(), we can check whether our creation is successful. The resulting catalog should give us exactly 1 dataset from 18 assets, as we aggregate over time.
import intake
validated_cat=intake.open_esm_datastore(
obj=dict(
df=catalog_dict_df,
esmcat=intake_esm_catalog
)
)
validated_cat
Intake-esmI catalog with 1 dataset(s) from 1 asset(s):
| | unique |
|---|---|
| time_range | 1 |
| path | 1 |
| mip_era | 1 |
| activity_id | 1 |
| institution_id | 1 |
| source_id | 1 |
| experiment_id | 1 |
| member_id | 1 |
| table_id | 1 |
| variable_id | 1 |
| grid_label | 1 |
| version | 1 |
| derived_variable_id | 0 |
validated_cat.to_dataset_dict()
--> The keys in the returned dictionary of datasets are constructed as follows:
'activity_id.institution_id'
---------------------------------------------------------------------------
FileNotFoundError: [Errno 2] No such file or directory: '/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/tas/gn/v20190710/…' (Permission denied)
ESMDataSourceError: Failed to load dataset with key='ScenarioMIP.DKRZ'
You can use `cat['ScenarioMIP.DKRZ'].df` to inspect the assets/files for this key.
(The load fails here only because the CMIP6 data directory is not accessible from the environment that built this page; on a system with access to the data, to_dataset_dict() returns the aggregated dataset.)
Intake-esm allows writing catalog file(s) with the serialize() function. The only required argument is the name of the catalog, which will be used as the filename. It writes the two parts of the catalog either together into one .json file:
validated_cat.serialize("validated_cat")
Successfully wrote ESM catalog json file to: file:///builds/data-infrastructure-services/tutorials-and-use-cases/docs/source/validated_cat.json
Or into two separate files if we provide catalog_type="file" as a second argument. The .json file may be very large, while we can save disk space if we store the database in a separate .csv.gz file:
validated_cat.serialize("validated_cat", catalog_type="file")
Successfully wrote ESM catalog json file to: file:///builds/data-infrastructure-services/tutorials-and-use-cases/docs/source/validated_cat.json
4. Multivariable assets#
If an asset contains more than one variable, intake-esm also features pre-selection of a variable before loading the data. Here is a user guide on how to configure the collection for that:
- the variable column of the catalog must contain iterables (list, tuple or set) of values
- the user must specify a dictionary of functions for converting values in certain columns into iterables. This is done via the read_csv_kwargs argument (named csv_kwargs in older intake-esm versions), such that the collection needs to be opened as follows:
import ast
import intake
col = intake.open_esm_datastore(
    "multi-variable-collection.json",
    read_csv_kwargs={"converters": {"variable": ast.literal_eval}},
)
col
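Once opened this way, users can still search for a single variable; a minimal sketch with a placeholder variable name:
# Searching for one variable matches all assets whose iterable "variable"
# column contains it; intake-esm pre-selects that variable when loading.
col_subset = col.search(variable="temperature")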
See also
This tutorial is part of a series on intake
:
You can also do another CMIP6 tutorial from the official intake page.