"""Convenience functions for :mod:`iris` objects."""
import logging
from pprint import pformat
import iris
import numpy as np
from cf_units import Unit
from iris.exceptions import CoordinateNotFoundError
from ._base import group_metadata
logger = logging.getLogger(__name__)
def _transform_coord_to_ref(cubes, ref_coord):
    """Transform coordinates of cubes to reference.

    Parameters
    ----------
    cubes : iris.cube.CubeList
        Cubes to be transformed.
    ref_coord : iris.coords.Coord
        Reference coordinate; the corresponding coordinate of every cube
        must be a subset of it.

    Returns
    -------
    iris.cube.CubeList
        Transformed cubes, with points missing from a cube masked.

    Raises
    ------
    ValueError
        Reference coordinate is not unique or a cube's coordinate is not a
        subset of the reference coordinate.

    """
    try:
        # Convert AuxCoord to DimCoord if necessary and possible
        ref_coord = iris.coords.DimCoord.from_coord(ref_coord)
    except ValueError:
        pass
    if not np.array_equal(
        np.unique(ref_coord.points), np.sort(ref_coord.points)
    ):
        raise ValueError(
            f"Expected unique coordinate '{ref_coord.name()}', got {ref_coord}"
        )
    coord_name = ref_coord.name()
    new_cubes = iris.cube.CubeList()
    for cube in cubes:
        coord = cube.coord(coord_name)
        if not np.all(np.isin(coord.points, ref_coord.points)):
            raise ValueError(
                f"Coordinate {coord} of cube\n{cube}\nis not subset of "
                f"reference coordinate {ref_coord}"
            )
        # Fill positions not covered by this cube with NaN, then mask them
        new_data = np.full(ref_coord.shape, np.nan)
        # np.isin instead of the deprecated np.in1d (removed in NumPy 2.0);
        # also consistent with the subset check above
        indices = np.where(np.isin(ref_coord.points, coord.points))
        new_data[indices] = np.ma.filled(cube.data, np.nan)
        new_cube = iris.cube.Cube(np.ma.masked_invalid(new_data))
        if isinstance(ref_coord, iris.coords.DimCoord):
            new_cube.add_dim_coord(ref_coord, 0)
        else:
            new_cube.add_aux_coord(ref_coord, 0)
        # Preserve scalar auxiliary coordinates of the original cube
        for aux_coord in cube.coords(dim_coords=False):
            if aux_coord.shape in ((), (1,)):
                new_cube.add_aux_coord(aux_coord, [])
        new_cube.metadata = cube.metadata
        new_cubes.append(new_cube)
    check_coordinate(new_cubes, coord_name)
    logger.debug(
        "Successfully unified coordinate '%s' to %s", coord_name, ref_coord
    )
    logger.debug("of cubes")
    logger.debug(pformat(cubes))
    return new_cubes
def check_coordinate(cubes, coord_name):
    """Compare coordinate of cubes and raise error if not identical.

    Parameters
    ----------
    cubes : iris.cube.CubeList
        Cubes to be compared.
    coord_name : str
        Name of the coordinate.

    Returns
    -------
    numpy.array
        Points of the coordinate.

    Raises
    ------
    iris.exceptions.CoordinateNotFoundError
        Coordinate ``coord_name`` is not a coordinate of one of the cubes.
    ValueError
        Given coordinate differs for the input cubes.

    """
    ref_coord = None
    for cube in cubes:
        try:
            current_coord = cube.coord(coord_name)
        except CoordinateNotFoundError as exc:
            raise CoordinateNotFoundError(
                f"'{coord_name}' is not a coordinate of cube\n{cube}"
            ) from exc
        # First cube provides the reference; all others must match it
        if ref_coord is None:
            ref_coord = current_coord
        elif current_coord != ref_coord:
            raise ValueError(
                f"Expected cubes with identical coordinates "
                f"'{coord_name}', got {current_coord} and {ref_coord}"
            )
    logger.debug("Successfully checked coordinate '%s' of cubes", coord_name)
    logger.debug(pformat(cubes))
    return ref_coord.points
