Source code for cordex.cmor.utils

"""CORDEX Cmorization utilities."""

import datetime as dt
import json
import tempfile
from warnings import warn

import cftime as cfdt
import xarray as xr
from xarray import DataArray, Dataset

from .. import cordex_domain
from .config import time_bounds_name

xr.set_options(keep_attrs=True)


[docs] def to_cftime(date, calendar="standard"): """Convert date to cftime object Can handle all CMIP6 calendars. Parameters ---------- date : datetime object, str Input date. calendar : str Calendar of the cftime object. Returns ------- cftime : cftime object Cftime ojbect. """ if isinstance(date, dt.date) and not isinstance(date, dt.datetime): date = dt.datetime.combine(date, dt.time()) elif isinstance(date, cfdt.datetime): # do nothing return date elif isinstance(date, str): # xarray hack for cftime.strptime return xr.date_range( start=date, end=date, calendar=calendar, use_cftime=True, )[0] # date = pd.to_datetime(date) return cfdt.datetime( date.year, date.month, date.day, date.hour, date.minute, date.second, date.microsecond, calendar=calendar, )
def _seasons_bounds(year, calendar=None): if calendar is None: import datetime as dt args = {} else: # calendar requires cftime import cftime as dt args = {"calendar": calendar} return { "DJF": (dt.datetime(year - 1, 12, 1, **args), dt.datetime(year, 3, 1, **args)), "MAM": (dt.datetime(year, 3, 1, **args), dt.datetime(year, 6, 1, **args)), "JJA": (dt.datetime(year, 6, 1, **args), dt.datetime(year, 9, 1, **args)), "SON": (dt.datetime(year, 9, 1, **args), dt.datetime(year, 12, 1, **args)), }
[docs] def season_bounds(date): """Determines the temporal bounds of the meteorological season. Uses the month to determine the season and returns the temporal bounds of the season. Parameters ---------- date : datetime object Date in the current season. Returns ------- season : tuple of datetime objects Temporal bounds of the current meteorological season. """ month = date.month if month != 12: year = date.year else: year = date.year + 1 try: calendar = date.calendar except Exception: calendar = None seasons_bounds = _seasons_bounds(year, calendar=calendar) return seasons_bounds[season(date)]
def _seasons(): seasons = [ ("DJF", (12, 1, 2)), ("MAM", (3, 4, 5)), ("JJA", (6, 7, 8)), ("SON", (9, 10, 11)), ] return seasons # @ensure_cftime
[docs] def season(date): """Determines the meteorological season. Uses the month to determine the season. Parameters ---------- date : datetime object Date in the current season. Returns ------- season : str Meteorological season of the current date. """ return next(season for season, months in _seasons() if date.month in months)
def mid_of_season(date): """Determine the mid of the current season Parameters ---------- date : datetime object Date in the current season. Returns ------- mid_of_season : datetime object Mid date of the current season. """ bounds = season_bounds(date) return bounds[0] + 0.5 * (bounds[1] - bounds[0]) def _month_bounds(date): """Determine the bounds of the current month. Parameters ---------- date : datetime object Date in the current month. Returns ------- month_bounds : tuple of datetime object Temporal bounds of the current month. """ if isinstance(date, dt.date): date = dt.datetime.combine(date, dt.time()) month = date.month begin = date.replace(day=1, hour=0, minute=0, second=0) # this does not work with cftime # end = (date + reld.relativedelta(months=1)).replace(day=1) if month == 12: year = date.year + 1 month = 1 else: year = date.year month = date.month + 1 end = date.replace(day=1, year=year, month=month, hour=0, minute=0, second=0) return begin, end
[docs] def month_bounds(ds, bounds_dim="bounds"): """Returns the bounds of the current month. Parameters ---------- ds : Dataset Dataset with cf time coordinate. bounds_dim: str Name of the bounds dimension. If not supplied, the default is ``bounds``. Returns ------- ds : DataArray Monthly time bounds. """ if not isinstance(ds, (Dataset, DataArray)): warn( "using month_bounds without xarray object is deprecated and will be removed in the future.", DeprecationWarning, stacklevel=2, ) return _month_bounds(ds) ds = ds.copy(deep=False) bounds = xr.apply_ufunc( _month_bounds, ds.time, output_core_dims=[[], []], vectorize=True, keep_attrs=True, ) return xr.concat(bounds, dim=bounds_dim).transpose(..., bounds_dim) ds.time.attrs["bounds"] = time_bounds_name return ds.assign_coords({time_bounds_name: bounds})
def _mid_of_month(date): """Determine the mid of the current month. Parameters ---------- date : datetime object Date in the current month. Returns ------- mid_of_month : datetime object Mid date of the current month. """ bounds = _month_bounds(date) mid = bounds[0] + 0.5 * (bounds[1] - bounds[0]) return mid
[docs] def mid_of_month(ds): """Determine the mid of the current month. Parameters ---------- ds : Dataset or DataArray Dataset with time axis. Returns ------- da : DataArray The mid date of the month. """ if not isinstance(ds, (Dataset, DataArray)): warn( "using mid_of_month without xarray object is deprecated and will be removed in the future.", DeprecationWarning, stacklevel=2, ) return _mid_of_month(ds) time = xr.apply_ufunc(_mid_of_month, ds.time, vectorize=True, keep_attrs=True) return time
def _get_pole(ds): """returns the first pole we find in the dataset""" pol_names = ["rotated_latitude_longitude", "rotated_pole"] for pol in pol_names: if pol in ds: return ds[pol] warn(f"no grid_mapping found in dataset, tried: {pol_names}") return None def _get_grid_definitions(CORDEX_domain, **kwargs): return cordex_domain(CORDEX_domain, add_vertices=True, **kwargs) def _get_cordex_pole(CORDEX_domain): return cordex_domain(CORDEX_domain).rotated_latitude_longitude def _encode_time(time): """encode xarray time axis into cf values see https://github.com/pydata/xarray/issues/4412 """ return xr.conventions.encode_cf_variable(time.variable) def _read_table(table): return _read_json_file(table) def _read_json_file(filename): with open(filename) as f: data = json.load(f) return data def _write_json_file(filename, data): with open(filename, "w") as fp: json.dump(data, fp, indent=4) return filename def _get_cfvarinfo(out_name, table): """Returns variable entry from cmor table""" if isinstance(table, str): table = _read_table(table) info = table["variable_entry"].get(out_name, None) if info is None: raise Exception(f"{out_name} not found in table {get_table_id(table)}") return info def get_table_id(table): """parse the table_id from a cmor table header""" separator = " " table_id = table["Header"].get("table_id", None) if table_id is None: raise Exception("no table_id in Header") if separator in table_id: return table_id.split(separator)[1] return table_id def _tmp_table(table, format="json"): """creates a temporay table json file""" _, filename = tempfile.mkstemp() warn(f"writing temporary table to {filename}") if format == "json": return _write_json_file(filename, table) def _get_time_cell_method(cf_varname, table): return _strip_time_cell_method(_get_cfvarinfo(cf_varname, table)) def _strip_time_cell_method(cfvarinfo): try: return cfvarinfo["cell_methods"].split("time:")[1].strip() except Exception: return None