"""Shared functions for fixes."""
import logging
import os
from datetime import datetime
from functools import lru_cache
import dask.array as da
import iris
import numpy as np
import pandas as pd
from cf_units import Unit
from iris import NameConstraint
from iris.coords import Coord
from scipy.interpolate import interp1d
from esmvalcore.iris_helpers import date2num
logger = logging.getLogger(__name__)
def add_aux_coords_from_cubes(cube, cubes, coord_dict):
"""Add auxiliary coordinate to cube from another cube in list of cubes.
Parameters
----------
cube : iris.cube.Cube
Input cube to which the auxiliary coordinates will be added.
cubes : iris.cube.CubeList
List of cubes which contains the desired coordinates as single cubes.
coord_dict : dict
Dictionary of the form ``coord_name: coord_dims``, where ``coord_name``
is the ``var_name`` (:obj:`str`) of the desired coordinates and
``coord_dims`` a :obj:`tuple` of :obj:`int` describing the coordinate
dimensions in ``cube``.
Raises
------
ValueError
``cubes`` do not contain a desired coordinate or multiple copies of
it.
"""
for (coord_name, coord_dims) in coord_dict.items():
coord_cube = cubes.extract(NameConstraint(var_name=coord_name))
if len(coord_cube) != 1:
raise ValueError(
f"Expected exactly one coordinate cube '{coord_name}' in "
f"list of cubes {cubes}, got {len(coord_cube):d}")
coord_cube = coord_cube[0]
aux_coord = cube_to_aux_coord(coord_cube)
cube.add_aux_coord(aux_coord, coord_dims)
cubes.remove(coord_cube)
def _map_on_filled(function, array):
"""Map function on filled array."""
if array.size == 0:
return array
# We support dask and numpy arrays
# Note: numpy's dispatch mechanism does not work here because of the usage
# of `np.ma.filled` and `np.ma.masked_array`.
if isinstance(array, da.core.Array):
num_module = da
else:
num_module = np
mask = num_module.ma.getmaskarray(array)
# Fill masked values with dummy value (simply use first value in array for
# this to preserve dtype; these entries get re-masked later so the actual
# value does not matter). Note: `array.fill_value` is not defined for dask
# arrays, and `ma.filled` also works for regular (non-masked) arrays.
fill_value = num_module.ravel(array)[0]
array = num_module.ma.filled(array, fill_value)
# Apply function and return masked array
array = function(array)
return num_module.ma.masked_array(array, mask=mask)
[docs]
def add_plev_from_altitude(cube):
"""Add pressure level coordinate from altitude coordinate.
Parameters
----------
cube : iris.cube.Cube
Input cube.
Raises
------
ValueError
``cube`` does not contain coordinate ``altitude``.
"""
if cube.coords('altitude'):
height_coord = cube.coord('altitude')
if height_coord.units != 'm':
height_coord.convert_units('m')
altitude_to_pressure = get_altitude_to_pressure_func()
pressure_points = _map_on_filled(
altitude_to_pressure, height_coord.core_points()
)
if height_coord.core_bounds() is None:
pressure_bounds = None
else:
pressure_bounds = _map_on_filled(
altitude_to_pressure, height_coord.core_bounds()
)
pressure_coord = iris.coords.AuxCoord(pressure_points,
bounds=pressure_bounds,
var_name='plev',
standard_name='air_pressure',
long_name='pressure',
units='Pa')
cube.add_aux_coord(pressure_coord, cube.coord_dims(height_coord))
return
raise ValueError(
"Cannot add 'air_pressure' coordinate, 'altitude' coordinate not "
"available")
[docs]
def add_altitude_from_plev(cube):
"""Add altitude coordinate from pressure level coordinate.
Parameters
----------
cube : iris.cube.Cube
Input cube.
Raises
------
ValueError
``cube`` does not contain coordinate ``air_pressure``.
"""
if cube.coords('air_pressure'):
plev_coord = cube.coord('air_pressure')
if plev_coord.units != 'Pa':
plev_coord.convert_units('Pa')
pressure_to_altitude = get_pressure_to_altitude_func()
altitude_points = _map_on_filled(
pressure_to_altitude, plev_coord.core_points()
)
if plev_coord.core_bounds() is None:
altitude_bounds = None
else:
altitude_bounds = _map_on_filled(
pressure_to_altitude, plev_coord.core_bounds()
)
altitude_coord = iris.coords.AuxCoord(altitude_points,
bounds=altitude_bounds,
var_name='alt',
standard_name='altitude',
long_name='altitude',
units='m')
cube.add_aux_coord(altitude_coord, cube.coord_dims(plev_coord))
return
raise ValueError(
"Cannot add 'altitude' coordinate, 'air_pressure' coordinate not "
"available")
def add_scalar_depth_coord(cube, depth=0.0):
"""Add scalar coordinate 'depth' with value of `depth`m."""
logger.debug("Adding depth coordinate (%sm)", depth)
depth_coord = iris.coords.AuxCoord(depth,
var_name='depth',
standard_name='depth',
long_name='depth',
units=Unit('m'),
attributes={'positive': 'down'})
try:
cube.coord('depth')
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(depth_coord, ())
return cube
def add_scalar_height_coord(cube, height=2.0):
"""Add scalar coordinate 'height' with value of `height`m."""
logger.debug("Adding height coordinate (%sm)", height)
height_coord = iris.coords.AuxCoord(height,
var_name='height',
standard_name='height',
long_name='height',
units=Unit('m'),
attributes={'positive': 'up'})
try:
cube.coord('height')
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(height_coord, ())
return cube
def add_scalar_lambda550nm_coord(cube):
"""Add scalar coordinate 'lambda550nm'."""
logger.debug("Adding lambda550nm coordinate")
lambda550nm_coord = iris.coords.AuxCoord(
550.0,
var_name='wavelength',
standard_name='radiation_wavelength',
long_name='Radiation Wavelength 550 nanometers',
units='nm',
)
try:
cube.coord('radiation_wavelength')
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(lambda550nm_coord, ())
return cube
def add_scalar_typeland_coord(cube, value='default'):
"""Add scalar coordinate 'typeland' with value of `value`."""
logger.debug("Adding typeland coordinate (%s)", value)
typeland_coord = iris.coords.AuxCoord(value,
var_name='type',
standard_name='area_type',
long_name='Land area type',
units=Unit('no unit'))
try:
cube.coord('area_type')
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(typeland_coord, ())
return cube
def add_scalar_typesea_coord(cube, value='default'):
"""Add scalar coordinate 'typesea' with value of `value`."""
logger.debug("Adding typesea coordinate (%s)", value)
typesea_coord = iris.coords.AuxCoord(value,
var_name='type',
standard_name='area_type',
long_name='Ocean area type',
units=Unit('no unit'))
try:
cube.coord('area_type')
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(typesea_coord, ())
return cube
def add_scalar_typesi_coord(cube, value='sea_ice'):
"""Add scalar coordinate 'typesi' with value of `value`."""
logger.debug("Adding typesi coordinate (%s)", value)
typesi_coord = iris.coords.AuxCoord(value,
var_name='type',
standard_name='area_type',
long_name='Sea Ice area type',
units=Unit('no unit'))
try:
cube.coord('area_type')
except iris.exceptions.CoordinateNotFoundError:
cube.add_aux_coord(typesi_coord, ())
return cube
def cube_to_aux_coord(cube):
"""Convert cube to iris AuxCoord."""
return iris.coords.AuxCoord(
points=cube.core_data(),
var_name=cube.var_name,
standard_name=cube.standard_name,
long_name=cube.long_name,
units=cube.units,
)
@lru_cache(maxsize=None)
def get_altitude_to_pressure_func():
"""Get function converting altitude [m] to air pressure [Pa].
Returns
-------
callable
Function that converts altitude to air pressure.
"""
base_dir = os.path.dirname(os.path.abspath(__file__))
source_file = os.path.join(base_dir, 'us_standard_atmosphere.csv')
data_frame = pd.read_csv(source_file, comment='#')
func = interp1d(data_frame['Altitude [m]'],
data_frame['Pressure [Pa]'],
kind='cubic',
fill_value='extrapolate')
return func
def get_bounds_cube(cubes, coord_var_name):
"""Find bound cube for a given variable in a :class:`iris.cube.CubeList`.
Parameters
----------
cubes : iris.cube.CubeList
List of cubes containing the coordinate bounds for the desired
coordinate as single cube.
coord_var_name : str
``var_name`` of the desired coordinate (without suffix ``_bnds`` or
``_bounds``).
Returns
-------
iris.cube.Cube
Bounds cube.
Raises
------
ValueError
``cubes`` do not contain the desired coordinate bounds or multiple
copies of them.
"""
for bounds in ('bnds', 'bounds'):
bound_var = f'{coord_var_name}_{bounds}'
cube = cubes.extract(NameConstraint(var_name=bound_var))
if len(cube) == 1:
return cube[0]
if len(cube) > 1:
raise ValueError(
f"Multiple cubes with var_name '{bound_var}' found")
raise ValueError(
f"No bounds for coordinate variable '{coord_var_name}' available in "
f"cubes\n{cubes}")
@lru_cache(maxsize=None)
def get_pressure_to_altitude_func():
"""Get function converting air pressure [Pa] to altitude [m].
Returns
-------
callable
Function that converts air pressure to altitude.
"""
base_dir = os.path.dirname(os.path.abspath(__file__))
source_file = os.path.join(base_dir, 'us_standard_atmosphere.csv')
data_frame = pd.read_csv(source_file, comment='#')
func = interp1d(data_frame['Pressure [Pa]'],
data_frame['Altitude [m]'],
kind='cubic',
fill_value='extrapolate')
return func
def fix_bounds(cube, cubes, coord_var_names):
"""Fix bounds for cube that could not be read correctly by :mod:`iris`.
Parameters
----------
cube : iris.cube.Cube
Input cube whose coordinate bounds will be fixed.
cubes : iris.cube.CubeList
List of cubes which contains the desired coordinate bounds as single
cubes.
coord_var_names : list of str
``var_name``s of the desired coordinates (without suffix ``_bnds`` or
``_bounds``).
Raises
------
ValueError
``cubes`` do not contain a desired coordinate bounds or multiple copies
of them.
"""
for coord_var_name in coord_var_names:
coord = cube.coord(var_name=coord_var_name)
if coord.bounds is not None:
continue
bounds_cube = get_bounds_cube(cubes, coord_var_name)
cube.coord(var_name=coord_var_name).bounds = bounds_cube.core_data()
logger.debug("Fixed bounds of coordinate '%s'", coord_var_name)
def round_coordinates(cubes, decimals=5, coord_names=None):
"""Round all dimensional coordinates of all cubes in place.
Cubes can be a list of Iris cubes, or an Iris `CubeList`.
Cubes are modified *in place*. The return value is simply for
convenience.
Parameters
----------
cubes : iris.cube.CubeList or list of iris.cube.Cube
Cubes which are modified in place.
decimals : int
Number of decimals to round to.
coord_names : list of str or None
If ``None`` (or a falsey value), all dimensional coordinates will be
rounded. Otherwise, only coordinates given by the names in
``coord_names`` are rounded.
Returns
-------
iris.cube.CubeList or list of iris.cube.Cube
The modified input ``cubes``.
"""
for cube in cubes:
if not coord_names:
coords = cube.coords(dim_coords=True)
else:
coords = [cube.coord(c) for c in coord_names if cube.coords(c)]
for coord in coords:
coord.points = da.round(da.asarray(coord.core_points()), decimals)
if coord.bounds is not None:
coord.bounds = da.round(da.asarray(coord.core_bounds()),
decimals)
return cubes
def fix_ocean_depth_coord(cube):
"""Fix attributes of ocean vertical level coordinate.
Parameters
----------
cube : iris.cube.Cube
Input cube.
"""
depth_coord = cube.coord(axis='Z')
depth_coord.standard_name = 'depth'
depth_coord.var_name = 'lev'
depth_coord.units = 'm'
depth_coord.long_name = 'ocean depth coordinate'
depth_coord.attributes = {'positive': 'down'}
[docs]
def get_next_month(month: int, year: int) -> tuple[int, int]:
"""Get next month and year.
Parameters
----------
month:
Current month.
year:
Current year.
Returns
-------
tuple[int, int]
Next month and next year.
"""
if month != 12:
return month + 1, year
return 1, year + 1
[docs]
def get_time_bounds(time: Coord, freq: str) -> np.ndarray:
"""Get bounds for time coordinate.
For monthly data, use the first day of the current month and the first day
of the next month. For yearly or decadal data, use 1 January of the current
year and 1 January of the next year or 10 years from the current year. For
other frequencies (daily, 6-hourly, 3-hourly, hourly), half of the
frequency is subtracted/added from the current point in time to get the
bounds.
Parameters
----------
time:
Time coordinate.
freq:
Frequency.
Returns
-------
np.ndarray
Time bounds
Raises
------
NotImplementedError
Non-supported frequency is given.
"""
bounds = []
dates = time.units.num2date(time.points)
for step, date in enumerate(dates):
month = date.month
year = date.year
if freq in ['mon', 'mo']:
next_month, next_year = get_next_month(month, year)
min_bound = date2num(datetime(year, month, 1, 0, 0),
time.units, time.dtype)
max_bound = date2num(datetime(next_year, next_month, 1, 0, 0),
time.units, time.dtype)
elif freq == 'yr':
min_bound = date2num(datetime(year, 1, 1, 0, 0),
time.units, time.dtype)
max_bound = date2num(datetime(year + 1, 1, 1, 0, 0),
time.units, time.dtype)
elif freq == 'dec':
min_bound = date2num(datetime(year, 1, 1, 0, 0),
time.units, time.dtype)
max_bound = date2num(datetime(year + 10, 1, 1, 0, 0),
time.units, time.dtype)
else:
delta = {
'day': 12.0 / 24,
'6hr': 3.0 / 24,
'3hr': 1.5 / 24,
'1hr': 0.5 / 24,
}
if freq not in delta:
raise NotImplementedError(
f"Cannot guess time bounds for frequency '{freq}'"
)
point = time.points[step]
min_bound = point - delta[freq]
max_bound = point + delta[freq]
bounds.append([min_bound, max_bound])
return np.array(bounds)