def convert_to_iris(dict_):
    """Change all appearances of ``short_name`` to ``var_name``.

    Parameters
    ----------
    dict_ : dict
        Dictionary to convert.

    Returns
    -------
    dict
        Converted dictionary.

    Raises
    ------
    KeyError
        :obj:`dict` contains keys ``'short_name'`` **and** ``'var_name'``.

    """
    # Work on a shallow copy so the caller's dictionary stays untouched
    converted = dict(dict_)
    if "short_name" not in converted:
        return converted
    if "var_name" in converted:
        raise KeyError(
            f"Cannot replace 'short_name' by 'var_name', dictionary "
            f"already contains 'var_name' (short_name = "
            f"'{converted['short_name']}', var_name = "
            f"'{converted['var_name']}')"
        )
    converted["var_name"] = converted.pop("short_name")
    return converted
def get_mean_cube(datasets):
    """Get mean cube of a list of datasets.

    Parameters
    ----------
    datasets : list of dict
        List of datasets (given as metadata :obj:`dict`).

    Returns
    -------
    iris.cube.Cube
        Mean cube.

    """
    cubes = iris.cube.CubeList()
    for dataset in datasets:
        filename = dataset["filename"]
        loaded_cube = iris.load_cube(filename)
        # Label each cube with its file path so the merge has a new dimension
        prepare_cube_for_merging(loaded_cube, filename)
        cubes.append(loaded_cube)
    mean_cube = cubes.merge_cube()
    # Collapsing is only possible (and necessary) for more than one cube
    if len(cubes) > 1:
        mean_cube = mean_cube.collapsed(["cube_label"], iris.analysis.MEAN)
    mean_cube.remove_coord("cube_label")
    return mean_cube
def iris_project_constraint(projects, input_data, negate=False):
    """Create :class:`iris.Constraint` to select specific projects from data.

    Parameters
    ----------
    projects : list of str
        Projects to be selected.
    input_data : list of dict
        List of dataset metadata used to extract all relevant datasets
        belonging to given ``projects``.
    negate : bool, optional (default: False)
        Negate constraint (``False``: select all elements that fit
        ``projects``, ``True``: select all elements that do **not** fit
        ``projects``).

    Returns
    -------
    iris.Constraint
        constraint for coordinate ``dataset``.

    """
    grouped_data = group_metadata(input_data, "project")
    selected_datasets = []
    for project in projects:
        for metadata in grouped_data.get(project, []):
            selected_datasets.append(metadata["dataset"])

    def project_constraint(cell):
        """Constraint function."""
        is_selected = cell in selected_datasets
        return not is_selected if negate else is_selected

    return iris.Constraint(dataset=project_constraint)
def intersect_dataset_coordinates(cubes):
    """Compare dataset coordinates of cubes and match them if necessary.

    Use intersection of coordinate 'dataset' of all given cubes and remove
    elements which are not given in all cubes.

    Parameters
    ----------
    cubes : iris.cube.CubeList
        Cubes to be compared.

    Returns
    -------
    iris.cube.CubeList
        Transformed cubes.

    Raises
    ------
    iris.exceptions.CoordinateNotFoundError
        Coordinate ``dataset`` is not a coordinate of one of the cubes.
    ValueError
        At least one of the cubes contains a ``dataset`` coordinate with
        duplicate elements or the cubes do not share common elements.

    """
    common_elements = None
    # Get common elements
    for cube in cubes:
        try:
            points = cube.coord("dataset").points
        except CoordinateNotFoundError as exc:
            raise CoordinateNotFoundError(
                f"'dataset' is not a coordinate of cube\n{cube}"
            ) from exc
        if len(points) != len(set(points)):
            raise ValueError(
                f"Coordinate 'dataset' of cube\n{cube}\n contains duplicate "
                f"elements"
            )
        if common_elements is None:
            common_elements = set(points)
        else:
            common_elements &= set(points)
    common_elements = list(common_elements)
    # Save new cubes
    new_cubes = iris.cube.CubeList()
    for cube in cubes:
        extracted = cube.extract(iris.Constraint(dataset=common_elements))
        if extracted is None:
            raise ValueError(f"Cubes {cubes} do not share common elements")
        # Bring datasets into a common (sorted) order for later comparison
        order = np.argsort(extracted.coord("dataset").points)
        new_cubes.append(extracted[order])
    check_coordinate(new_cubes, "dataset")
    logger.debug(
        "Successfully matched 'dataset' coordinate to %s",
        sorted(common_elements),
    )
    logger.debug("of cubes")
    logger.debug(pformat(cubes))
    return new_cubes
