Source code for cordex.cmor.cmor

import os
from os import path as op
from warnings import warn
from collections.abc import Iterable
from collections import OrderedDict

import cf_xarray as cfxr
import pandas as pd
import xarray as xr

# from cf_xarray.units import units

try:
    import cmor
except Exception:
    warn("no python cmor package available, consider installing it")

# import cordex as cx
from .. import create_dataset
from ..domain import domain
from .config import (
    freq_map,
    loffsets,
    options,
    time_axis_names,
    time_dtype,
    time_units_default,
    units_format,
    grid_entry_mapping,
)

# from .derived import derivator
from .utils import (
    _encode_time,
    _get_cfvarinfo,
    _get_cordex_pole,
    _read_table,
    _strip_time_cell_method,
    _tmp_table,
    mid_of_month,
    month_bounds,
    time_bounds_name,
)

xr.set_options(keep_attrs=True)

try:
    import flox  # noqa

    has_flox = True
    default_flox_method = "blockwise"
except ImportError:
    has_flox = False
    default_flox_method = None


def resample_both_closed(ds, hfreq, op, **kwargs):
    rolling = getattr(ds.rolling(time=hfreq + 1, center=True), op)()
    freq = f"{hfreq}H"
    return rolling.resample(time=freq, loffset=0.5 * pd.Timedelta(hfreq, "H")).nearest()


def _resample_op(ds, hfreq, op, **kwargs):
    rolling = getattr(ds.rolling(time=hfreq + 1, center=True), op)()
    freq = f"{hfreq}H"
    return rolling.resample(time=freq, loffset=0.5 * pd.Timedelta(hfreq, "H")).nearest()


def _get_loffset(time):
    return loffsets.get(time, None)


def _clear_time_axis(ds):
    """Delete timesteps with NaN arrays"""
    for data_var in ds.data_vars:
        ds = ds.dropna(dim="time", how="all")
    return ds


def _resample(
    ds, time, time_cell_method="point", label="left", time_offset=True, **kwargs
):
    """Resample a variable."""
    if time_cell_method == "point":
        return ds.resample(
            time=time, label="right", closed="right"
        ).nearest()  # .interpolate("nearest") # use as_freq?
    elif time_cell_method == "mean":
        if time_offset is True:
            loffset = _get_loffset(time)
        else:
            loffset = None
        mean_kwargs = {}
        if has_flox:
            mean_kwargs["engine"] = "flox"
            mean_kwargs["method"] = default_flox_method
        return ds.resample(time=time, label=label, offset=loffset, **kwargs).mean(
            **mean_kwargs
        )
    else:
        raise Exception(f"unknown time_cell_method: {time_cell_method}")


def _get_bnds(values):
    bnds = [None] * (len(values) + 1)
    bnds[0] = values[0] - (values[1] - values[0]) / 2
    bnds[len(values)] = values[-1] + (values[-1] - values[-2]) / 2
    i = 1
    while i < len(values):
        bnds[i] = values[i] - (values[i] - values[i - 1]) / 2
        i += 1
    return bnds


def _crop_to_cordex_domain(ds, domain_id, tolerance=1.0e-2):
    """Crop data to official CORDEX domain"""
    try:
        grid = domain(domain_id)
    except KeyError:
        raise KeyError(
            f'unknown domain_id: "{domain_id}", if you are using an unofficial domain, set crop=False'
        )

    info = grid.cx.info()
    rlon_tol = tolerance * info["dlon"]
    rlat_tol = tolerance * info["dlat"]
    # the method=='nearest' approach does not work well with dask
    return ds.sel(
        rlon=slice(grid.rlon.min() - rlon_tol, grid.rlon.max() + rlon_tol),
        rlat=slice(grid.rlat.min() - rlat_tol, grid.rlat.max() + rlat_tol),
    )


def _load_table(table):
    cmor.load_table(table)


