Intake IV - preprocessing and derived variables
Overview

🎯 objectives: Learn how to integrate intake-esm in your workflow
⌛ time_estimation: "30min"
☑️ requirements: intake_esm.__version__ == 2023.4.*, at least 10 GB memory, tutorial Intake I
© contributors: k204210
⚖ license:
Agenda
Based on DKRZ's CMIP6 catalog, you learn in this part how to

pass a preprocessing function to to_dataset_dict()
create a derived variable registry
import intake

# dkrz_catalog = intake.open_catalog(["https://dkrz.de/s/intake"])
# Only for generating the web page do we need the original link:
dkrz_cdp = intake.open_catalog(["https://gitlab.dkrz.de/data-infrastructure-services/intake-esm/-/raw/master/esm-collections/cloud-access/dkrz_catalog.yaml"])
esm_dkrz = dkrz_cdp.dkrz_cmip6_disk
Use Preprocessing when opening assets and creating datasets
When calling intake-esm's to_dataset_dict function, we can pass an argument preprocess. Its value should be a function which is applied to each asset right after it is opened and before the assets are aggregated into datasets.
Note
For CMIP6, a preprocessing package has been developed for homogenizing and preparing datasets of different ESMs for a grand analysis, featuring

renaming and setting of coordinates
adjusting grid values to fit into a common range (0-360 for lon), as sketched below
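For instance, the longitude adjustment could look like the following toy preprocess function (a sketch for illustration, not the package's actual implementation):

import xarray as xr

def wrap_lon_to_0_360(ds: xr.Dataset) -> xr.Dataset:
    """Toy preprocess step: wrap longitudes into the 0-360 range."""
    if "lon" in ds.coords:
        # The modulo maps e.g. -179.5 to 180.5; sorting restores monotonic order
        ds = ds.assign_coords(lon=ds["lon"] % 360).sortby("lon")
    return ds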
For example, if you would like to set some specific variables as coordinates, you can define a function which

receives an xarray dataset as an argument
returns a new xarray dataset
def correct_coordinates(ds):
    """converts wrongly assigned data_vars to coordinates"""
    ds = ds.copy()
    for co in [
        "x",
        "y",
        "lon",
        "lat",
        "lev",
        "bnds",
        "lev_bounds",
        "lon_bounds",
        "lat_bounds",
        "time_bounds",
        "lat_verticies",
        "lon_verticies",
    ]:
        if co in ds.variables:
            ds = ds.set_coords(co)
    return ds
Now, when you create the dictionary of datasets, you pass this function via the preprocess keyword:
cat = esm_dkrz.search(
    variable_id="tas",
    table_id="Amon",
    source_id="MPI-ESM1-2-HR",
    member_id="r1i1p1f1",
    experiment_id="ssp370",
)
test_dsets = cat.to_dataset_dict(
    xarray_open_kwargs={"chunks": {"time": 1}},
    preprocess=correct_coordinates,
)
--> The keys in the returned dictionary of datasets are constructed as follows:
'activity_id.source_id.experiment_id.table_id.grid_label'
Note

In intake-esm ≥ 2023.4, the former zarr_kwargs and cdf_kwargs arguments are deprecated in favour of a single xarray_open_kwargs. Also, zarr-only options such as consolidated=True must not be passed when the assets are netCDF files; otherwise xarray's NetCDF4 backend raises TypeError: NetCDF4BackendEntrypoint.open_dataset() got an unexpected keyword argument 'consolidated', and to_dataset_dict() fails with an ESMDataSourceError for the affected key (here 'ScenarioMIP.MPI-ESM1-2-HR.ssp370.Amon.gn'). In that case, cat['ScenarioMIP.MPI-ESM1-2-HR.ssp370.Amon.gn'].df lets you inspect the assets/files behind the key.
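To verify that the preprocessing was applied, you can inspect the resulting dataset; a minimal check (the key follows the template printed above):

ds = test_dsets["ScenarioMIP.MPI-ESM1-2-HR.ssp370.Amon.gn"]
# After preprocessing, the bounds variables matched by correct_coordinates
# should be listed under coords instead of data_vars:
print(list(ds.coords))
print(list(ds.data_vars))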
Derived variables
Most of the following is taken from the intake-esm tutorial.
A “derived variable” in this case is a variable that doesn’t itself exist in an intake-esm catalog, but can be computed (i.e., “derived”) from variables that do exist in the catalog. Currently, the derived variable implementation requires variables on the same grid, etc.; i.e., it assumes that all variables involved can be merged within the same dataset. […] Derived variables could include more sophisticated diagnostic output like aggregations of terms in a tracer budget or gradients in a particular field.
The registry of the derived variables can be connected to the catalog. When users open the catalog together with such a registry and request a derived variable, the variables it depends on are loaded and the derived variable is computed on the fly.
import intake
import intake_esm
from intake_esm import DerivedVariableRegistry
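As a minimal sketch of how such a registry could be set up (the derived variable tas_celsius, its query, and the conversion function are illustrative assumptions, not part of the DKRZ catalog):

dvr = DerivedVariableRegistry()

# Register a hypothetical derived variable "tas_celsius" which is computed
# from the catalog variable "tas" (near-surface air temperature in Kelvin).
@dvr.register(variable="tas_celsius", query={"variable_id": "tas", "table_id": "Amon"})
def tas_to_celsius(ds):
    ds["tas_celsius"] = ds["tas"] - 273.15
    ds["tas_celsius"].attrs["units"] = "degC"
    return ds

# Connecting the registry when opening the catalog, e.g.
#   cat = intake.open_esm_datastore(<collection json>, registry=dvr)
# makes "tas_celsius" searchable; the function is executed on the fly
# when to_dataset_dict() loads the matching "tas" assets.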
See also
This tutorial is part of a series on intake.
You can also do another CMIP6 tutorial from the official intake page.