def prepare_cube_for_merging(cube, cube_label):
    """Prepare single :class:`iris.cube.Cube` in order to merge it later.

    Parameters
    ----------
    cube : iris.cube.Cube
        Cube to be pre-processed.
    cube_label : str
        Label for the new scalar coordinate ``cube_label``.

    """
    # Strip metadata that would prevent the merge
    cube.attributes = {}
    cube.cell_methods = ()
    for dim_coord in cube.coords(dim_coords=True):
        dim_coord.attributes = {}
    for aux_coord in cube.coords(dim_coords=False):
        cube.remove_coord(aux_coord)
    # Scalar coordinate distinguishing the cubes, becomes the merge dimension
    label_coord = iris.coords.AuxCoord(
        cube_label, var_name="cube_label", long_name="cube_label"
    )
    cube.add_aux_coord(label_coord, [])
def unify_1d_cubes(cubes, coord_name):
    """Unify 1D cubes by transforming them to identical coordinates.

    Use union of all coordinates as reference and transform other cubes to it
    by adding missing values.

    Parameters
    ----------
    cubes : iris.cube.CubeList
        Cubes to be processed.
    coord_name : str
        Name of the coordinate.

    Returns
    -------
    iris.cube.CubeList
        Transformed cubes.

    Raises
    ------
    ValueError
        Cubes are not 1D, coordinate name differs or not all cube coordinates
        are subsets of longest coordinate.

    """
    ref_coord = None
    # Build reference coordinate as the union of all cube coordinates
    for cube in cubes:
        if cube.ndim != 1:
            raise ValueError(f"Dimension of cube\n{cube}\nis not 1")
        try:
            coord = cube.coord(coord_name)
        except CoordinateNotFoundError as exc:
            raise CoordinateNotFoundError(
                f"'{coord_name}' is not a coordinate of cube\n{cube}"
            ) from exc
        if not np.array_equal(
            np.unique(coord.points), np.sort(coord.points)
        ):
            raise ValueError(
                f"Coordinate '{coord_name}' of cube\n{cube}\n is not unique, "
                f"unifying not possible"
            )
        if ref_coord is None:
            ref_coord = coord
        else:
            union = np.union1d(ref_coord.points, coord.points)
            ref_coord = ref_coord.copy(union)
    if coord_name == "time":
        iris.util.unify_time_units(cubes)
    # Transform all cubes
    return _transform_coord_to_ref(cubes, ref_coord)
def unify_time_coord(cube, target_units="days since 1850-01-01 00:00:00"):
    """Unify time coordinate of cube in-place.

    Parameters
    ----------
    cube: iris.cube.Cube
        Cube whose time coordinate is transformed in-place.
    target_units: str or cf_units.Unit, optional
        Target time units.

    Raises
    ------
    iris.exceptions.CoordinateNotFoundError
        Cube does not contain coordinate ``time``.

    """
    if not cube.coords("time"):
        raise CoordinateNotFoundError(
            f"Coordinate 'time' not found in cube {cube.summary(shorten=True)}"
        )
    # Convert points and (if possible) bounds to new units
    new_units = Unit(target_units)  # works if target_units already is Unit
    old_coord = cube.coord("time")
    old_units = old_coord.units
    new_points = new_units.date2num(old_units.num2date(old_coord.points))
    if old_coord.bounds is None:
        new_bounds = None
    else:
        new_bounds = new_units.date2num(old_units.num2date(old_coord.bounds))
    # Replace the old coordinate with a fresh DimCoord in the target units
    unified_coord = iris.coords.DimCoord(
        new_points,
        bounds=new_bounds,
        var_name="time",
        standard_name="time",
        long_name="time",
        units=new_units,
    )
    time_dims = cube.coord_dims("time")
    cube.remove_coord("time")
    cube.add_dim_coord(unified_coord, time_dims)