def _setup(dataset_table, mip_table, grids_table=None, inpath="."):
    if grids_table is None:
        grids_table = f'{options["table_prefix"]}_grids.json'
    cmor.setup(
        inpath,
        set_verbosity=cmor.CMOR_NORMAL,
        netcdf_file_action=cmor.CMOR_REPLACE,
        exit_control=getattr(cmor, options["exit_control"]),
        logfile=None,
    )
    cmor.dataset_json(dataset_table)
    grid_id = cmor.load_table(grids_table)
    table_id = cmor.load_table(mip_table)
    cmor.set_table(table_id)
    return {"grid": grid_id, "mip": table_id}


def _get_time_axis_name(time_cell_method):
    """Get the name of the CMOR time coordinate"""
    return time_axis_names.get(time_cell_method, "time")


def _define_grid(ds, table_id, rewrite_grid="auto"):
    if rewrite_grid == "auto":
        try:
            grid_attrs = ds.cx.info()
            grid = create_dataset(**grid_attrs, bounds=True)
        except (KeyError, ValueError):
            warn("can not rewrite grid")
            grid = ds
    else:
        grid = ds
    grid_mapping = grid.cf["grid_mapping"]
    grid_mapping_name = grid_mapping.grid_mapping_name
    entry_mapping = grid_entry_mapping.get(grid_mapping_name)
    default_attrs = entry_mapping["default_attrs"]
    # if "domain_id" in ds.attrs:
    #     try:
    #         grid = domain(ds.attrs["domain_id"], bounds=True)
    #         lon_vertices = grid.lon_vertices.to_numpy()
    #         lat_vertices = grid.lat_vertices.to_numpy()
    #     except KeyError:
    #         warn(f"Unknown domain: {ds.attrs['domain_id']}")
    # else:
    #     lon_vertices = None
    #     lat_vertices = None

    # if "longitude" not in ds.cf.coords or "latitude" not in ds.cf.coords:
    #     ds = cx.transform_coords(ds, trg_dims=("lon", "lat"))

    cmor.set_table(table_id)

    cmorY = cmor.axis(
        table_entry=entry_mapping["Y"],
        coord_vals=grid.cf["Y"].to_numpy(),
        units=grid.cf["Y"].units,
    )
    cmorX = cmor.axis(
        table_entry=entry_mapping["X"],
        coord_vals=grid.cf["X"].to_numpy(),
        units=grid.cf["X"].units,
    )

    latitude = grid.cf["latitude"].to_numpy()
    longitude = grid.cf["longitude"].to_numpy()
    latitude_vertices = (
        grid.cf.get_bounds("latitude").to_numpy()
        if "latitude" in grid.cf.bounds
        else None
    )
    longitude_vertices = (
        grid.cf.get_bounds("longitude").to_numpy()
        if "longitude" in grid.cf.bounds
        else None
    )

    cmorGrid = cmor.grid(
        [cmorY, cmorX],
        latitude=latitude,
        longitude=longitude,
        latitude_vertices=latitude_vertices,
        longitude_vertices=longitude_vertices,
    )

    # attrs_dict = {k: grid_mapping.attrs.get(v) or v for k, v in default_attrs.items()}
    attrs_dict = OrderedDict()
    for k, v in default_attrs.items():
        if k in grid_mapping.attrs:
            value = grid_mapping.attrs[k]
            if isinstance(value, Iterable):
                for i, val in enumerate(value):
                    # see
                    attrs_dict[k + str(i + 1)] = [val, v[1]]
                    # attrs_dict[k] = [val, v[1]]
            else:
                attrs_dict[k] = [value, v[1]]
        elif v[0] is not None:
            attrs_dict[k] = v
        else:
            raise KeyError(
                f"Missing attribute {k} in grid mapping variable: {grid_mapping_name}"
            )

    cmor.set_grid_mapping(
        cmorGrid,
        grid_mapping_name,
        # parameter_names=attrs_dict,
        parameter_names=list(attrs_dict.keys()),
        parameter_values=[v[0] for v in attrs_dict.values()],
        parameter_units=[v[1] for v in attrs_dict.values()],
        # len(attrs_dict) * [""],
    )

    return cmorGrid


def _time_bounds(ds, freq=None):
    return cfxr.bounds_to_vertices(ds.cf.get_bounds("time"), "bounds")


