"""Convenience functions for MLR diagnostics."""
import logging
import os
import re
import warnings
from copy import deepcopy
from pprint import pformat
import esmvalcore.preprocessor
import iris
import numpy as np
import shapely.vectorized as shp_vect
from cartopy.io import shapereader
from cf_units import Unit
from iris.fileformats.netcdf import UnknownCellMethodWarning
from esmvaltool.diag_scripts.shared import (
get_diagnostic_filename,
io,
select_metadata,
)
logger = logging.getLogger(os.path.basename(__file__))
NECESSARY_KEYS = io.NECESSARY_KEYS + [
'tag',
'var_type',
]
VAR_TYPES = [
'feature',
'label',
'label_to_rescale',
'prediction_input',
'prediction_input_error',
'prediction_output',
'prediction_output_error',
'prediction_output_misc',
'prediction_reference',
'prediction_residual',
]
WARNINGS_TO_IGNORE = [
{
'message': ".* contains unknown cell method 'trend'",
'category': UnknownCellMethodWarning,
'module': 'iris',
},
{
'message': "Using DEFAULT_SPHERICAL_EARTH_RADIUS",
'category': UserWarning,
'module': 'iris',
},
]
def _check_coords(cube, coords, weights_type):
"""Check coordinates prior to weights calculations."""
cube_str = cube.summary(shorten=True)
for coord_name in coords:
try:
coord = cube.coord(coord_name)
except iris.exceptions.CoordinateNotFoundError:
logger.error(
"Calculation of %s for cube %s failed, coordinate "
"'%s' not found", weights_type, cube_str, coord_name)
raise
if not coord.has_bounds():
logger.debug(
"Guessing bounds of coordinate '%s' of cube %s for "
"calculation of %s", coord_name, cube_str, weights_type)
coord.guess_bounds()
def _get_datasets(input_data, **kwargs):
"""Get datasets according to ``**kwargs``."""
datasets = []
for dataset in input_data:
dataset_copy = deepcopy(dataset)
for key in kwargs:
if key not in dataset_copy:
dataset_copy[key] = None
if select_metadata([dataset_copy], **kwargs):
datasets.append(dataset)
return datasets
def _get_ne_land_mask_cube(n_lats=1000, n_lons=2000):
"""Get Natural Earth land mask."""
ne_dir = os.path.join(
os.path.dirname(os.path.realpath(esmvalcore.preprocessor.__file__)),
'ne_masks',
)
ne_file = os.path.join(ne_dir, 'ne_10m_land.shp')
reader = shapereader.Reader(ne_file)
geometries = list(reader.geometries())
# Setup grid
lat_coord = iris.coords.DimCoord(
np.linspace(-90.0, 90.0, n_lats), var_name='lat',
standard_name='latitude', long_name='latitude', units='degrees')
lon_coord = iris.coords.DimCoord(
np.linspace(-180.0, 180.0, n_lons), var_name='lon',
standard_name='longitude', long_name='longitude', units='degrees')
(lats, lons) = np.meshgrid(lat_coord.points, lon_coord.points)
# Setup mask (1: land, 0: sea)
mask = np.full(lats.shape, False, dtype=bool)
for geometry in geometries:
mask |= shp_vect.contains(geometry, lons, lats)
land_mask = np.swapaxes(np.where(mask, 1, 0), 0, 1)
# Setup cube
cube = iris.cube.Cube(land_mask,
var_name='land_mask',
long_name='Land mask (1: land, 0: sea)',
units='no_unit',
dim_coords_and_dims=[(lat_coord, 0), (lon_coord, 1)])
return cube
def _has_valid_coords(cube, coords):
"""Check if cube has valid coords for calculating weights."""
for coord_name in coords:
try:
cube.coord(coord_name)
except iris.exceptions.CoordinateNotFoundError:
return False
return True
def check_predict_kwargs(predict_kwargs):
"""Check keyword argument for ``predict()`` functions.
Parameters
----------
predict_kwargs : keyword arguments, optional
Keyword arguments for a ``predict()`` function.
Raises
------
RuntimeError
``return_var`` and ``return_cov`` are both set to ``True`` in the
keyword arguments.
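Examples
--------
Minimal usage sketch (argument values purely illustrative): requesting only
one of the two quantities passes silently, requesting both raises.
>>> check_predict_kwargs({'return_var': True})
>>> check_predict_kwargs({'return_var': True, 'return_cov': True})
Traceback (most recent call last):
    ...
RuntimeError: Cannot return variance (return_var=True) and full covariance matrix (return_cov=True) simultaneously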
"""
return_var = predict_kwargs.get('return_var', False)
return_cov = predict_kwargs.get('return_cov', False)
if return_var and return_cov:
raise RuntimeError(
"Cannot return variance (return_cov=True) and full covariance "
"matrix (return_cov=True) simultaneously")
def create_alias(dataset, attributes, delimiter='-'):
"""Create alias key of a dataset using a list of attributes.
Parameters
----------
dataset : dict
Metadata dictionary representing a single dataset.
attributes : list of str
List of attributes used to create the alias.
delimiter : str, optional (default: '-')
Delimiter used to separate different attributes in the alias.
Returns
-------
str
Dataset alias.
Raises
------
AttributeError
``dataset`` does not contain one of the ``attributes``.
ValueError
``attributes`` is an empty list.
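Examples
--------
Minimal sketch with hypothetical metadata values:
>>> dataset = {'dataset': 'MIROC6', 'exp': 'historical'}
>>> create_alias(dataset, ['dataset', 'exp'])
'MIROC6-historical'
>>> create_alias(dataset, ['dataset', 'exp'], delimiter='_')
'MIROC6_historical'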
"""
alias = []
if not attributes:
raise ValueError(
"Expected at least one element for attributes, got empty list")
for attribute in attributes:
if attribute not in dataset:
raise AttributeError(
f"Dataset {dataset} does not contain attribute '{attribute}' "
f"for alias creation")
alias.append(dataset[attribute])
return delimiter.join(alias)
def datasets_have_mlr_attributes(datasets, log_level='debug', mode='full'):
"""Check (MLR) attributes of ``datasets``.
Parameters
----------
datasets : list of dict
Datasets to check.
log_level : str, optional (default: 'debug')
Verbosity level of the logger.
mode : str, optional (default: 'full')
Checking mode. Must be one of ``'only_missing'`` (only check if
attributes are missing), ``'only_var_type'`` (check only `var_type`) or
``'full'`` (check both).
Returns
-------
bool
``True`` if all required attributes are available, ``False`` if not.
Raises
------
ValueError
Invalid value for argument ``mode`` is given.
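Examples
--------
Minimal sketch using ``mode='only_var_type'`` (the metadata dictionaries are
hypothetical and deliberately minimal):
>>> datasets_have_mlr_attributes([{'var_type': 'feature'}],
...                              mode='only_var_type')
True
>>> datasets_have_mlr_attributes([{'var_type': 'invalid_type'}],
...                              mode='only_var_type')
False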
"""
output = True
accepted_modes = ('full', 'only_missing', 'only_var_type')
if mode not in accepted_modes:
raise ValueError(
f"'mode' must be one of {accepted_modes}, got '{mode}'")
for dataset in datasets:
if mode != 'only_var_type':
for key in NECESSARY_KEYS:
if key not in dataset:
getattr(logger, log_level)(
"Dataset '%s' does not have necessary (MLR) attribute "
"'%s'", dataset, key)
output = False
if mode != 'only_missing' and dataset.get('var_type') not in VAR_TYPES:
getattr(logger, log_level)(
"Dataset '%s' has invalid var_type '%s', must be one of %s",
dataset, dataset.get('var_type'), VAR_TYPES)
output = False
return output
def get_1d_cube(x_data, y_data, x_kwargs=None, y_kwargs=None):
"""Convert 2 arrays to :class:`iris.cube.Cube` (with single coordinate).
Parameters
----------
x_data : numpy.ndarray
Data for coordinate.
y_data : numpy.ndarray
Data for cube.
x_kwargs : dict, optional
Keyword arguments passed to :class:`iris.coords.AuxCoord`.
y_kwargs : dict, optional
Keyword arguments passed to :class:`iris.cube.Cube`.
Returns
-------
iris.cube.Cube
1D cube with single auxiliary coordinate.
Raises
------
ValueError
Arrays are not 1D or do not have matching shapes.
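Examples
--------
Minimal sketch (coordinate and variable names are purely illustrative):
>>> import numpy as np
>>> cube = get_1d_cube(np.arange(3.0), np.array([1.0, 4.0, 9.0]),
...                    x_kwargs={'var_name': 'x', 'units': 's'},
...                    y_kwargs={'var_name': 'y', 'units': 'm'})
>>> cube.shape
(3,)
>>> cube.coord('x').shape
(3,)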
"""
if x_kwargs is None:
x_kwargs = {}
if y_kwargs is None:
y_kwargs = {}
x_data = np.ma.array(x_data)
y_data = np.ma.array(y_data)
if x_data.ndim != 1:
raise ValueError(
f"Expected 1D array for 'x_data', got {x_data.ndim:d}D array")
if y_data.ndim != 1:
raise ValueError(
f"Expected 1D array for 'y_data', got {y_data.ndim:d}D array")
if x_data.shape != y_data.shape:
raise ValueError(
f"Expected identical shapes for 'x_data' and 'y_data', got "
f"{x_data.shape} and {y_data.shape}, respectively")
aux_coord = iris.coords.AuxCoord(x_data, **x_kwargs)
cube = iris.cube.Cube(y_data, aux_coords_and_dims=[(aux_coord, 0)],
**y_kwargs)
return cube
def get_absolute_time_units(units):
"""Convert time reference units to absolute ones.
This function converts reference time units (like ``'days since YYYY'``) to
absolute ones (like ``'days'``).
Parameters
----------
units : cf_units.Unit
Time units to convert.
Returns
-------
cf_units.Unit
Absolute time units.
Raises
------
ValueError
If conversion failed (e.g. input units are not time units).
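Examples
--------
Minimal sketch (the reference date is purely illustrative):
>>> from cf_units import Unit
>>> get_absolute_time_units(Unit('days since 1950-01-01')) == Unit('days')
True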
"""
if units.is_time_reference():
units = Unit(units.symbol.split()[0])
if not units.is_time():
raise ValueError(
f"Cannot convert units '{units}' to reasonable time units")
return units
def get_alias(dataset):
"""Get alias for dataset.
Parameters
----------
dataset : dict
Dataset metadata.
Returns
-------
str
Alias.
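Examples
--------
Minimal sketch with hypothetical metadata values:
>>> get_alias({'project': 'CMIP6', 'dataset': 'MIROC6', 'exp': 'historical',
...            'start_year': 1850, 'end_year': 2014})
'CMIP6 dataset MIROC6 (historical) from 1850 to 2014'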
"""
alias = f"{dataset['project']} dataset {dataset['dataset']}"
additional_info = []
for key in ('mip', 'exp', 'ensemble'):
if key in dataset:
additional_info.append(dataset[key])
if additional_info:
alias += f" ({', '.join(additional_info)})"
if 'start_year' in dataset and 'end_year' in dataset:
alias += f" from {dataset['start_year']:d} to {dataset['end_year']:d}"
return alias
def get_all_weights(cube, area_weighted=True, time_weighted=True,
landsea_fraction_weighted=None, normalize=False):
"""Get all possible weights of cube.
Parameters
----------
cube : iris.cube.Cube
Input cube.
area_weighted : bool, optional (default: True)
Use area weights.
time_weighted : bool, optional (default: True)
Use time weights.
landsea_fraction_weighted : str, optional
If given, use land/sea fraction weights. Must be one of ``'land'``,
``'sea'``.
normalize : bool, optional (default: False)
Normalize weights with total area and total time range.
Returns
-------
numpy.ndarray
All weights of the cube (combined area, land/sea fraction and time weights).
"""
logger.debug("Calculating all weights of cube %s",
cube.summary(shorten=True))
weights = np.ones(cube.shape)
# Horizontal weights
if _has_valid_coords(cube, ['latitude', 'longitude']):
horizontal_weights = get_horizontal_weights(
cube, area_weighted=area_weighted,
landsea_fraction_weighted=landsea_fraction_weighted,
normalize=normalize)
if horizontal_weights is not None:
weights *= horizontal_weights
# Time weights
if _has_valid_coords(cube, ['time']) and time_weighted:
time_weights = get_time_weights(cube, normalize=normalize)
weights *= time_weights
return weights
def get_area_weights(cube, normalize=False):
"""Get area weights of cube.
Parameters
----------
cube : iris.cube.Cube
Input cube.
normalize : bool, optional (default: False)
Normalize weights with total area.
Returns
-------
numpy.ndarray
Area weights.
Raises
------
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinates ``latitude`` and ``longitude``.
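Examples
--------
Minimal sketch on a tiny 2 x 2 grid (coordinate values purely illustrative;
bounds are guessed automatically):
>>> import numpy as np
>>> from iris.coords import DimCoord
>>> from iris.cube import Cube
>>> lat = DimCoord([-45.0, 45.0], standard_name='latitude', units='degrees')
>>> lon = DimCoord([90.0, 270.0], standard_name='longitude', units='degrees')
>>> cube = Cube(np.zeros((2, 2)), dim_coords_and_dims=[(lat, 0), (lon, 1)])
>>> get_area_weights(cube).shape
(2, 2)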
"""
logger.debug("Calculating area weights")
_check_coords(cube, ['latitude', 'longitude'], 'area weights')
area_weights = iris.analysis.cartography.area_weights(cube,
normalize=normalize)
return area_weights
def get_horizontal_weights(cube, area_weighted=True,
landsea_fraction_weighted=None, normalize=False):
"""Get horizontal weights of cube.
Parameters
----------
cube : iris.cube.Cube
Input cube.
area_weighted : bool, optional (default: True)
Use area weights.
landsea_fraction_weighted : str, optional
If given, use land/sea fraction weights. Must be one of ``'land'``,
``'sea'``.
normalize : bool, optional (default: False)
Normalize weights with sum of weights over latitude and longitude (i.e.
if only ``area_weighted`` is given, this is equal to the total area).
Returns
-------
numpy.ndarray
Horizontal weights (``None`` if neither ``area_weighted`` nor
``landsea_fraction_weighted`` is given).
Raises
------
iris.exceptions.CoordinateMultiDimError
Dimension of latitude or longitude coordinate is greater than 1.
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinates ``latitude`` and ``longitude``.
ValueError
``landsea_fraction_weighted`` is not one of ``'land'``, ``'sea'``.
"""
logger.debug("Calculating horizontal weights")
if not (area_weighted or landsea_fraction_weighted):
return None
# Get weights
weights = np.ones(cube.shape)
if area_weighted:
weights *= get_area_weights(cube, normalize=False)
if landsea_fraction_weighted is not None:
weights *= get_landsea_fraction_weights(
cube, landsea_fraction_weighted, normalize=False)
# No normalization
if not normalize:
return weights
# Get horizontal dimensions
horizontal_dims = []
if cube.coord_dims('latitude'):
horizontal_dims.append(cube.coord_dims('latitude')[0])
if cube.coord_dims('longitude'):
horizontal_dims.append(cube.coord_dims('longitude')[0])
# Normalization
horizontal_dims = tuple(horizontal_dims)
if not horizontal_dims:
norm = np.ravel(weights)[0]
else:
norm = np.ravel(np.sum(weights, axis=horizontal_dims))[0]
return weights / norm
def get_landsea_fraction_weights(cube, area_type, normalize=False):
"""Get land/sea fraction weights of cube using Natural Earth files.
Note
----
The implementation of this feature is not optimal. For large cubes,
calculating the land/sea fraction weights might be very slow.
Parameters
----------
cube : iris.cube.Cube
Input cube.
area_type : str
Area type. Must be one of ``'land'`` (land fraction weighting) or
``'sea'`` (sea fraction weighting).
normalize : bool, optional (default: False)
Normalize weights with total land/sea fraction.
Returns
-------
numpy.ndarray
Land/sea fraction weights.
Raises
------
iris.exceptions.CoordinateMultiDimError
Dimension of latitude or longitude coordinate is greater than 1.
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinates ``latitude`` and ``longitude``.
ValueError
``area_type`` is not one of ``'land'``, ``'sea'``.
"""
allowed_types = ('land', 'sea')
if area_type not in allowed_types:
raise ValueError(
f"Expected one of {allowed_types} for 'area_type' of land/sea "
f"fraction weighting, got '{area_type}'")
logger.debug("Calculating %s fraction weights", area_type)
_check_coords(cube, ['latitude', 'longitude'],
f'{area_type} fraction weights')
lat_coord = cube.coord('latitude')
lon_coord = cube.coord('longitude')
for coord in (lat_coord, lon_coord):
if coord.ndim > 1:
raise iris.exceptions.CoordinateMultiDimError(
f"Calculating {area_type} fraction weights for "
f"multidimensional coordinate '{coord.name}' is not supported")
# Calculate land fractions on coordinate grid of cube
ne_land_mask_cube = _get_ne_land_mask_cube()
land_fraction = np.empty((lat_coord.shape[0], lon_coord.shape[0]),
dtype=np.float64)
for lat_idx in range(lat_coord.shape[0]):
for lon_idx in range(lon_coord.shape[0]):
lat_bounds = lat_coord.bounds[lat_idx]
lon_bounds = lon_coord.bounds[lon_idx]
submask = ne_land_mask_cube.intersection(latitude=lat_bounds,
longitude=lon_bounds)
land_fraction[lat_idx, lon_idx] = (submask.data.sum() /
submask.data.size)
if area_type == 'sea':
fraction_weights = 1.0 - land_fraction
else:
fraction_weights = land_fraction
if normalize:
fraction_weights /= np.ma.sum(fraction_weights)
# Broadcast to original shape
coord_dims = []
if cube.coord_dims(lon_coord):
coord_dims.append(cube.coord_dims(lon_coord)[0])
else:
fraction_weights = np.squeeze(fraction_weights, axis=1)
if cube.coord_dims(lat_coord):
coord_dims.insert(0, cube.coord_dims(lat_coord)[0])
else:
fraction_weights = np.squeeze(fraction_weights, axis=0)
fraction_weights = iris.util.broadcast_to_shape(fraction_weights,
cube.shape,
tuple(coord_dims))
return fraction_weights
def get_new_path(cfg, old_path):
"""Convert old path to new diagnostic path.
Parameters
----------
cfg : dict
Recipe configuration.
old_path : str
Old path.
Returns
-------
str
New diagnostic path.
"""
basename = os.path.splitext(os.path.basename(old_path))[0]
new_path = get_diagnostic_filename(basename, cfg)
return new_path
def get_squared_error_cube(ref_cube, error_datasets):
"""Get array of squared errors.
Parameters
----------
ref_cube : iris.cube.Cube
Reference cube (determines mask, coordinates and attributes of output).
error_datasets : list of dict
List of metadata dictionaries where each dictionary represents a single
dataset.
Returns
-------
iris.cube.Cube
Cube containing squared errors.
Raises
------
ValueError
Shape of a dataset does not match shape of reference cube.
"""
squared_error_cube = ref_cube.copy()
# Fill cube with zeros
squared_error_cube.data = np.ma.array(
np.full(squared_error_cube.shape, 0.0),
mask=np.ma.getmaskarray(squared_error_cube.data),
)
# Adapt cube metadata
if 'error' in squared_error_cube.attributes.get('var_type', ''):
if not squared_error_cube.attributes.get('squared'):
squared_error_cube.var_name += '_squared'
squared_error_cube.long_name += ' (squared)'
squared_error_cube.units = units_power(squared_error_cube.units, 2)
else:
if squared_error_cube.attributes.get('squared'):
squared_error_cube.var_name += '_error'
squared_error_cube.long_name += ' (error)'
else:
squared_error_cube.var_name += '_squared_error'
squared_error_cube.long_name += ' (squared error)'
squared_error_cube.units = units_power(squared_error_cube.units, 2)
squared_error_cube.attributes['squared'] = 1
squared_error_cube.attributes['var_type'] = 'prediction_output_error'
# Aggregate errors
filenames = []
for dataset in error_datasets:
path = dataset['filename']
cube = iris.load_cube(path)
filenames.append(path)
# Check shape
if cube.shape != ref_cube.shape:
raise ValueError(
f"Expected shape {ref_cube.shape} for error cubes, got "
f"{cube.shape} for dataset '{path}'")
# Add squared error
new_data = cube.data
if not cube.attributes.get('squared'):
new_data **= 2
squared_error_cube.data += new_data
logger.debug("Added '%s' to squared error datasets", path)
squared_error_cube.attributes['filename'] = '|'.join(filenames)
return squared_error_cube
def get_time_weights(cube, normalize=False):
"""Get time weights of cube.
Parameters
----------
cube : iris.cube.Cube
Input cube.
normalize : bool, optional (default: False)
Normalize weights with total time range.
Returns
-------
numpy.ndarray
Time weights.
Raises
------
iris.exceptions.CoordinateNotFoundError
Cube does not contain the coordinate ``time``.
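Examples
--------
Minimal sketch with two time steps of 30 and 31 days (dates purely
illustrative):
>>> import numpy as np
>>> from iris.coords import DimCoord
>>> from iris.cube import Cube
>>> time = DimCoord([15.0, 45.5], standard_name='time',
...                 units='days since 1950-01-01',
...                 bounds=[[0.0, 30.0], [30.0, 61.0]])
>>> cube = Cube(np.zeros(2), dim_coords_and_dims=[(time, 0)])
>>> get_time_weights(cube).tolist()
[30.0, 31.0]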
"""
logger.debug("Calculating time weights")
_check_coords(cube, ['time'], 'time weights')
coord = cube.coord('time')
time_weights = coord.bounds[:, 1] - coord.bounds[:, 0]
time_weights = time_weights.squeeze()
if normalize:
time_weights /= np.ma.sum(time_weights)
if time_weights.shape == ():
time_weights = np.broadcast_to(time_weights, cube.shape)
else:
time_weights = iris.util.broadcast_to_shape(time_weights, cube.shape,
cube.coord_dims('time'))
return time_weights
def ignore_warnings():
"""Ignore warnings given by ``WARNINGS_TO_IGNORE``."""
for warning_kwargs in WARNINGS_TO_IGNORE:
warning_kwargs.setdefault('action', 'ignore')
warnings.filterwarnings(**warning_kwargs)
def units_power(units, power):
"""Raise a :class:`cf_units.Unit` to given power preserving symbols.
Raise :class:`cf_units.Unit` to given power without expanding it first. For
example, using ``units_power(Unit('J'), 2)`` gives ``Unit('J2')``. In
contrast, simply using ``Unit('J')**2`` would yield ``'kg2 m4 s-4'``.
Parameters
----------
units : cf_units.Unit
Input units.
power : int
Desired exponent.
Returns
-------
cf_units.Unit
Input units raised to given power.
Raises
------
TypeError
Argument ``power`` is not :obj:`int`-like.
ValueError
Invalid unit given.
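Examples
--------
Minimal sketch reproducing the behaviour described above (the second call is
purely illustrative):
>>> from cf_units import Unit
>>> units_power(Unit('J'), 2)
Unit('J2')
>>> units_power(Unit('kg m2 s-2'), 2)
Unit('kg2 m4 s-4')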
"""
if round(power) != power:
raise TypeError(
f"Expected integer-like power for units exponentiation, got "
f"{power}")
power = int(power)
if any([units.is_no_unit(), units.is_unknown()]):
raise ValueError(
f"Cannot raise units '{units.name}' to power {power:d}")
if units.origin is None:
logger.warning(
"Symbol-preserving exponentiation of units '%s' is not "
"supported, origin is not given", units)
return units**power
if units.origin.isdigit():
return units**power
if units.origin.split()[0][0].isdigit():
logger.warning(
"Symbol-preserving exponentiation of units '%s' is not "
"supported yet because of leading numbers", units)
return units**power
new_units_list = []
for split in units.origin.split():
for elem in split.split('.'):
if elem[-1].isdigit():
exp = [int(d) for d in re.findall(r'-?\d+', elem)][0]
val = ''.join(list(re.findall(r'[A-Za-z]', elem)))
new_units_list.append(f'{val}{exp * power}')
else:
new_units_list.append(f'{elem}{power}')
new_units = ' '.join(new_units_list)
return Unit(new_units)