"""Convenience functions for MLR diagnostics."""
import logging
import os
import re
import warnings
from copy import deepcopy
from functools import lru_cache
from pprint import pformat
import esmvalcore.preprocessor
import iris
import numpy as np
import shapely.vectorized as shp_vect
from cartopy.io import shapereader
from cf_units import Unit
from iris.fileformats.netcdf import UnknownCellMethodWarning
from esmvaltool.diag_scripts.shared import (
get_diagnostic_filename,
io,
select_metadata,
sorted_metadata,
)
logger = logging.getLogger(os.path.basename(__file__))
NECESSARY_KEYS = io.NECESSARY_KEYS + [
'tag',
'var_type',
]
VAR_TYPES = [
'feature',
'label',
'label_to_rescale',
'prediction_input',
'prediction_input_error',
'prediction_output',
'prediction_output_error',
'prediction_output_misc',
'prediction_reference',
'prediction_residual',
]
WARNINGS_TO_IGNORE = [
{
'message': ".* contains unknown cell method 'trend'",
'category': UnknownCellMethodWarning,
'module': 'iris',
},
{
'message': "Using DEFAULT_SPHERICAL_EARTH_RADIUS",
'category': UserWarning,
'module': 'iris',
},
]
def _check_coords(cube, coords, weights_type):
"""Check coordinates prior to weights calculations."""
cube_str = cube.summary(shorten=True)
for coord_name in coords:
try:
coord = cube.coord(coord_name)
except iris.exceptions.CoordinateNotFoundError:
logger.error(
"Calculation of %s for cube %s failed, coordinate "
"'%s' not found", weights_type, cube_str, coord_name)
raise
if not coord.has_bounds():
logger.debug(
"Guessing bounds of coordinate '%s' of cube %s for "
"calculation of %s", coord_name, cube_str, weights_type)
coord.guess_bounds()
def _get_datasets(input_data, **kwargs):
"""Get datasets according to ``**kwargs``."""
datasets = []
for dataset in input_data:
dataset_copy = deepcopy(dataset)
for key in kwargs:
if key not in dataset_copy:
dataset_copy[key] = None
if select_metadata([dataset_copy], **kwargs):
datasets.append(dataset)
return datasets
@lru_cache
def _get_ne_land_mask_cube(n_lats=1000, n_lons=2000):
"""Get Natural Earth land mask."""
ne_dir = os.path.join(
os.path.dirname(os.path.realpath(esmvalcore.preprocessor.__file__)),
'ne_masks',
)
ne_file = os.path.join(ne_dir, 'ne_10m_land.shp')
reader = shapereader.Reader(ne_file)
geometries = list(reader.geometries())
# Setup grid
lat_coord = iris.coords.DimCoord(
np.linspace(-90.0, 90.0, n_lats), var_name='lat',
standard_name='latitude', long_name='latitude', units='degrees')
lon_coord = iris.coords.DimCoord(
np.linspace(-180.0, 180.0, n_lons), var_name='lon',
standard_name='longitude', long_name='longitude', units='degrees')
(lats, lons) = np.meshgrid(lat_coord.points, lon_coord.points)
# Setup mask (1: land, 0: sea)
mask = np.full(lats.shape, False, dtype=bool)
for geometry in geometries:
mask |= shp_vect.contains(geometry, lons, lats)
land_mask = np.swapaxes(np.where(mask, 1, 0), 0, 1)
# Setup cube
cube = iris.cube.Cube(land_mask,
var_name='land_mask',
long_name='Land mask (1: land, 0: sea)',
units='no_unit',
dim_coords_and_dims=[(lat_coord, 0), (lon_coord, 1)])
return cube
def check_predict_kwargs(predict_kwargs):
"""Check keyword argument for ``predict()`` functions.
Parameters
----------
predict_kwargs : keyword arguments, optional
Keyword arguments for a ``predict()`` function.
Raises
------
RuntimeError
``return_var`` and ``return_cov`` are both set to ``True`` in the
keyword arguments.
"""
return_var = predict_kwargs.get('return_var', False)
return_cov = predict_kwargs.get('return_cov', False)
if return_var and return_cov:
raise RuntimeError(
"Cannot return variance (return_cov=True) and full covariance "
"matrix (return_cov=True) simultaneously")
def create_alias(dataset, attributes, delimiter='-'):
"""Create alias key of a dataset using a list of attributes.
Parameters
----------
dataset : dict
Metadata dictionary representing a single dataset.
attributes : list of str
List of attributes used to create the alias.
delimiter : str, optional (default: '-')
Delimiter used to separate different attributes in the alias.
Returns
-------
str
Dataset alias.
Raises
------
AttributeError
``dataset`` does not contain one of the ``attributes``.
ValueError
``attributes`` is empty.
"""
alias = []
if not attributes:
raise ValueError(
"Expected at least one element for attributes, got empty list")
for attribute in attributes:
if attribute not in dataset:
raise AttributeError(
f"Dataset {dataset} does not contain attribute '{attribute}' "
f"for alias creation")
alias.append(dataset[attribute])
return delimiter.join(alias)
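
# Usage sketch (hypothetical metadata): the alias joins the selected attributes
# with the given delimiter.
def _example_create_alias():
    dataset = {'dataset': 'MODEL_A', 'exp': 'historical', 'ensemble': 'r1i1p1'}
    alias = create_alias(dataset, ['dataset', 'exp'])
    alias_ens = create_alias(dataset, ['dataset', 'ensemble'], delimiter='_')
    return (alias, alias_ens)  # ('MODEL_A-historical', 'MODEL_A_r1i1p1')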
def datasets_have_mlr_attributes(datasets, log_level='debug', mode='full'):
"""Check (MLR) attributes of ``datasets``.
Parameters
----------
datasets : list of dict
Datasets to check.
log_level : str, optional (default: 'debug')
Verbosity level of the logger.
mode : str, optional (default: 'full')
Checking mode. Must be one of ``'only_missing'`` (only check if
attributes are missing), ``'only_var_type'`` (check only `var_type`) or
``'full'`` (check both).
Returns
-------
bool
``True`` if all required attributes are available, ``False`` if not.
Raises
------
ValueError
Invalid value for argument ``mode`` is given.
"""
output = True
accepted_modes = ('full', 'only_missing', 'only_var_type')
if mode not in accepted_modes:
raise ValueError(
f"'mode' must be one of {accepted_modes}, got '{mode}'")
for dataset in datasets:
if mode != 'only_var_type':
for key in NECESSARY_KEYS:
if key not in dataset:
getattr(logger, log_level)(
"Dataset '%s' does not have necessary (MLR) attribute "
"'%s'", dataset, key)
output = False
if mode != 'only_missing' and dataset.get('var_type') not in VAR_TYPES:
getattr(logger, log_level)(
"Dataset '%s' has invalid var_type '%s', must be one of %s",
dataset, dataset.get('var_type'), VAR_TYPES)
output = False
return output
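
# Usage sketch (hypothetical metadata): with mode='only_var_type' only the
# 'var_type' value is validated against VAR_TYPES; mode='full' would also
# require every key in NECESSARY_KEYS to be present.
def _example_datasets_have_mlr_attributes():
    datasets = [{'var_type': 'feature', 'tag': 'tas'}]
    return datasets_have_mlr_attributes(datasets, mode='only_var_type')  # True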
def get_1d_cube(x_data, y_data, x_kwargs=None, y_kwargs=None):
"""Convert 2 arrays to :class:`iris.cube.Cube` (with single coordinate).
Parameters
----------
x_data : numpy.ndarray
Data for coordinate.
y_data : numpy.ndarray
Data for cube.
x_kwargs : dict, optional
Keyword arguments passed to :class:`iris.coords.AuxCoord`.
y_kwargs : dict, optional
Keyword arguments passed to :class:`iris.cube.Cube`.
Returns
-------
iris.cube.Cube
1D cube with single auxiliary coordinate.
Raises
------
ValueError
Arrays are not 1D or do not have matching shapes.
"""
if x_kwargs is None:
x_kwargs = {}
if y_kwargs is None:
y_kwargs = {}
x_data = np.ma.array(x_data)
y_data = np.ma.array(y_data)
if x_data.ndim != 1:
raise ValueError(
f"Expected 1D array for 'x_data', got {x_data.ndim:d}D array")
if y_data.ndim != 1:
raise ValueError(
f"Expected 1D array for 'y_data', got {y_data.ndim:d}D array")
if x_data.shape != y_data.shape:
raise ValueError(
f"Expected identical shapes for 'x_data' and 'y_data', got "
f"{x_data.shape} and {y_data.shape}, respectively")
aux_coord = iris.coords.AuxCoord(x_data, **x_kwargs)
cube = iris.cube.Cube(y_data, aux_coords_and_dims=[(aux_coord, 0)],
**y_kwargs)
return cube
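
# Usage sketch (hypothetical data): two plain 1D arrays become a cube whose
# single auxiliary coordinate holds the x values.
def _example_get_1d_cube():
    x_data = np.arange(5.0)
    y_data = x_data ** 2
    cube = get_1d_cube(
        x_data, y_data,
        x_kwargs={'var_name': 'x', 'units': '1'},
        y_kwargs={'var_name': 'y', 'units': 'K'},
    )
    return cube  # 1D cube of shape (5,) with auxiliary coordinate 'x'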
def get_absolute_time_units(units):
"""Convert time reference units to absolute ones.
This function converts reference time units (like ``'days since YYYY'``) to
absolute ones (like ``'days'``).
Parameters
----------
units : cf_units.Unit
Time units to convert.
Returns
-------
cf_units.Unit
Absolute time units.
Raises
------
ValueError
If conversion failed (e.g. input units are not time units).
"""
if units.is_time_reference():
units = Unit(units.symbol.split()[0])
if not units.is_time():
raise ValueError(
f"Cannot convert units '{units}' to reasonable time units")
return units
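
# Usage sketch: reference time units lose their reference point, while plain
# time units pass through unchanged.
def _example_get_absolute_time_units():
    absolute = get_absolute_time_units(Unit('days since 1850-01-01'))
    unchanged = get_absolute_time_units(Unit('hours'))  # Unit('hours')
    return (absolute, unchanged)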
def get_alias(dataset):
"""Get alias for dataset.
Parameters
----------
dataset : dict
Dataset metadata.
Returns
-------
str
Alias.
"""
alias = f"{dataset['project']} dataset {dataset['dataset']}"
additional_info = []
for key in ('mip', 'exp', 'ensemble'):
if key in dataset:
additional_info.append(dataset[key])
if additional_info:
alias += f" ({', '.join(additional_info)})"
if 'start_year' in dataset and 'end_year' in dataset:
alias += f" from {dataset['start_year']:d} to {dataset['end_year']:d}"
return alias
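
# Usage sketch (hypothetical metadata): the alias is a human-readable summary of
# project, dataset and optional extra information.
def _example_get_alias():
    dataset = {'project': 'PROJ', 'dataset': 'MODEL_A', 'exp': 'historical',
               'start_year': 1980, 'end_year': 2000}
    return get_alias(dataset)
    # 'PROJ dataset MODEL_A (historical) from 1980 to 2000'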
def get_all_weights(cube, area_weighted=True, time_weighted=True,
landsea_fraction_weighted=None, normalize=False):
"""Get all desired weights for a cube.
Parameters
----------
cube : iris.cube.Cube
Input cube.
area_weighted : bool, optional (default: True)
Use area weights calculated from grid cell areas using
:func:`iris.analysis.cartography.area_weights`. Only works for regular
grids.
time_weighted : bool, optional (default: True)
Use time weights calculated from time bounds.
landsea_fraction_weighted : str, optional
If given, use land/sea fraction weights calculated from Natural Earth
files. Must be one of ``'land'``, ``'sea'``. Only works for regular
grids.
normalize : bool, optional (default: False)
Normalize weights with total area and total time range.
Returns
-------
numpy.ndarray
All requested weights.
Raises
------
iris.exceptions.CoordinateMultiDimError
Dimension of ``latitude`` or ``longitude`` coordinate is greater than
1.
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinates ``latitude`` and ``longitude``
(if used with ``area_weighted`` or ``landsea_fraction_weighted``) or
cube does not contain the coordinate ``time`` (if used with
``time_weighted``).
ValueError
``landsea_fraction_weighted`` is not one of ``None``, ``'land'``,
``'sea'`` or coordinates ``latitude`` and ``longitude`` share
dimensions.
"""
logger.debug("Calculating all weights of cube %s",
cube.summary(shorten=True))
weights = np.ones(cube.shape)
# Horizontal weights
horizontal_weights = get_horizontal_weights(
cube, area_weighted=area_weighted,
landsea_fraction_weighted=landsea_fraction_weighted,
normalize=normalize)
weights *= horizontal_weights
# Time weights
if time_weighted:
time_weights = get_time_weights(cube, normalize=normalize)
weights *= time_weights
return weights
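
# Usage sketch: for a (time, latitude, longitude) cube, the combined weights are
# the product of the horizontal and the time weights and have the cube's shape.
def _example_get_all_weights(cube):
    weights = get_all_weights(cube, area_weighted=True, time_weighted=True,
                              landsea_fraction_weighted=None, normalize=True)
    assert weights.shape == cube.shape
    return weights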
def get_area_weights(cube, normalize=False):
"""Get area weights calculated from grid cell areas.
Note
----
Only works for regular grids. Uses
:func:`iris.analysis.cartography.area_weights` for an approximate
calculation of the grid cell areas.
Parameters
----------
cube : iris.cube.Cube
Input cube.
normalize : bool, optional (default: False)
Normalize weights with total area.
Returns
-------
numpy.ndarray
Area weights.
Raises
------
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinates ``latitude`` and ``longitude``.
"""
logger.debug("Calculating area weights")
_check_coords(cube, ['latitude', 'longitude'], 'area weights')
area_weights = iris.analysis.cartography.area_weights(cube,
normalize=normalize)
return area_weights
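
# Usage sketch: area weights for a small regular grid; missing latitude/longitude
# bounds are guessed before iris.analysis.cartography.area_weights is called.
def _example_get_area_weights():
    lat = iris.coords.DimCoord([-45.0, 0.0, 45.0], standard_name='latitude',
                               units='degrees')
    lon = iris.coords.DimCoord([0.0, 120.0, 240.0], standard_name='longitude',
                               units='degrees')
    cube = iris.cube.Cube(np.zeros((3, 3)),
                          dim_coords_and_dims=[(lat, 0), (lon, 1)])
    return get_area_weights(cube, normalize=True)  # shape (3, 3)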
def get_horizontal_weights(cube, area_weighted=True,
landsea_fraction_weighted=None, normalize=False):
"""Get horizontal (latitude/longitude) weights of cube.
Parameters
----------
cube : iris.cube.Cube
Input cube.
area_weighted : bool, optional (default: True)
Use area weights calculated from grid cell areas using
:func:`iris.analysis.cartography.area_weights`. Only works for regular
grids.
landsea_fraction_weighted : str, optional
If given, use land/sea fraction weights calculated from Natural Earth
files. Must be one of ``'land'``, ``'sea'``. Only works for regular
grids.
normalize : bool, optional (default: False)
Normalize weights with sum of weights over latitude and longitude (i.e.
if only ``area_weighted`` is given, this is equal to the total area).
Returns
-------
numpy.ndarray
Horizontal (latitude/longitude) weights.
Raises
------
iris.exceptions.CoordinateMultiDimError
Dimension of ``latitude`` or ``longitude`` coordinate is greater than
1.
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinates ``latitude`` and ``longitude``.
ValueError
``landsea_fraction_weighted`` is not one of ``'land'``, ``'sea'``.
"""
logger.debug("Calculating horizontal weights")
weights = np.ones(cube.shape)
if not (area_weighted or landsea_fraction_weighted):
return weights
# Get weights
if area_weighted:
weights *= get_area_weights(cube, normalize=False)
if landsea_fraction_weighted is not None:
weights *= get_landsea_fraction_weights(
cube, landsea_fraction_weighted, normalize=False)
# No normalization
if not normalize:
return weights
# Get horizontal dimensions
horizontal_dims = []
if cube.coord_dims('latitude'):
horizontal_dims.append(cube.coord_dims('latitude')[0])
if cube.coord_dims('longitude'):
horizontal_dims.append(cube.coord_dims('longitude')[0])
# Normalization
horizontal_dims = tuple(horizontal_dims)
if not horizontal_dims:
norm = np.ravel(weights)[0]
else:
norm = np.ravel(np.sum(weights, axis=horizontal_dims))[0]
return weights / norm
def get_landsea_fraction_weights(cube, area_type, normalize=False):
"""Get land/sea fraction weights calculated from Natural Earth files.
Note
----
The implementation of this feature is not optimal. For large cubes,
calculating the land/sea fraction weights might be very slow. Only works
for regular grids.
Parameters
----------
cube : iris.cube.Cube
Input cube.
area_type : str
Area type. Must be one of ``'land'`` (land fraction weighting) or
``'sea'`` (sea fraction weighting).
normalize : bool, optional (default: False)
Normalize weights with total land/sea fraction.
Returns
-------
numpy.ndarray
Land/sea fraction weights.
Raises
------
iris.exceptions.CoordinateMultiDimError
Dimension of ``latitude`` or ``longitude`` coordinate is greater than
1.
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinates ``latitude`` and ``longitude``.
ValueError
``area_type`` is not one of ``'land'``, ``'sea'`` or coordinates
``latitude`` and ``longitude`` share dimensions.
"""
allowed_types = ('land', 'sea')
if area_type not in allowed_types:
raise ValueError(
f"Expected one of {allowed_types} for 'area_type' of land/sea "
f"fraction weighting, got '{area_type}'")
logger.debug("Calculating %s fraction weights", area_type)
_check_coords(cube, ['latitude', 'longitude'],
f'{area_type} fraction weights')
lat_coord = cube.coord('latitude')
lon_coord = cube.coord('longitude')
for coord in (lat_coord, lon_coord):
if coord.ndim > 1:
raise iris.exceptions.CoordinateMultiDimError(
f"Calculating {area_type} fraction weights for "
f"multidimensional coordinate '{coord.name}' is not supported")
if cube.coord_dims(lat_coord) != ():
if cube.coord_dims(lat_coord) == cube.coord_dims(lon_coord):
raise ValueError(
f"1D latitude and longitude coordinates share dimensions "
f"(this usually happens with unstructured grids) - "
f"calculating {area_type} fraction weights for latitude and "
"longitude that share dimensions is not possible")
# Calculate land fractions on coordinate grid of cube
ne_land_mask_cube = _get_ne_land_mask_cube()
land_fraction = np.empty((lat_coord.shape[0], lon_coord.shape[0]),
dtype=np.float64)
for lat_idx in range(lat_coord.shape[0]):
for lon_idx in range(lon_coord.shape[0]):
lat_bounds = lat_coord.bounds[lat_idx]
lon_bounds = lon_coord.bounds[lon_idx]
submask = ne_land_mask_cube.intersection(latitude=lat_bounds,
longitude=lon_bounds)
land_fraction[lat_idx, lon_idx] = (submask.data.sum() /
submask.data.size)
if area_type == 'sea':
fraction_weights = 1.0 - land_fraction
else:
fraction_weights = land_fraction
if normalize:
fraction_weights /= np.ma.sum(fraction_weights)
# Broadcast to original shape
coord_dims = []
if cube.coord_dims(lon_coord):
coord_dims.append(cube.coord_dims(lon_coord)[0])
else:
fraction_weights = np.squeeze(fraction_weights, axis=1)
if cube.coord_dims(lat_coord):
coord_dims.insert(0, cube.coord_dims(lat_coord)[0])
else:
fraction_weights = np.squeeze(fraction_weights, axis=0)
fraction_weights = iris.util.broadcast_to_shape(fraction_weights,
cube.shape,
tuple(coord_dims))
return fraction_weights
def get_new_path(cfg, old_path):
"""Convert old path to new diagnostic path.
Parameters
----------
cfg : dict
Recipe configuration.
old_path : str
Old path.
Returns
-------
str
New diagnostic path.
"""
basename = os.path.splitext(os.path.basename(old_path))[0]
new_path = get_diagnostic_filename(basename, cfg)
return new_path
def get_squared_error_cube(ref_cube, error_datasets):
"""Get array of squared errors.
Parameters
----------
ref_cube : iris.cube.Cube
Reference cube (determines mask, coordinates and attributes of output).
error_datasets : list of dict
List of metadata dictionaries where each dictionary represents a single
dataset.
Returns
-------
iris.cube.Cube
Cube containing squared errors.
Raises
------
ValueError
Shape of a dataset does not match shape of reference cube.
"""
squared_error_cube = ref_cube.copy()
# Fill cube with zeros
squared_error_cube.data = np.ma.array(
np.full(squared_error_cube.shape, 0.0),
mask=np.ma.getmaskarray(squared_error_cube.data),
)
# Adapt cube metadata
if 'error' in squared_error_cube.attributes.get('var_type', ''):
if not squared_error_cube.attributes.get('squared'):
squared_error_cube.var_name += '_squared'
squared_error_cube.long_name += ' (squared)'
squared_error_cube.units = units_power(squared_error_cube.units, 2)
else:
if squared_error_cube.attributes.get('squared'):
squared_error_cube.var_name += '_error'
squared_error_cube.long_name += ' (error)'
else:
squared_error_cube.var_name += '_squared_error'
squared_error_cube.long_name += ' (squared error)'
squared_error_cube.units = units_power(squared_error_cube.units, 2)
squared_error_cube.attributes['squared'] = 1
squared_error_cube.attributes['var_type'] = 'prediction_output_error'
# Aggregate errors
filenames = []
for dataset in error_datasets:
path = dataset['filename']
cube = iris.load_cube(path)
filenames.append(path)
# Check shape
if cube.shape != ref_cube.shape:
raise ValueError(
f"Expected shape {ref_cube.shape} for error cubes, got "
f"{cube.shape} for dataset '{path}'")
# Add squared error
new_data = cube.data
if not cube.attributes.get('squared'):
new_data **= 2
squared_error_cube.data += new_data
logger.debug("Added '%s' to squared error datasets", path)
squared_error_cube.attributes['filename'] = '|'.join(filenames)
return squared_error_cube
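
# Usage sketch (hypothetical paths): each error dataset provides a 'filename'
# pointing to a cube on disk whose shape matches the reference cube.
def _example_get_squared_error_cube(ref_cube):
    error_datasets = [
        {'filename': '/path/to/error_estimate_1.nc'},
        {'filename': '/path/to/error_estimate_2.nc'},
    ]
    return get_squared_error_cube(ref_cube, error_datasets)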
def get_time_weights(cube, normalize=False):
"""Get time weights of cube calculated from time bounds.
Parameters
----------
cube : iris.cube.Cube
Input cube.
normalize : bool, optional (default: False)
Normalize weights with total time range.
Returns
-------
numpy.ndarray
Time weights.
Raises
------
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinate ``time``.
"""
logger.debug("Calculating time weights")
_check_coords(cube, ['time'], 'time weights')
coord = cube.coord('time')
time_weights = coord.bounds[:, 1] - coord.bounds[:, 0]
time_weights = time_weights.squeeze()
if normalize:
time_weights /= np.ma.sum(time_weights)
if time_weights.shape == ():
time_weights = np.broadcast_to(time_weights, cube.shape)
else:
time_weights = iris.util.broadcast_to_shape(time_weights, cube.shape,
cube.coord_dims('time'))
return time_weights
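
# Usage sketch: time weights are the widths of the time bounds; with
# normalize=True they sum to 1 and are broadcast to the cube's shape.
def _example_get_time_weights():
    time = iris.coords.DimCoord([15.0, 45.0, 75.0], standard_name='time',
                                units='days since 1850-01-01',
                                bounds=[[0.0, 30.0], [30.0, 60.0], [60.0, 90.0]])
    cube = iris.cube.Cube(np.zeros(3), dim_coords_and_dims=[(time, 0)])
    return get_time_weights(cube, normalize=True)  # three equal weights of 1/3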
def ignore_warnings():
"""Ignore warnings given by ``WARNINGS_TO_IGNORE``."""
for warning_kwargs in WARNINGS_TO_IGNORE:
warning_kwargs.setdefault('action', 'ignore')
warnings.filterwarnings(**warning_kwargs)
def units_power(units, power):
"""Raise a :class:`cf_units.Unit` to given power preserving symbols.
Raise :class:`cf_units.Unit` to given power without expanding it first. For
example, using ``units_power(Unit('J'), 2)`` gives ``Unit('J2')``. In
contrast, simply using ``Unit('J')**2`` would yield ``'kg2 m4 s-4'``.
Parameters
----------
units : cf_units.Unit
Input units.
power : int
Desired exponent.
Returns
-------
cf_units.Unit
Input units raised to given power.
Raises
------
TypeError
Argument ``power`` is not :obj:`int`-like.
ValueError
Invalid unit given.
"""
if round(power) != power:
raise TypeError(
f"Expected integer-like power for units exponentiation, got "
f"{power}")
power = int(power)
if any([units.is_no_unit(), units.is_unknown()]):
raise ValueError(
f"Cannot raise units '{units.name}' to power {power:d}")
if units.origin is None:
logger.warning(
"Symbol-preserving exponentiation of units '%s' is not "
"supported, origin is not given", units)
return units**power
if units.origin.isdigit():
return units**power
if units.origin.split()[0][0].isdigit():
logger.warning(
"Symbol-preserving exponentiation of units '%s' is not "
"supported yet because of leading numbers", units)
return units**power
new_units_list = []
for split in units.origin.split():
for elem in split.split('.'):
if elem[-1].isdigit():
exp = [int(d) for d in re.findall(r'-?\d+', elem)][0]
val = ''.join(list(re.findall(r'[A-Za-z]', elem)))
new_units_list.append(f'{val}{exp * power}')
else:
new_units_list.append(f'{elem}{power}')
new_units = ' '.join(new_units_list)
return Unit(new_units)
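
# Usage sketch: symbol-preserving exponentiation keeps the original symbols,
# while plain exponentiation expands them into base units.
def _example_units_power():
    preserved = units_power(Unit('J'), 2)  # Unit('J2')
    expanded = Unit('J') ** 2  # Unit('kg2 m4 s-4')
    return (preserved, expanded)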