def _define_time(ds, table_id, time_cell_method=None):
    cmor.set_table(table_id)

    if time_cell_method is None:
        warn("no time_cell_method given, assuming: mean")
        time_cell_method = "mean"

    # encode time and time bounds

    time_axis_encode = _encode_time(ds.time).to_numpy()
    time_axis_name = _get_time_axis_name(time_cell_method)

    if time_cell_method != "point":
        time_bounds = _time_bounds(ds)
        time_bounds.encoding = ds.time.encoding
        time_bounds_encode = _encode_time(time_bounds).to_numpy()
    else:
        time_bounds_encode = None

    return cmor.axis(
        time_axis_name,
        coord_vals=time_axis_encode,
        # cell_bounds=_get_bnds(time_encode.to_numpy()),
        cell_bounds=time_bounds_encode,
        units=ds.time.encoding["units"],
    )


def _define_axis(ds, table_ids, time_cell_method="point"):
    cmorGrid = _define_grid(ds, table_ids["grid"])

    if "time" in ds:
        cmorTime = _define_time(ds, table_ids["mip"], time_cell_method)
    else:
        cmorTime = None

    # add z axis if required
    if "Z" in ds.cf.dims:
        z = ds.cf["Z"]
        print(ds.cf.bounds)
        if "Z" not in ds.cf.bounds:
            # try to add bounds via cf_xarray
            warn("found not bounds for z-axis, trying to create bounds...")
            ds = ds.cf.add_bounds("Z", output_dim="bnds")
        bounds = ds.cf.get_bounds("Z")
        if bounds.ndim != 1:
            # transpose to bounds
            bounds = cfxr.bounds_to_vertices(bounds, "bnds")
        cmorZ = cmor.axis(
            table_entry=z.name,
            coord_vals=z.to_numpy(),
            units=z.attrs.get("units"),
            cell_bounds=bounds.to_numpy(),
        )
    else:
        cmorZ = None

    return cmorTime, cmorZ, cmorGrid


def _cmor_write(da, table_id, cmorTime, cmorZ, cmorGrid, file_name=True):
    """write to netcdf via cmor python API"""

    cmor.set_table(table_id)

    # create coordinate ids
    coords = []
    if cmorTime:
        coords.append(cmorTime)
    if cmorZ:
        coords.append(cmorZ)
    coords.append(cmorGrid)

    cmor_var_kwargs = {}
    for kwarg in ["positive", "missing_value", "original_name", "history", "comment"]:
        if kwarg in da.attrs:
            cmor_var_kwargs[kwarg] = da.attrs[kwarg]

    var_id = cmor.variable(
        table_entry=da.name, units=da.units, axis_ids=coords, **cmor_var_kwargs
    )

    cmor.set_deflate(var_id, **options["compression"])

    if "time" in da.coords:
        ntimes_passed = da.time.size
    else:
        ntimes_passed = None
    cmor.write(var_id, da.to_numpy(), ntimes_passed=ntimes_passed)

    return cmor.close(var_id, file_name=file_name)


def _units_convert(da, cf_units, format=None):
    import pint_xarray  # noqa
    from cf_xarray.units import units  # noqa

    if format is None:
        format = units_format
    da_quant = da.pint.quantify()
    da = da_quant.pint.to(cf_units).pint.dequantify(format=units_format)
    return da


def _cf_units_convert(da, table, mapping_table={}):
    """Convert units.

    Convert units according to the rules in units_convert_rules dict.
    Maybe metpy can do this also: https://unidata.github.io/MetPy/latest/tutorials/unit_tutorial.html

    """
    from cf_xarray.units import units as cfxr_units  # noqa

    if da.name in mapping_table:
        map_units = mapping_table[da.name].get("units")
        atr_units = da.attrs.get("units")
        if map_units is not None and atr_units is not None and atr_units != map_units:
            warn(
                f"unit [{map_units}] from mapping table differs from units attribute [{da}], assuming [{map_units}] is correct"
            )
            units = map_units
        elif map_units is not None:
            units = map_units
        else:
            units = atr_units
    else:
        try:
            units = da.units
        except AttributeError:
            warn(
                f"could not find units attribute on {da.name}, if you are sure about the units, set allow_units_convert=False."
            )
    da.attrs["units"] = units
    cf_units = table["variable_entry"][da.name]["units"]

    if cfxr_units.Unit(units) != cfxr_units.Unit(cf_units):
        warn(f"converting units {units} from input data to CF units {cf_units}")
        da = _units_convert(da, cf_units)
        # da = da.pint.to(cf_units)
    return da


