Source code for cordex.transform

from warnings import warn

import numpy as np
import xarray as xr
from pyproj import CRS, Transformer

from . import cf

xr.set_options(keep_attrs=True)


def _map_crs(x_stack, y_stack, src_crs, trg_crs=None):
    """coordinate transformation of longitude and latitude"""

    from cartopy import crs as ccrs

    if trg_crs is None:
        trg_crs = ccrs.PlateCarree()
    result = trg_crs.transform_points(src_crs, x_stack, y_stack)
    return result[:, :, 0], result[:, :, 1]


# wrapper function for xarray.apply_ufunc
def map_crs(x, y, src_crs, trg_crs=None):
    """coordinate transformation using cartopy

    Transforms the coordinates x, y from the source crs
    into the target crs using cartopy.

    Parameters
    ----------
    x : float array like
        x coordinate of source crs.
    y : float array like
        y coordinate of source crs.
    src_crs : cartopy.crs
        Source coordinate reference system in which x and y
        are defined.
    trg_crs : cartopy.crs
        Target coordinate reference system into which x and y
        should be transformed. If `None`, `PlateCarree` is used.

    Returns
    -------
    x_map : xr.DataArray
        Projected x coordinate.
    y_map : xr.DataArray
        Projected y coordinate.

    """
    warn(
        "map_crs is deprecated, please use transform instead",
        DeprecationWarning,
        stacklevel=2,
    )
    y_stack, x_stack = xr.broadcast(y, x)
    input_core_dims = 2 * [list(x_stack.dims)] + [[], []]
    output_core_dims = 2 * [list(x_stack.dims)]

    result = xr.apply_ufunc(
        _map_crs,  # first the function
        x_stack,  # now arguments in the order expected by 'interp1_np'
        y_stack,
        src_crs,
        trg_crs,
        input_core_dims=input_core_dims,  # list with one entry per arg
        # [["rlat", "rlon"], ["rlat", "rlon"]],
        output_core_dims=output_core_dims,
        # exclude_dims=set(("lat",)),  # dimensions allowed to change size. Must be set!
    )

    result[0].name = "x_map"
    result[1].name = "y_map"

    return result


