Compression methods for NetCDF files with Xarray#
This notebook gives a guideline on how to use recent basic lossy and lossless compression for netCDF output via xarray. It is an implementation of these slides, using example data of the Earth System Model AWI-CM. We will compare the writing speed and the compression ratio of the different methods.
For lossless compression of netCDF files, we use the hdf5plugin package, which makes additional HDF5 filters available within Python.
For lossy compression, we use the numcodecs library to apply the bit rounding.
We use the BitShuffle filter prior to compression whenever possible. It rearranges the binary data so that it compresses better.
Requirements#
Lossy compression requires a lot of memory.
Reading data compressed with anything other than deflate (zlib) requires netCDF version 4.9 or newer with access to the HDF5 filter plugins.
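If the netCDF4 Python bindings happen to be installed (an assumption; the notebook itself only needs the h5netcdf engine for writing), the version of the underlying netCDF-C library can be checked like this:
# Assumption: the netCDF4 python package is available in the environment.
import netCDF4
# Prints the netCDF-C library version string, e.g. starting with "4.9..."
print(netCDF4.getlibversion())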
Lossless compression methods#
zlib#
zlib has been the standard compression method since it was introduced in netCDF. It is based on the deflate algorithm; other compression packages use dictionaries.
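As a minimal sketch (the toy dataset and file name are made up for illustration; the notebook's actual encoding dictionary is defined further below), zlib compression can be requested directly through the encoding argument of to_netcdf, without any extra filter plugin:
import numpy as np
import xarray as xr
# toy dataset, only to illustrate the encoding syntax
ds = xr.Dataset({"tos": (("time", "ncells"), np.random.rand(12, 1000).astype("float32"))})
ds.to_netcdf(
    "example_zlib.nc",
    engine="h5netcdf",
    encoding={"tos": {"zlib": True, "complevel": 5}},  # complevel 1 (fast) to 9 (small)
)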
Zstandard#
Zstandard allows multithreading. It is used by package registries and is supported by Linux systems. Zstandard offers features beyond zlib/zlib-ng, including better performance and compression.
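A minimal sketch of a Zstandard encoding via hdf5plugin (toy dataset and file name are illustrative; the notebook itself combines Zstandard with BitShuffle in its configuration dictionary below):
import numpy as np
import xarray as xr
import hdf5plugin
ds = xr.Dataset({"tos": (("time", "ncells"), np.random.rand(12, 1000).astype("float32"))})
# hdf5plugin.Zstd() expands to the low-level 'compression'/'compression_opts' encoding keys
ds.to_netcdf(
    "example_zstd.nc",
    engine="h5netcdf",
    encoding={"tos": dict(**hdf5plugin.Zstd())},
)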
LZ4#
LZ4 is focused on very fast compression. It scales with the number of CPU cores.
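Analogously, a sketch using hdf5plugin's standalone LZ4 filter (illustrative names again; in the configuration dictionary below the notebook instead applies LZ4 through the BitShuffle filter):
import numpy as np
import xarray as xr
import hdf5plugin
ds = xr.Dataset({"tos": (("time", "ncells"), np.random.rand(12, 1000).astype("float32"))})
# standalone LZ4 filter (HDF5 filter id 32004)
ds.to_netcdf(
    "example_lz4.nc",
    engine="h5netcdf",
    encoding={"tos": dict(**hdf5plugin.LZ4())},
)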
Blosc#
Blosc uses the blocking technique to not only reduce the size of large datasets on-disk or in-memory, but also to accelerate memory-bound computations. Its default compressor is based on LZ4 and Zstandard.
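A sketch of a Blosc encoding (illustrative names; cname selects the internal codec and shuffle the byte/bit shuffling mode; the notebook's configuration below uses cname='blosclz' with shuffle=1):
import numpy as np
import xarray as xr
import hdf5plugin
ds = xr.Dataset({"tos": (("time", "ncells"), np.random.rand(12, 1000).astype("float32"))})
# Blosc meta-compressor with Zstandard inside and byte-shuffle enabled
ds.to_netcdf(
    "example_blosc.nc",
    engine="h5netcdf",
    encoding={"tos": dict(**hdf5plugin.Blosc(cname="zstd", clevel=5, shuffle=hdf5plugin.Blosc.SHUFFLE))},
)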
import hdf5plugin
import time
import fsspec as fs
import glob
import xarray as xr
import tqdm
help(hdf5plugin)
Help on package hdf5plugin:
NAME
hdf5plugin
DESCRIPTION
This module provides compiled shared libraries for their use as HDF5 filters
under windows, MacOS and linux.
PACKAGE CONTENTS
_config
_filters
_utils
_version
test
FUNCTIONS
__getattr__(name)
DATA
BLOSC2_ID = 32026
BLOSC_ID = 32001
BSHUF_ID = 32008
BZIP2_ID = 307
FCIDECOMP_ID = 32018
FILTERS = {'blosc': 32001, 'blosc2': 32026, 'bshuf': 32008, 'bzip2': 3...
LZ4_ID = 32004
PLUGINS_PATH = '/envs/lib/python3.11/site-packages/hdf5plugin/plugins'
PLUGIN_PATH = '/envs/lib/python3.11/site-packages/hdf5plugin/plugins'
SZ3_ID = 32024
SZ_ID = 32017
ZFP_ID = 32013
ZSTD_ID = 32015
version = '4.1.3'
version_info = version_info(major=4, minor=1, micro=3, releaselevel='f...
FILE
/envs/lib/python3.11/site-packages/hdf5plugin/__init__.py
%store -r times
#print(times)
times=0
no stored variable or alias times
On Levante, you can use the plugins from the CMIP pool directory /work/ik1017/hdf5plugin/plugins/:
hdf5plugin.PLUGIN_PATH="/work/ik1017/hdf5plugin/plugins/"
%set_env HDF5_PLUGIN_PATH={hdf5plugin.PLUGIN_PATH}
env: HDF5_PLUGIN_PATH=/work/ik1017/hdf5plugin/plugins/
We use the ocean surface temperature tos in this example:
source="/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/AWI/AWI-CM-1-1-MR/ssp370/r1i1p1f1/Omon/tos/gn/v20181218/tos_Omon_AWI-CM-1-1-MR_ssp370_r1i1p1f1_gn_201501-202012.nc"
pwd=!pwd
pwd=pwd[0]
source_uncompressed=f"{pwd}/temp.nc"
sds=xr.open_mfdataset(source).isel(time=slice(1,13))
for var in sds.variables:
    sds[var].encoding["zlib"]=False
sds.to_netcdf(source_uncompressed)
sds
<xarray.Dataset>
Dimensions:    (time: 12, bnds: 2, ncells: 830305, vertices: 16)
Coordinates:
  * time       (time) datetime64[ns] 2015-02-15 ... 2016-01-16T12:00:00
    lat        (ncells) float64 dask.array<chunksize=(830305,), meta=np.ndarray>
    lon        (ncells) float64 dask.array<chunksize=(830305,), meta=np.ndarray>
Dimensions without coordinates: bnds, ncells, vertices
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(12, 2), meta=np.ndarray>
    tos        (time, ncells) float32 dask.array<chunksize=(12, 830305), meta=np.ndarray>
    lat_bnds   (ncells, vertices) float64 dask.array<chunksize=(830305, 16), meta=np.ndarray>
    lon_bnds   (ncells, vertices) float64 dask.array<chunksize=(830305, 16), meta=np.ndarray>
Attributes: (12/39)
    frequency:             mon
    Conventions:           CF-1.7 CMIP-6.2
    creation_date:         2018-12-18T12:00:00Z
    data_specs_version:    01.00.27
    experiment:            ssp370
    experiment_id:         ssp370
    ...                    ...
    parent_experiment_id:  historical
    parent_mip_era:        CMIP6
    parent_source_id:      AWI-CM-1-1-MR
    parent_time_units:     days since 1850-1-1
    parent_variant_label:  r1i1p1f1
    activity_id:           ScenarioMIP AerChemMIP
omon2d=xr.open_mfdataset(
    source_uncompressed,
    engine="h5netcdf",
    parallel=False
)
The following “compression : configuration” dictionary is used to set the encoding keyword argument of xarray’s to_netcdf:
comprdict=dict(
    zlib=dict(
        engine="h5netcdf",
        compr=dict(
            zlib=True,
            complevel=5
        )
    ),
    zstd=dict(
        engine="h5netcdf",
        #from python 3.11:
        compr=dict(**hdf5plugin.Bitshuffle(cname="zstd"))
        #compr=dict(**hdf5plugin.Zstd())
    ),
    lz4=dict(
        engine="h5netcdf",
        #from python 3.11:
        compr=dict(**hdf5plugin.Bitshuffle(cname="lz4"))
        #compr=dict(**hdf5plugin.Bitshuffle(lz4=True))
    ),
    blosc=dict(
        engine="h5netcdf",
        compr=dict(**hdf5plugin.Blosc(cname='blosclz', shuffle=1))
    )
)
comprdict["lz4"]
{'engine': 'h5netcdf',
'compr': {'compression': 32008, 'compression_opts': (0, 2)}}
sourcesize=fs.filesystem("file").du(source_uncompressed)
print(f"The size of the uncompressed source file is {sourcesize/1024/1024} MB")
The size of the uncompressed source file is 253.42475032806396 MB
resultdir={}
for compr,config in tqdm.tqdm(comprdict.items()):
    enc=dict()
    for var in omon2d.data_vars:
        enc[var]=config["compr"]
    start=time.time()
    omon2d.to_netcdf(f"{pwd}/test_{compr}_compression.nc",
                     mode="w",
                     engine=config["engine"],
                     unlimited_dims="time",
                     encoding=enc,
                     )
    end=time.time()
    resultdir[compr]=dict(
        speed=sourcesize/(end-start)/1024/1024,
        ratio=fs.filesystem("file").du(f"{pwd}/test_{compr}_compression.nc")/sourcesize
    )
0%| | 0/4 [00:00<?, ?it/s]
25%|██▌ | 1/4 [00:26<01:20, 26.95s/it]
50%|█████ | 2/4 [00:45<00:43, 21.77s/it]
75%|███████▌ | 3/4 [01:01<00:19, 19.22s/it]
100%|██████████| 4/4 [01:18<00:00, 18.48s/it]
100%|██████████| 4/4 [01:18<00:00, 19.66s/it]
with open(f"results_{str(times)}.csv","w") as f:
for k,v in resultdir.items():
f.write(f"{k},{sourcesize},{v['speed']},{v['ratio']}\n")
Reading non-deflated data#
Before opening a file that was compressed with anything other than zlib, you have to import hdf5plugin first:
import hdf5plugin
import xarray as xr
outf=xr.open_dataset(f"{pwd}/test_zstd_compression.nc",engine="h5netcdf")
outf
<xarray.Dataset>
Dimensions:    (time: 12, bnds: 2, ncells: 830305, vertices: 16)
Coordinates:
  * time       (time) datetime64[ns] 2015-02-15 ... 2016-01-16T12:00:00
    lat        (ncells) float64 ...
    lon        (ncells) float64 ...
Dimensions without coordinates: bnds, ncells, vertices
Data variables:
    time_bnds  (time, bnds) datetime64[ns] ...
    tos        (time, ncells) float32 ...
    lat_bnds   (ncells, vertices) float64 ...
    lon_bnds   (ncells, vertices) float64 ...
Attributes: (12/39)
    frequency:             mon
    Conventions:           CF-1.7 CMIP-6.2
    creation_date:         2018-12-18T12:00:00Z
    data_specs_version:    01.00.27
    experiment:            ssp370
    experiment_id:         ssp370
    ...                    ...
    parent_experiment_id:  historical
    parent_mip_era:        CMIP6
    parent_source_id:      AWI-CM-1-1-MR
    parent_time_units:     days since 1850-1-1
    parent_variant_label:  r1i1p1f1
    activity_id:           ScenarioMIP AerChemMIP
Lossy#
We apply two lossy approaches:
Direct BitRounding with 16 bits to be kept. This precision can be considered similar to e.g. ERA5 data (24-bit integer space).
Calculating the number of bits to keep for an information level of 0.99 via xbitinfo (see the Xbitinfo section below).
losstimestart=time.time()
import numcodecs
rounding = numcodecs.BitRound(keepbits=16)
for var in omon2d.data_vars:
    if "bnds" in var:
        continue
    omon2d[var].data=rounding.decode(
        rounding.encode(
            omon2d[var].load().data
        )
    )
losstimeend=time.time()
resultdir={}
for compr,config in tqdm.tqdm(comprdict.items()):
    enc=dict()
    for var in omon2d.data_vars:
        enc[var]=config["compr"]
    start=time.time()
    omon2d.to_netcdf(f"{pwd}/test_{compr}_compression_lossy.nc",
                     mode="w",
                     engine=config["engine"],
                     unlimited_dims="time",
                     encoding=enc,
                     )
    end=time.time()
    resultdir[compr]=dict(
        speed=sourcesize/(end-start+losstimeend-losstimestart)/1024/1024,
        ratio=fs.filesystem("file").du(f"{pwd}/test_{compr}_compression_lossy.nc")/sourcesize
    )
0%| | 0/4 [00:00<?, ?it/s]
25%|██▌ | 1/4 [00:27<01:23, 27.71s/it]
50%|█████ | 2/4 [00:45<00:44, 22.11s/it]
75%|███████▌ | 3/4 [01:03<00:20, 20.14s/it]
100%|██████████| 4/4 [01:21<00:00, 19.32s/it]
100%|██████████| 4/4 [01:21<00:00, 20.44s/it]
with open(f"results_{str(times)}.csv","a") as f:
for k,v in resultdir.items():
f.write(f"{k}_lossy,{sourcesize},{v['speed']},{v['ratio']}\n")
Xbitinfo#
omon2d=xr.open_mfdataset(
    source_uncompressed,
    engine="h5netcdf",
    parallel=False
)
import xbitinfo as xb
import time
bitinfostart=time.time()
for var in omon2d.data_vars:
    if "bnds" in var:
        continue
    dims=[dim for dim in omon2d[[var]].dims.keys() if "ncell" in dim]
    print(dims)
    if dims:
        bitinfo = xb.get_bitinformation(omon2d[[var]], dim=dims, implementation="python")
        keepbits = xb.get_keepbits(bitinfo, inflevel=0.99)
        print(keepbits)
        if keepbits[var][0] > 0:
            print(keepbits[var][0])
            omon2d[var] = xb.xr_bitround(omon2d[[var]], keepbits)[var]  # this one wraps around numcodecs.bitround
bitinfoend=time.time()
['ncells']
/envs/lib/python3.11/site-packages/dask/core.py:121: RuntimeWarning: divide by zero encountered in log
return func(*(_execute_task(a, cache) for a in args))
<xarray.Dataset>
Dimensions: (inflevel: 1)
Coordinates:
dim <U6 'ncells'
* inflevel (inflevel) float64 0.99
Data variables:
tos (inflevel) int64 5
<xarray.DataArray 'tos' ()>
array(5)
Coordinates:
dim <U6 'ncells'
inflevel float64 0.99
resultdir={}
for compr,config in tqdm.tqdm(comprdict.items()):
    enc=dict()
    for var in omon2d.data_vars:
        enc[var]=config["compr"]
    start=time.time()
    omon2d.to_netcdf(f"{pwd}/test_{compr}_compression_lossy_xbit.nc",
                     mode="w",
                     engine=config["engine"],
                     unlimited_dims="time",
                     encoding=enc,
                     )
    end=time.time()
    resultdir[compr]=dict(
        speed=sourcesize/(end-start+bitinfoend-bitinfostart)/1024/1024,
        ratio=fs.filesystem("file").du(f"{pwd}/test_{compr}_compression_lossy_xbit.nc")/sourcesize
    )
0%| | 0/4 [00:00<?, ?it/s]
25%|██▌ | 1/4 [00:25<01:17, 25.76s/it]
50%|█████ | 2/4 [00:43<00:42, 21.06s/it]
75%|███████▌ | 3/4 [01:00<00:19, 19.21s/it]
100%|██████████| 4/4 [01:17<00:00, 18.27s/it]
100%|██████████| 4/4 [01:17<00:00, 19.34s/it]
with open(f"results_{str(times)}.csv","a") as f:
for k,v in resultdir.items():
f.write(f"{k}_lossy_xbit,{sourcesize},{v['speed']},{v['ratio']}\n")
Summarize the results#
import pandas as pd
import glob
df = pd.concat(
    (pd.read_csv(f, names=["type", "insize", "write_speed [MB/s]", "ratio"]) for f in glob.glob("results*.csv")),
    ignore_index=True,
)
df.groupby("type").mean()[["write_speed [MB/s]", "ratio"]].sort_values(by="write_speed [MB/s]", ascending=False)
type             | write_speed [MB/s] | ratio
---              | ---                | ---
lz4              | 15.652033          | 0.786459
blosc            | 14.619010          | 0.795161
lz4_lossy        | 14.105202          | 0.753412
zstd             | 13.968158          | 0.779027
blosc_lossy      | 13.907145          | 0.776796
zstd_lossy       | 13.808572          | 0.746050
blosc_lossy_xbit | 12.589180          | 0.711952
lz4_lossy_xbit   | 12.488856          | 0.701765
zstd_lossy_xbit  | 12.024892          | 0.693472
zlib             | 9.404780           | 0.861020
zlib_lossy       | 9.091680           | 0.829576
zlib_lossy_xbit  | 8.720787           | 0.755925
!rm test_*compression*.nc
!rm temp.nc