"""Shared functions for fixes."""
import logging
import os
from datetime import datetime, timedelta
from functools import cache
import dask.array as da
import iris
import numpy as np
import pandas as pd
from cf_units import Unit
from iris import NameConstraint
from iris.coords import Coord
from scipy.interpolate import interp1d
from esmvalcore.iris_helpers import date2num
logger = logging.getLogger(__name__)
def add_aux_coords_from_cubes(cube, cubes, coord_dict):
"""Add auxiliary coordinate to cube from another cube in list of cubes.
Parameters
----------
cube : iris.cube.Cube
Input cube to which the auxiliary coordinates will be added.
cubes : iris.cube.CubeList
List of cubes which contains the desired coordinates as single cubes.
coord_dict : dict
Dictionary of the form ``coord_name: coord_dims``, where ``coord_name``
is the ``var_name`` (:obj:`str`) of the desired coordinates and
``coord_dims`` a :obj:`tuple` of :obj:`int` describing the coordinate
dimensions in ``cube``.
Raises
------
ValueError
``cubes`` do not contain a desired coordinate or multiple copies of
it.
"""
for coord_name, coord_dims in coord_dict.items():
coord_cube = cubes.extract(NameConstraint(var_name=coord_name))
if len(coord_cube) != 1:
msg = (
f"Expected exactly one coordinate cube '{coord_name}' in "
f"list of cubes {cubes}, got {len(coord_cube):d}"
)
raise ValueError(
msg,
)
coord_cube = coord_cube[0]
aux_coord = cube_to_aux_coord(coord_cube)
cube.add_aux_coord(aux_coord, coord_dims)
cubes.remove(coord_cube)
def _map_on_filled(function, array):
"""Map function on filled array."""
if array.size == 0:
return array
# We support dask and numpy arrays
# Note: numpy's dispatch mechanism does not work here because of the usage
# of `np.ma.filled` and `np.ma.masked_array`.
num_module = da if isinstance(array, da.core.Array) else np
mask = num_module.ma.getmaskarray(array)
# Fill masked values with dummy value (simply use first value in array for
# this to preserve dtype; these entries get re-masked later so the actual
# value does not matter). Note: `array.fill_value` is not defined for dask
# arrays, and `ma.filled` also works for regular (non-masked) arrays.
fill_value = num_module.ravel(array)[0]
array = num_module.ma.filled(array, fill_value)
# Apply function and return masked array
if isinstance(array, da.Array):
array = da.map_blocks(
function,
array,
dtype=array.dtype,
enforce_ndim=True,
meta=da.utils.meta_from_array(array),
)
else:
array = function(array)
return num_module.ma.masked_array(array, mask=mask)
[docs]
def add_plev_from_altitude(cube):
"""Add pressure level coordinate from altitude coordinate.
Parameters
----------
cube : iris.cube.Cube
Input cube.
Raises
------
ValueError
``cube`` does not contain coordinate ``altitude``.
"""
if cube.coords("altitude"):
height_coord = cube.coord("altitude")
if height_coord.units != "m":
height_coord.convert_units("m")
altitude_to_pressure = get_altitude_to_pressure_func()
pressure_points = _map_on_filled(
altitude_to_pressure,
height_coord.core_points(),
)
if height_coord.core_bounds() is None:
pressure_bounds = None
else:
pressure_bounds = _map_on_filled(
altitude_to_pressure,
height_coord.core_bounds(),
)
pressure_coord = iris.coords.AuxCoord(
pressure_points,
bounds=pressure_bounds,
var_name="plev",
standard_name="air_pressure",
long_name="pressure",
units="Pa",
)
cube.add_aux_coord(pressure_coord, cube.coord_dims(height_coord))
return
msg = (
"Cannot add 'air_pressure' coordinate, 'altitude' coordinate not "
"available"
)
raise ValueError(
msg,
)
[docs]
def add_altitude_from_plev(cube):
"""Add altitude coordinate from pressure level coordinate.
Parameters
----------
cube : iris.cube.Cube
Input cube.
Raises
------
ValueError
``cube`` does not contain coordinate ``air_pressure``.
"""
if cube.coords("air_pressure"):
plev_coord = cube.coord("air_pressure")
if plev_coord.units != "Pa":
plev_coord.convert_units("Pa")
pressure_to_altitude = get_pressure_to_altitude_func()
altitude_points = _map_on_filled(
pressure_to_altitude,
plev_coord.core_points(),
)
if plev_coord.core_bounds() is None:
altitude_bounds = None
else:
altitude_bounds = _map_on_filled(
pressure_to_altitude,
plev_coord.core_bounds(),
)
altitude_coord = iris.coords.AuxCoord(
altitude_points,
bounds=altitude_bounds,
var_name="alt",
standard_name="altitude",
long_name="altitude",
units="m",
)
cube.add_aux_coord(altitude_coord, cube.coord_dims(plev_coord))
return
msg = (
"Cannot add 'altitude' coordinate, 'air_pressure' coordinate not "
"available"
)
raise ValueError(
msg,
)
def add_scalar_depth_coord(cube, depth=0.0):
"""Add scalar coordinate 'depth' with value of `depth`m."""
logger.debug("Adding depth coordinate (%sm)", depth)
depth_coord = iris.coords.AuxCoord(
depth,
var_name="depth",
standard_name="depth",
long_name="depth",
units=Unit("m"),
attributes={"positive": "down"},
)
try:
cube.coord("depth")
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(depth_coord, ())
return cube
def add_scalar_height_coord(cube, height=2.0):
"""Add scalar coordinate 'height' with value of `height`m."""
logger.debug("Adding height coordinate (%sm)", height)
height_coord = iris.coords.AuxCoord(
height,
var_name="height",
standard_name="height",
long_name="height",
units=Unit("m"),
attributes={"positive": "up"},
)
try:
cube.coord("height")
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(height_coord, ())
return cube
def add_scalar_lambda550nm_coord(cube):
"""Add scalar coordinate 'lambda550nm'."""
logger.debug("Adding lambda550nm coordinate")
lambda550nm_coord = iris.coords.AuxCoord(
550.0,
var_name="wavelength",
standard_name="radiation_wavelength",
long_name="Radiation Wavelength 550 nanometers",
units="nm",
)
try:
cube.coord("radiation_wavelength")
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(lambda550nm_coord, ())
return cube
def add_scalar_typeland_coord(cube, value="default"):
"""Add scalar coordinate 'typeland' with value of `value`."""
logger.debug("Adding typeland coordinate (%s)", value)
typeland_coord = iris.coords.AuxCoord(
value,
var_name="type",
standard_name="area_type",
long_name="Land area type",
units=Unit("no unit"),
)
try:
cube.coord("area_type")
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(typeland_coord, ())
return cube
def add_scalar_typesea_coord(cube, value="default"):
"""Add scalar coordinate 'typesea' with value of `value`."""
logger.debug("Adding typesea coordinate (%s)", value)
typesea_coord = iris.coords.AuxCoord(
value,
var_name="type",
standard_name="area_type",
long_name="Ocean area type",
units=Unit("no unit"),
)
try:
cube.coord("area_type")
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(typesea_coord, ())
return cube
def add_scalar_typesi_coord(cube, value="sea_ice"):
"""Add scalar coordinate 'typesi' with value of `value`."""
logger.debug("Adding typesi coordinate (%s)", value)
typesi_coord = iris.coords.AuxCoord(
value,
var_name="type",
standard_name="area_type",
long_name="Sea Ice area type",
units=Unit("no unit"),
)
try:
cube.coord("area_type")
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(typesi_coord, ())
return cube
def cube_to_aux_coord(cube):
"""Convert cube to iris AuxCoord."""
return iris.coords.AuxCoord(
points=cube.core_data(),
var_name=cube.var_name,
standard_name=cube.standard_name,
long_name=cube.long_name,
units=cube.units,
)
@cache
def get_altitude_to_pressure_func():
"""Get function converting altitude [m] to air pressure [Pa].
Returns
-------
callable
Function that converts altitude to air pressure.
"""
base_dir = os.path.dirname(os.path.abspath(__file__))
source_file = os.path.join(base_dir, "us_standard_atmosphere.csv")
data_frame = pd.read_csv(source_file, comment="#")
return interp1d(
data_frame["Altitude [m]"],
data_frame["Pressure [Pa]"],
kind="cubic",
fill_value="extrapolate",
)
def get_bounds_cube(cubes, coord_var_name):
"""Find bound cube for a given variable in a :class:`iris.cube.CubeList`.
Parameters
----------
cubes : iris.cube.CubeList
List of cubes containing the coordinate bounds for the desired
coordinate as single cube.
coord_var_name : str
``var_name`` of the desired coordinate (without suffix ``_bnds`` or
``_bounds``).
Returns
-------
iris.cube.Cube
Bounds cube.
Raises
------
ValueError
``cubes`` do not contain the desired coordinate bounds or multiple
copies of them.
"""
for bounds in ("bnds", "bounds"):
bound_var = f"{coord_var_name}_{bounds}"
cube = cubes.extract(NameConstraint(var_name=bound_var))
if len(cube) == 1:
return cube[0]
if len(cube) > 1:
msg = f"Multiple cubes with var_name '{bound_var}' found"
raise ValueError(
msg,
)
msg = (
f"No bounds for coordinate variable '{coord_var_name}' available in "
f"cubes\n{cubes}"
)
raise ValueError(
msg,
)
@cache
def get_pressure_to_altitude_func():
"""Get function converting air pressure [Pa] to altitude [m].
Returns
-------
callable
Function that converts air pressure to altitude.
"""
base_dir = os.path.dirname(os.path.abspath(__file__))
source_file = os.path.join(base_dir, "us_standard_atmosphere.csv")
data_frame = pd.read_csv(source_file, comment="#")
return interp1d(
data_frame["Pressure [Pa]"],
data_frame["Altitude [m]"],
kind="cubic",
fill_value="extrapolate",
)
def fix_bounds(cube, cubes, coord_var_names):
"""Fix bounds for cube that could not be read correctly by :mod:`iris`.
Parameters
----------
cube : iris.cube.Cube
Input cube whose coordinate bounds will be fixed.
cubes : iris.cube.CubeList
List of cubes which contains the desired coordinate bounds as single
cubes.
coord_var_names : list of str
``var_name``s of the desired coordinates (without suffix ``_bnds`` or
``_bounds``).
Raises
------
ValueError
``cubes`` do not contain a desired coordinate bounds or multiple copies
of them.
"""
for coord_var_name in coord_var_names:
coord = cube.coord(var_name=coord_var_name)
if coord.has_bounds():
continue
bounds_cube = get_bounds_cube(cubes, coord_var_name)
cube.coord(var_name=coord_var_name).bounds = bounds_cube.core_data()
logger.debug("Fixed bounds of coordinate '%s'", coord_var_name)
def round_coordinates(cubes, decimals=5, coord_names=None):
"""Round all dimensional coordinates of all cubes in place.
Cubes can be a list of Iris cubes, or an Iris `CubeList`.
Cubes are modified *in place*. The return value is simply for
convenience.
Parameters
----------
cubes : iris.cube.CubeList or list of iris.cube.Cube
Cubes which are modified in place.
decimals : int
Number of decimals to round to.
coord_names : list of str or None
If ``None`` (or a falsey value), all dimensional coordinates will be
rounded. Otherwise, only coordinates given by the names in
``coord_names`` are rounded.
Returns
-------
iris.cube.CubeList or list of iris.cube.Cube
The modified input ``cubes``.
"""
for cube in cubes:
if not coord_names:
coords = cube.coords(dim_coords=True)
else:
coords = [cube.coord(c) for c in coord_names if cube.coords(c)]
for coord in coords:
coord.points = np.round(coord.core_points(), decimals)
if coord.has_bounds():
coord.bounds = np.round(coord.core_bounds(), decimals)
return cubes
def fix_ocean_depth_coord(cube):
"""Fix attributes of ocean vertical level coordinate.
Parameters
----------
cube : iris.cube.Cube
Input cube.
"""
depth_coord = cube.coord(axis="Z")
depth_coord.standard_name = "depth"
depth_coord.var_name = "lev"
depth_coord.units = "m"
depth_coord.long_name = "ocean depth coordinate"
depth_coord.attributes = {"positive": "down"}
[docs]
def get_next_month(month: int, year: int) -> tuple[int, int]:
"""Get next month and year.
Parameters
----------
month:
Current month.
year:
Current year.
Returns
-------
tuple[int, int]
Next month and next year.
"""
if month != 12:
return month + 1, year
return 1, year + 1
[docs]
def get_time_bounds(time: Coord, freq: str) -> np.ndarray:
"""Get bounds for time coordinate.
For decadal data, use 1 January 5 years before/after the current year. For
yearly data, use 1 January of the current year and 1 January of the next
year. For monthly data, use the first day of the current month and the
first day of the next month. For other frequencies (daily or `n`-hourly,
where `n` is a divisor of 24), half of the frequency is subtracted/added
from the current point in time to get the bounds.
Parameters
----------
time:
Time coordinate.
freq:
Frequency.
Returns
-------
np.ndarray
Time bounds
Raises
------
NotImplementedError
Non-supported frequency is given.
"""
bounds = []
dates = time.units.num2date(time.points)
for date in dates:
if "dec" in freq:
min_bound = datetime(date.year - 5, 1, 1, 0, 0)
max_bound = datetime(date.year + 5, 1, 1, 0, 0)
elif "yr" in freq:
min_bound = datetime(date.year, 1, 1, 0, 0)
max_bound = datetime(date.year + 1, 1, 1, 0, 0)
elif "mon" in freq or freq == "mo":
next_month, next_year = get_next_month(date.month, date.year)
min_bound = datetime(date.year, date.month, 1, 0, 0)
max_bound = datetime(next_year, next_month, 1, 0, 0)
elif "day" in freq:
min_bound = date - timedelta(hours=12.0)
max_bound = date + timedelta(hours=12.0)
elif "hr" in freq:
(n_hours_str, _, _) = freq.partition("hr")
n_hours = 1 if not n_hours_str else int(n_hours_str)
if 24 % n_hours:
msg = (
f"For `n`-hourly data, `n` must be a divisor of 24, got "
f"'{freq}'"
)
raise NotImplementedError(
msg,
)
min_bound = date - timedelta(hours=n_hours / 2.0)
max_bound = date + timedelta(hours=n_hours / 2.0)
else:
msg = f"Cannot guess time bounds for frequency '{freq}'"
raise NotImplementedError(
msg,
)
bounds.append([min_bound, max_bound])
return date2num(np.array(bounds), time.units, time.dtype)