[docs] def derotate_vector(u, v, lon=None, lat=None, pollon=None, pollat=None): """Derotate vector components from rotated coordinates. The function performs a backward transformation of, e.g., velocity components u and v from a rotated spherical coordinate system to a geographical system. If only the components u and v are provided, it is assumed, they are DataArrays containing a rotated latitude longitude grid mapping and lon lat coordinates that are used for the transformation. Parameters ---------- u : float or DataArray u component of vector in rotated coordinate system. v : float or DataArray v component of vector in rotated coordinate system. lon : float or DataArray Longitude coordinates in which to transform the vector components. lat : float or DataArray Latitude coordinates in which to transform the vector components. pollon : float Longitude of north pole in geographical coordinate system. pollat : float Latitude of north pole in geographical coordinate system. Returns ------- ut : DataArray Transformed u vector component. vt : DataArray Transformed v vector component. """ if lon is None: lon = u.cf["longitude"] if lat is None: lat = v.cf["latitude"] if pollon is None: pollon = u.cf["grid_mapping"].grid_north_pole_longitude if pollat is None: pollat = v.cf["grid_mapping"].grid_north_pole_latitude rla = np.deg2rad(lon) phi = np.deg2rad(lat) zrla = rla zphi = phi zpollat = np.deg2rad(pollat) zpollon = np.deg2rad(pollon) pollond = pollon zsinpol = np.sin(zpollat) zcospol = np.cos(zpollat) if pollon < 0.0: pollond = 360.0 + pollon # compute vector abs in rotated coordinates zzrla = np.rad2deg(rla) zzrla = xr.where(zzrla > 180.0, zzrla - 360.0, zzrla) zzrla = np.deg2rad(zzrla) zd1 = zzrla - zpollon zd2 = zrla - zpollon # winkel zbeta berechen (schnittwinkel der breitenkreise) zarg1 = -np.sin(zd1) * np.cos(zphi) zarg2 = -zsinpol * np.cos(zphi) * np.cos(zd1) + zcospol * np.sin(zphi) zarg2 = xr.where(abs(zarg2) < 1.0e-20, 1.0e-20, zarg2) zrlas = np.arctan2(zarg1, zarg2) zarg = -np.sin(zpollat) * np.sin(zd2) * np.sin(zrlas) - np.cos(zd2) * np.cos(zrlas) zarg = zarg.clip(min=-1.0, max=1.0) zbeta = abs(np.arccos(zarg)) zbeta = xr.where( (-((np.rad2deg(rla)) - (pollond - 180.0)) < 0.0) & (-((np.rad2deg(rla)) - (pollond - 180.0)) >= -180.0), -zbeta, zbeta, ) x1 = u * np.cos(zbeta) - v * np.sin(zbeta) y1 = u * np.sin(zbeta) + v * np.cos(zbeta) return x1, y1
def _transform(x, y, src_crs, trg_crs): """helper function for transforming coordinates""" # always_xy=True # https://proj.org/faq.html#why-is-the-axis-ordering-in-proj-not-consistent transformer = Transformer.from_crs(src_crs, trg_crs, always_xy=True) xt, yt = transformer.transform(x, y) return xt, yt
[docs] def transform(x, y, src_crs, trg_crs=None): """Coordinate transformation using pyproj. Transforms the coordinates x, y from the source crs into a target crs using pyproj. Parameters ---------- x : DataArray X coordinate. y : DataArray Y coordinate. src_crs : pyproj.CRS Source coordinate reference system in which x and y are defined. trg_crs : pyproj.CRS Target coordinate reference system into which x and y should be transformed. If not supplied, ``EPSG:4326`` is the default. Returns ------- xt : DataArray Transformed x coordinate. yt : DataArray Transformed y coordinate. """ if trg_crs is None: # default target crs trg_crs = CRS("EPSG:4326") y_stack, x_stack = xr.broadcast(y, x) input_core_dims = [x_stack.dims, y_stack.dims] + [[], []] output_core_dims = [x_stack.dims, y_stack.dims] xt, yt = xr.apply_ufunc( _transform, x_stack, y_stack, src_crs, trg_crs, input_core_dims=input_core_dims, output_core_dims=output_core_dims, ) xt.name = "xt" yt.name = "yt" xt.attrs = {"epsg": trg_crs.to_epsg()} yt.attrs = {"epsg": trg_crs.to_epsg()} return xt, yt
[docs] def transform_coords(ds, src_crs=None, trg_crs=None, trg_dims=None): """Transform X and Y coordinates of a Dataset. The transformed coordinates will be added to the Dataset. This function is usefull to add, e.g., global lon/lat coordinates to a rotated pole grid. Parameters ---------- ds : Dataset or DataArray Dataset with input grid. src_crs : pyproj.CRS Source coordinate reference system in which X and Y are defined. If not supplied, a `grid_mapping` variable should be available to define the source CRS. trg_crs : pyproj.CRS Target coordinate reference system into which x and y should be transformed. If not supplied, ``EPSG:4326`` is the default. trg_dims: list or set Names of the output coordinates. Returns ------- ds : Dataset or DataArray Dataset with transformed coordinates. """ ds = ds.copy(deep=False) if trg_crs is None: # default target crs trg_crs = CRS("EPSG:4326") if trg_dims is None: trg_dims = ("xt", "yt") if src_crs is None: src_crs = CRS.from_cf(ds.cf["grid_mapping"].attrs) x, y = ds.cf["X"], ds.cf["Y"] xt, yt = transform(x, y, src_crs, trg_crs) return ds.assign_coords({trg_dims[0]: xt, trg_dims[1]: yt})
[docs] def transform_bounds( ds, src_crs=None, trg_crs=None, trg_dims=None, bnds_dim=None, keep_xy_bounds=False ): """Transform linear X and Y bounds of a Dataset. Transformation of of the bounds of linear X and Y coordinates into the target crs according to https://cfconventions.org/cf-conventions/cf-conventions.html#cell-boundaries If the linear X and Y coordinate bounds are not available, they will be created and transformed. Parameters ---------- ds : xr.Dataset Dataset containing linear X and Y coordinates, e.g., `rlon` and `rlat` src_crs : pyproj.CRS Source coordinate reference system in which X and Y are defined. If not supplied, a `grid_mapping` variable should be available to define the source CRS. trg_crs : pyproj.CRS Target coordinate reference system into which bounds should be transformed. If not supplied, ``EPSG:4326`` is the default. trg_dims: list or set Names of the bounds coordinates. bnds_dim: str Names of the bounds dimension. Returns ------- bounds : xr.Dataset Dataset with X and Y bounds in target crs. Probably some 2D coordinates. References ---------- Please refer to the CF conventions document : https://cfconventions.org/cf-conventions/cf-conventions.html#cell-boundaries """ ds = ds.copy(deep=False) if src_crs is None: src_crs = CRS.from_cf(ds.cf["grid_mapping"].attrs) if trg_crs is None: # default target crs trg_crs = CRS("EPSG:4326") if trg_dims is None: trg_dims = ( ds.cf["longitude"].name + "_vertices", ds.cf["latitude"].name + "_vertices", ) if bnds_dim is None: bnds_dim = cf.BOUNDS_DIM bnds = ds.cf.add_bounds(("X", "Y")) x_bnds = bnds.cf.get_bounds("X").drop_vars(bnds.cf.bounds["X"]) y_bnds = bnds.cf.get_bounds("Y").drop_vars(bnds.cf.bounds["Y"]) # order is counterclockwise starting from lower left vertex v1 = transform(x_bnds.isel(bounds=0), y_bnds.isel(bounds=0), src_crs, trg_crs) v2 = transform(x_bnds.isel(bounds=1), y_bnds.isel(bounds=0), src_crs, trg_crs) v3 = transform(x_bnds.isel(bounds=1), y_bnds.isel(bounds=1), src_crs, trg_crs) v4 = transform(x_bnds.isel(bounds=0), y_bnds.isel(bounds=1), src_crs, trg_crs) xt_vertices = xr.concat([v1[0], v2[0], v3[0], v4[0]], dim=bnds_dim) # .transpose() # ..., "vertices" # ) yt_vertices = xr.concat([v1[1], v2[1], v3[1], v4[1]], dim=bnds_dim) # .transpose() xt_vertices.name = "xt_vertices" # cf.LON_BOUNDS yt_vertices.name = "yt_vertices" # cf.LAT_BOUNDS xt_vertices.attrs = cf.coords[cf.LON_BOUNDS] yt_vertices.attrs = cf.coords[cf.LAT_BOUNDS] bounds = xr.merge([xt_vertices, yt_vertices]).transpose( ds.cf["Y"].dims[0], ds.cf["X"].dims[0], bnds_dim ) ds[ds.cf["longitude"].name].attrs["bounds"] = trg_dims[0] ds[ds.cf["latitude"].name].attrs["bounds"] = trg_dims[1] return ds.assign_coords( { trg_dims[0]: bounds.xt_vertices.drop_vars( (ds.cf["X"].name, ds.cf["Y"].name) ), trg_dims[1]: bounds.yt_vertices.drop_vars( (ds.cf["X"].name, ds.cf["Y"].name) ), } )
def rotated_coord_transform(lon, lat, np_lon, np_lat, direction="rot2geo"): """Transforms a coordinate into a rotated grid coordinate and vice versa. The coordinates have to be given in degree and will be returned in degree. Parameters ---------- lon : float array like Longitude coordinate. lat : float array like Latitude coordinate. np_lon : float array like Longitude coordinate of the rotated north pole. np_lat : float array like Latitude coordinate of the rotated north pole. direction : str Direction of the rotation. Options are: 'rot2geo' (default) for a transformation to regular coordinates from rotated. 'geo2rot' transforms regular coordinates to rotated. Returns ------- lon_new : array like New longitude coordinate. lat_new : array like New latitude coordinate. """ warn( "rotated_coord_transform is deprecated, please use transform_xy instead", DeprecationWarning, stacklevel=2, ) # Convert degrees to radians lon = np.deg2rad(lon) lat = np.deg2rad(lat) theta = 90.0 - np_lat # Rotation around y-axis phi = np_lon + 180.0 # Rotation around z-axis # Convert degrees to radians phi = np.deg2rad(phi) theta = np.deg2rad(theta) # Convert from spherical to cartesian coordinates x = np.cos(lon) * np.cos(lat) y = np.sin(lon) * np.cos(lat) z = np.sin(lat) # Regular -> Rotated if direction == "geo2rot": x_new = ( np.cos(theta) * np.cos(phi) * x + np.cos(theta) * np.sin(phi) * y + np.sin(theta) * z ) y_new = -np.sin(phi) * x + np.cos(phi) * y z_new = ( -np.sin(theta) * np.cos(phi) * x - np.sin(theta) * np.sin(phi) * y + np.cos(theta) * z ) # Rotated -> Regular elif direction == "rot2geo": phi = -phi theta = -theta x_new = ( np.cos(theta) * np.cos(phi) * x + np.sin(phi) * y + np.sin(theta) * np.cos(phi) * z ) y_new = ( -np.cos(theta) * np.sin(phi) * x + np.cos(phi) * y - np.sin(theta) * np.sin(phi) * z ) z_new = -np.sin(theta) * x + np.cos(theta) * z # Convert cartesian back to spherical coordinates lon_new = np.arctan2(y_new, x_new) lat_new = np.arcsin(z_new) # Convert radians back to degrees lon_new = np.rad2deg(lon_new) lat_new = np.rad2deg(lat_new) return lon_new, lat_new def grid_mapping(pollon, pollat, mapping_name=None): """creates a grid mapping DataArray object""" if mapping_name is None: mapping_name = cf.DEFAULT_MAPPING_NCVAR da = xr.DataArray(np.zeros((), dtype=cf.grid_mapping_dtype)) attrs = cf.mapping.copy() attrs["grid_north_pole_longitude"] = pollon attrs["grid_north_pole_latitude"] = pollat da.attrs = attrs da.name = mapping_name return da