# def _convert_cmor_to_resample_frequency(cmor_table):
#    """Convert CMOR table name into resample frequency"""
#    return resample_frequency[cmor_table]


def _get_time_units(ds):
    """Determine time units of dataset"""
    try:
        return ds.time.encoding["units"]
    except Exception:
        return ds.time.units
    return None


def _set_time_encoding(ds, units, orig):
    u = None
    if units is not None:
        if units == "input":
            if "units" in orig.time.encoding:
                u = orig.time.encoding["units"]
            elif "units" in orig.time.attrs:
                u = orig.time.attrs["units"]
        else:
            u = units
    if u is None:
        u = time_units_default
        warn(f"time units are set to default: {u}")
    if not isinstance(u, str):
        raise TypeError(f"time units invalid: {u}")
    ds.time.encoding["units"] = u
    ds.time.encoding["dtype"] = time_dtype
    return ds


def _update_time_axis(ds, freq=None, time_cell_method=None):
    if time_cell_method is None:
        warn("no time_cell_method given, assuming: point")
        time_cell_method = "point"

    if time_cell_method == "mean":
        time_bounds = _time_bounds(ds)
        time_bounds.encoding = ds.time.encoding
        time_bounds_encode = _encode_time(time_bounds).to_numpy()
    else:
        time_bounds_encode = None
    return time_bounds_encode


def _rewrite_time_axis(ds, freq=None, calendar=None):
    """rewrite time axis to ensure correct timestamps

    For monthly frequencies, the timestamp will be mid of month.

    """
    ds = ds.copy(deep=False)
    pd_freq = freq_map.get(freq, None)
    if freq is None:
        pd_freq = xr.infer_freq(ds.time).upper()
    if calendar is None:
        calendar = ds.time.dt.calendar
    start = ds.time.data[0]
    if freq == "mon":
        ds["time"] = mid_of_month(ds)
        return ds

    date_range = xr.cftime_range(
        start,
        periods=ds.time.size,
        freq=pd_freq,
        calendar=calendar,  # inclusive="left"
    )

    return ds.assign_coords(time=date_range)


def _add_month_bounds(ds):
    ds[time_bounds_name] = month_bounds(ds)
    return ds


def _add_time_bounds(ds, cf_freq):
    """add time bounds

    Take special care of monthly frequencies.

    """
    # monthly time bounds are funny in ESGF, it seems that
    # they should always be the first of each month and first
    # of second month. This is not really the bounds you would get
    # from arithmetics but seems fine. We have to take special care
    # of them here.
    if cf_freq == "mon":
        ds = _add_month_bounds(ds)
    else:
        try:
            ds = ds.convert_calendar(
                ds.time.dt.calendar, use_cftime=False
            ).cf.add_bounds("time")
        except Exception:
            # wait for cftime arithemtics in xarry here:
            warn("could not add time bounds.")

    ds[time_bounds_name].encoding = ds.time.encoding
    ds.time.attrs.update({"bounds": time_bounds_name})
    return ds


def _adjust_frequency(ds, cf_freq, input_freq=None, time_cell_method=None):
    if input_freq is None and "time" in ds.coords:
        input_freq = xr.infer_freq(ds.time)
    if input_freq is None:
        warn("Could not determine frequency of input data, will assume it is correct.")
        return ds
    else:
        input_freq = input_freq.upper()
    pd_freq = freq_map[cf_freq]
    if pd_freq != input_freq:
        warn(f"resampling input data from {input_freq} to {pd_freq}")
        resample = _resample(
            ds, pd_freq, time_cell_method=time_cell_method, **options["resample_kwargs"]
        )
        return resample
    return ds


def cmorize_cmor(
    ds, out_name, cmor_table, dataset_table, grids_table=None, inpath=None
):
    # get meta info from cmor table
    if inpath is None:
        inpath = os.path.dirname(cmor_table)

    dataset_table_json = dataset_table
    cmor_table_json = cmor_table

    if isinstance(dataset_table, dict):
        dataset_table_json = _tmp_table(dataset_table)
    if isinstance(cmor_table, dict):
        cmor_table_json = _tmp_table(cmor_table)

    cfvarinfo = _get_cfvarinfo(out_name, cmor_table)

    if cfvarinfo is None:
        raise Exception(f"{out_name} not found in {cmor_table}")

    time_cell_method = _strip_time_cell_method(cfvarinfo)

    table_ids = _setup(
        dataset_table_json, cmor_table_json, grids_table=grids_table, inpath=inpath
    )

    cmorTime, cmorZ, cmorGrid = _define_axis(
        ds, table_ids, time_cell_method=time_cell_method
    )

    return _cmor_write(ds[out_name], table_ids["mip"], cmorTime, cmorZ, cmorGrid)


def prepare_variable(
    ds,
    out_name,
    cmor_table,
    mapping_table=None,
    replace_coords=False,
    allow_units_convert=False,
    allow_resample=False,
    input_freq=None,
    domain_id=None,
    time_units=None,
    rewrite_time_axis=False,
    use_cftime=False,
    squeeze=True,
    crop=None,
    guess_coord_axis=None,
):
    """prepares a variable for cmorization."""

    if mapping_table is None:
        mapping_table = {}

    ds = ds.copy(deep=False)
    # use cf_xarray to guess coordinate meta data
    if guess_coord_axis is None:
        guess_coord_axis = "X" not in ds.cf.dims or "Y" not in ds.cf.dims
    if guess_coord_axis is True:
        ds = ds.cf.guess_coord_axis(verbose=True)

    if isinstance(cmor_table, str):
        cmor_table = _read_table(cmor_table)
    cfvarinfo = _get_cfvarinfo(out_name, cmor_table)

    cf_freq = cfvarinfo["frequency"]
    time_cell_method = _strip_time_cell_method(cfvarinfo)

    if isinstance(ds, xr.DataArray):
        ds = ds.to_dataset()

    # ensure that we propagate everything
    # ds = xr.decode_cf(ds, decode_coords="all")

    # no mapping table provided, we assume datasets has already correct out_names and units.
    if out_name in ds.data_vars:
        var_ds = ds  # [[out_name]]
    elif mapping_table and out_name not in mapping_table:
        raise Exception(
            f"Could not find {out_name} in dataset. Please make sure, variable names and units have CF standard or pass a mapping table."
        )
    else:
        varname = mapping_table[out_name]["varname"]
        # cf_name = varinfo["cf_name"]
        var_ds = ds  # [[varname]]  # .to_dataset()
        var_ds = var_ds.rename({varname: out_name})
    # remove point coordinates, e.g, height2m
    if squeeze is True:
        var_ds = var_ds.squeeze(drop=True)
    if crop is True:
        # var_ds.attrs["domain_id"] = domain_id
        var_ds = _crop_to_cordex_domain(var_ds, domain_id)
    if replace_coords is True:
        # domain_id = domain_id or var_ds.cx.domain_id
        grid = domain(domain_id, bounds=True)
        var_ds = var_ds.assign_coords(rlon=grid.rlon, rlat=grid.rlat)
        var_ds = var_ds.assign_coords(lon=grid.lon, lat=grid.lat)
        var_ds = var_ds.assign_coords(
            lon_vertices=grid.lon_vertices, lat_vertices=grid.lat_vertices
        )

    if "time" in var_ds:
        # ensure cftime
        var_ds = var_ds.convert_calendar(ds.time.dt.calendar, use_cftime=True)
        if allow_resample is True:
            var_ds = _adjust_frequency(var_ds, cf_freq, input_freq, time_cell_method)
        if rewrite_time_axis is True:
            var_ds = _rewrite_time_axis(var_ds, cf_freq)
        if "time" not in ds.cf.bounds and time_cell_method != "point":
            warn("adding time bounds")
            var_ds = _add_time_bounds(var_ds, cf_freq)
        if use_cftime is False:
            var_ds = var_ds.convert_calendar(ds.time.dt.calendar, use_cftime=False)
        var_ds = _set_time_encoding(var_ds, time_units, ds)

    if allow_units_convert is True:
        var_ds[out_name] = _cf_units_convert(
            var_ds[out_name], cmor_table, mapping_table
        )

    try:
        mapping = ds.cf["grid_mapping"]  # _get_pole(ds)
    except KeyError:
        warn(f"adding pole from archive specs: {domain_id}")
        mapping = _get_cordex_pole(domain_id)

    if "time" in mapping.coords:
        raise Exception("grid_mapping variable should have no time coordinate!")

    var_ds[mapping.name] = mapping

    return var_ds


[docs] def cmorize_variable( ds, out_name, cmor_table, dataset_table, mapping_table=None, grids_table=None, inpath=None, replace_coords=False, allow_units_convert=False, allow_resample=False, input_freq=None, domain_id=None, crop=None, time_units=None, rewrite_time_axis=False, outpath=None, **kwargs, ): """Cmorizes a variable. This functions call the python cmor API and creates a cmorized NetCDF file from the input dataset. Coordinates of the input dataset should be understandable by ``cf_xarray`` if they follow basic CF conventions. If a vertical coordinate is available, it should have an ``axis="Z"`` attribute so it can be understood by ``cf_xarray`` and it should be named after the unique key of the coordinate in the cmor grids table (not the out_name), e.g., name it ``sdepth`` if you need a soil layer coordinate instead of ``depth``. Variables may have one of the following attributes that are used as keyword arguments in the call to `cmor_variable <https://cmor.llnl.gov/mydoc_cmor3_api/#cmor_variable>`_, e.g., ``positive``, ``missing_value``, ``original_name``, ``history`` or ``comment``. Parameters ---------- ds : xr.Dataset Dataset containing at least the variable that should be cmorized. out_name: str CF out_name of the variable that should be cmorized. The corresponding variable name in the dataset is looked up from the mapping_table if provided. cmor_table : str or dict Cmor table dict of filepath to cmor table (json). dataset_table: str or dict Dataset table dict of filepath to dataset cmor table (json). mapping_table: dict Mapping of input variable names and meta data to CF out_name. Required if the variable name in the input dataset is not equal to out_name. grids_table: str Filepath to cmor grids table. inpath: str Path to cmor tables, if ``inpath == None``, inpath is the path to ``cmor_table``. This is required to find additional cmor tables, like ``CMIP6_coordinates``, ``CMIP6_grids`` etc. replace_coords: bool Replace coordinates from input file and create them from archive specifications. Only possible, if a domain identifier is given in the global attributes or as a keyword argument, e.g., see the ``domain_id`` keyword. allow_units_convert: bool Allow units to be converted if they do not agree with the units in the cmor table. Defaults to ``False`` to make the user aware of having correct ``units`` attributes set. allow_resample: bool Allow to resample temporal data to the frequency required from the cmor table. Handles both downsampling and upsampling. Defaults to ``False`` to make users aware of the correct frequency input. input_freq: str The frequency of the input dataset in pandas notation. It ``None`` and the dataset contains a time axis, the frequency will be determined automatically using ``pandas.infer_freq`` if possible. domain_id: str Cordex domain identifier. If ``None``, the domain will be determined by the ``domain_id`` global attribute if available. If ``domain_id`` is given as a keyword, it will override a possible ``domain_id`` global attribute. crop: bool Crop dataset to official Cordex domain if it contains, e.g., a nudging zone. If set to ``None``, cropping will be done if a domain identifier is given or the domain can be identified automatically. Set to ``crop=False``, if you have an 'unofficial' domain_id. time_units: str Time units of the cmorized dataset (``ISO 8601``). If ``None``, time units will be set to default (``"days since 1950-01-01T00:00:00"``). If ``time_units='input'``, the original time units of the input dataset are used. rewrite_time_axis: bool Rewrite the time axis to CF compliant timestamps. outpath: str Root directory for output (can be either a relative or full path). This will override the outpath defined in the dataset cmor input table (``dataset_table``). **kwargs: Argumets passed to prepare_variable. Returns ------- filename Filepath to cmorized file. Example ------- To cmorize a dataset, you can use ,e.g.,:: import cordex as cx from cordex.cmor import cmorize_variable from cordex.tables import cordex_cmor_table ds = cx.domain("EUR-11", dummy="topo").rename(topo="orog") dataset_table = cordex_cmor_table(f"CORDEX-CMIP6_remo_example") filename = cmorize_variable( ds, "orog", cmor_table=cordex_cmor_table("CORDEX-CMIP6_fx"), dataset_table=dataset_table, replace_coords=True, allow_units_convert=True, ) """ ds = ds.copy() if "CORDEX_domain" in kwargs: warn( "'CORDEX_domain' keyword is deprecated, please use the 'domain_id' keyword instead", DeprecationWarning, stacklevel=2, ) domain_id = kwargs["CORDEX_domain"] del kwargs["CORDEX_domain"] if domain_id is None: try: domain_id = ds.cx.domain_id except Exception as e: warn(e) warn( "could not identify CORDEX domain, try to set the 'domain_id' if there is no domain identifier in global attributes." ) crop = False replace_coords = False else: if "domain_id" in ds.attrs and ds.attrs.get("domain_id") != domain_id: warn( f"overwriting global attribute 'domain_id' with value '{ds.attrs['domain_id']}' with '{domain_id}' from keyword argument." ) ds.attrs["domain_id"] = domain_id if domain_id and crop is None: crop = True if inpath is None: inpath = os.path.dirname(cmor_table) if not isinstance(dataset_table, dict): dataset_table = _read_table(dataset_table) if outpath: dataset_table["outpath"] = outpath ds_prep = prepare_variable( ds, out_name, cmor_table, domain_id=domain_id, mapping_table=mapping_table, replace_coords=replace_coords, input_freq=input_freq, rewrite_time_axis=rewrite_time_axis, time_units=time_units, allow_resample=allow_resample, allow_units_convert=allow_units_convert, crop=crop, **kwargs, ) return cmorize_cmor( ds_prep, out_name, cmor_table, dataset_table, grids_table, inpath )
class CmorizerBase: def __init__(self): pass class Cmorizer(CmorizerBase): def __init__( self, dataset_table=None, mapping_table=None, grids_table=None, inpath=None, replace_coords=False, allow_units_convert=False, allow_resample=False, input_freq=None, domain_id=None, time_units=None, rewrite_time_axis=False, outpath=None, ): """Cmorizes a variable. Parameters ---------- dataset_table: str or dict Dataset table dict of filepath to dataset cmor table (json). mapping_table: dict Mapping of input variable names and meta data to CF out_name. Required if the variable name in the input dataset is not equal to out_name. inpath: str Path to cmor tables, if ``inpath == None``, inpath is the path to ``cmor_table``. This is required to find additional cmor tables, like ``CMIP6_coordinates``, ``CMIP6_grids`` etc. grids_table: str Filepath to cmor grids table. replace_coords: bool Replace coordinates from input file and create them from archive specifications. allow_units_convert: bool Allow units to be converted if they do not agree with the units in the cmor table. Defaults to ``False`` to make the user aware of having correct ``units`` attributes set. allow_resample: bool Allow to resample temporal data to the frequency required from the cmor table. Handles both downsampling and upsampling. Defaults to ``False`` to make users aware of the correct frequency input. input_freq: str The frequency of the input dataset in pandas notation. It ``None`` and the dataset contains a time axis, the frequency will be determined automatically using ``pandas.infer_freq`` if possible. domain_id: str Cordex domain short name. If ``None``, the domain will be determined by the ``domain_id`` global attribute if available. time_units: str Time units of the cmorized dataset (``ISO 8601``). If ``None``, time units will be set to default (``"days since 1950-01-01T00:00:00"``). If ``time_units='input'``, the original time units of the input dataset are used. rewrite_time_axis: bool Rewrite the time axis to CF compliant timestamps. outpath: str Root directory for output (can be either a relative or full path). This will override the outpath defined in the dataset cmor input table (``dataset_table``). """ super().__init__() self.dataset_table = dataset_table self.mapping_table = mapping_table self.grids_table = grids_table self.inpath = inpath self.replace_coords = replace_coords self.allow_units_convert = allow_units_convert self.allow_resample = allow_resample self.domain_id = domain_id self.time_units = time_units self.rewrite_time_axis = rewrite_time_axis self.outpath = outpath if op.isfile(self.dataset_table): self.dataset_table = _read_table(self.dataset_table) if outpath: self.dataset_table["outpath"] = outpath def preprocess( self, ds, out_name, cmor_table, replace_coords=False, allow_units_convert=False, allow_resample=False, input_freq=None, domain_id=None, time_units=None, rewrite_time_axis=False, use_cftime=False, squeeze=True, ): """prepares a variable for cmorization.""" return prepare_variable( ds, out_name, cmor_table, mapping_table=self.mapping_table, replace_coords=replace_coords or self.replace_coords, allow_units_convert=allow_units_convert or self.allow_units_convert, allow_resample=allow_resample or self.allow_resample, input_freq=input_freq, domain_id=domain_id or self.domain_id, time_units=time_units or self.time_units, rewrite_time_axis=rewrite_time_axis or self.rewrite_time_axis, use_cftime=use_cftime, squeeze=squeeze, ) def _write_with_cmor(self, ds, out_name, cmor_table): return cmorize_cmor( ds, out_name, cmor_table, self.dataset_table, self.grids_table, self.inpath ) def cmorize(self, ds, out_name, cmor_table): """Cmorizes a variable. Parameters ---------- ds : xr.Dataset Dataset containing at least the variable that should be cmorized. out_name: str CF out_name of the variable that should be cmorized. The corresponding variable name in the dataset is looked up from the mapping_table if provided. cmor_table : str or dict Cmor table dict of filepath to cmor table (json). dataset_table: str or dict Dataset table dict of filepath to dataset cmor table (json). mapping_table: dict Mapping of input variable names and meta data to CF out_name. Required if the variable name in the input dataset is not equal to out_name. grids_table: str Filepath to cmor grids table. inpath: str Path to cmor tables, if ``inpath == None``, inpath is the path to ``cmor_table``. This is required to find additional cmor tables, like ``CMIP6_coordinates``, ``CMIP6_grids`` etc. replace_coords: bool Replace coordinates from input file and create them from archive specifications. allow_units_convert: bool Allow units to be converted if they do not agree with the units in the cmor table. Defaults to ``False`` to make the user aware of having correct ``units`` attributes set. allow_resample: bool Allow to resample temporal data to the frequency required from the cmor table. Handles both downsampling and upsampling. Defaults to ``False`` to make users aware of the correct frequency input. input_freq: str The frequency of the input dataset in pandas notation. It ``None`` and the dataset contains a time axis, the frequency will be determined automatically using ``pandas.infer_freq`` if possible. domain_id: str Cordex domain short name. If ``None``, the domain will be determined by the ``domain_id`` global attribute if available. time_units: str Time units of the cmorized dataset (``ISO 8601``). If ``None``, time units will be set to default (``"days since 1950-01-01T00:00:00"``). If ``time_units='input'``, the original time units of the input dataset are used. rewrite_time_axis: bool Rewrite the time axis to CF compliant timestamps. outpath: str Root directory for output (can be either a relative or full path). This will override the outpath defined in the dataset cmor input table (``dataset_table``). **kwargs: Argumets passed to prepare_variable. Returns ------- filename Filepath to cmorized file. """ ds_prep = self.preprocess(ds, out_name, cmor_table) return self._write_with_cmor(ds_prep, out_name, cmor_table) def cfinfo(self, out_name, cmor_table): if isinstance(cmor_table, str): cmor_table = _read_table(cmor_table) return _get_cfvarinfo(out_name, cmor_table)