Source code for esmvaltool.diag_scripts.mlr

"""Convenience functions for MLR diagnostics."""

import logging
import os
import re
import warnings
from copy import deepcopy
from functools import lru_cache
from pprint import pformat

import esmvalcore.preprocessor
import iris
import numpy as np
import shapely.vectorized as shp_vect
from import shapereader
from cf_units import Unit
from iris.fileformats.netcdf import UnknownCellMethodWarning

from esmvaltool.diag_scripts.shared import (

logger = logging.getLogger(os.path.basename(__file__))

        'message': ".* contains unknown cell method 'trend'",
        'category': UnknownCellMethodWarning,
        'module': 'iris',
        'message': "Using DEFAULT_SPHERICAL_EARTH_RADIUS",
        'category': UserWarning,
        'module': 'iris',

def _check_coords(cube, coords, weights_type):
    """Check coordinates prior to weights calculations."""
    cube_str = cube.summary(shorten=True)
    for coord_name in coords:
            coord = cube.coord(coord_name)
        except iris.exceptions.CoordinateNotFoundError:
                "Calculation of %s for cube %s failed, coordinate "
                "'%s' not found", weights_type, cube_str, coord_name)
        if not coord.has_bounds():
                "Guessing bounds of coordinate '%s' of cube %s for "
                "calculation of %s", coord_name, cube_str, weights_type)

def _get_datasets(input_data, **kwargs):
    """Get datasets according to ``**kwargs``."""
    datasets = []
    for dataset in input_data:
        dataset_copy = deepcopy(dataset)
        for key in kwargs:
            if key not in dataset_copy:
                dataset_copy[key] = None
        if select_metadata([dataset_copy], **kwargs):
    return datasets

def _get_ne_land_mask_cube(n_lats=1000, n_lons=2000):
    """Get Natural Earth land mask."""
    ne_dir = os.path.join(
    ne_file = os.path.join(ne_dir, 'ne_10m_land.shp')
    reader = shapereader.Reader(ne_file)
    geometries = list(reader.geometries())

    # Setup grid
    lat_coord = iris.coords.DimCoord(
        np.linspace(-90.0, 90.0, n_lats), var_name='lat',
        standard_name='latitude', long_name='latitude', units='degrees')
    lon_coord = iris.coords.DimCoord(
        np.linspace(-180.0, 180.0, n_lons), var_name='lon',
        standard_name='longitude', long_name='longitude', units='degrees')
    (lats, lons) = np.meshgrid(lat_coord.points, lon_coord.points)

    # Setup mask (1: land, 0: sea)
    mask = np.full(lats.shape, False, dtype=bool)
    for geometry in geometries:
        mask |= shp_vect.contains(geometry, lons, lats)
    land_mask = np.swapaxes(np.where(mask, 1, 0), 0, 1)

    # Setup cube
    cube = iris.cube.Cube(land_mask,
                          long_name='Land mask (1: land, 0: sea)',
                          dim_coords_and_dims=[(lat_coord, 0), (lon_coord, 1)])
    return cube

[docs] def check_predict_kwargs(predict_kwargs): """Check keyword argument for ``predict()`` functions. Parameters ---------- predict_kwargs : keyword arguments, optional Keyword arguments for a ``predict()`` function. Raises ------ RuntimeError ``return_var`` and ``return_cov`` are both set to ``True`` in the keyword arguments. """ return_var = predict_kwargs.get('return_var', False) return_cov = predict_kwargs.get('return_cov', False) if return_var and return_cov: raise RuntimeError( "Cannot return variance (return_cov=True) and full covariance " "matrix (return_cov=True) simultaneously")
[docs] def create_alias(dataset, attributes, delimiter='-'): """Create alias key of a dataset using a list of attributes. Parameters ---------- dataset : dict Metadata dictionary representing a single dataset. attributes : list of str List of attributes used to create the alias. delimiter : str, optional (default: '-') Delimiter used to separate different attributes in the alias. Returns ------- str Dataset alias. Raises ------ AttributeError ``dataset`` does not contain one of the ``attributes``. """ alias = [] if not attributes: raise ValueError( "Expected at least one element for attributes, got empty list") for attribute in attributes: if attribute not in dataset: raise AttributeError( f"Dataset {dataset} does not contain attribute '{attribute}' " f"for alias creation") alias.append(dataset[attribute]) return delimiter.join(alias)
[docs] def datasets_have_mlr_attributes(datasets, log_level='debug', mode='full'): """Check (MLR) attributes of ``datasets``. Parameters ---------- datasets : list of dict Datasets to check. log_level : str, optional (default: 'debug') Verbosity level of the logger. mode : str, optional (default: 'full') Checking mode. Must be one of ``'only_missing'`` (only check if attributes are missing), ``'only_var_type'`` (check only `var_type`) or ``'full'`` (check both). Returns ------- bool ``True`` if all required attributes are available, ``False`` if not. Raises ------ ValueError Invalid value for argument ``mode`` is given. """ output = True accepted_modes = ('full', 'only_missing', 'only_var_type') if mode not in accepted_modes: raise ValueError( f"'mode' must be one of {accepted_modes}, got '{mode}'") for dataset in datasets: if mode != 'only_var_type': for key in NECESSARY_KEYS: if key not in dataset: getattr(logger, log_level)( "Dataset '%s' does not have necessary (MLR) attribute " "'%s'", dataset, key) output = False if mode != 'only_missing' and dataset.get('var_type') not in VAR_TYPES: getattr(logger, log_level)( "Dataset '%s' has invalid var_type '%s', must be one of %s", dataset, dataset.get('var_type'), VAR_TYPES) output = False return output
[docs] def get_1d_cube(x_data, y_data, x_kwargs=None, y_kwargs=None): """Convert 2 arrays to :class:`iris.cube.Cube` (with single coordinate). Parameters ---------- x_data : numpy.ndarray Data for coordinate. y_data : numpy.ndarray Data for cube. x_kwargs : dict Keyword arguments passed to :class:`iris.coords.AuxCoord`. y_kwargs : dict Keyword arguments passed to :class:`iris.cube.Cube`. Returns ------- iris.cube.Cube 1D cube with single auxiliary coordinate. Raises ------ ValueError Arrays are not 1D and do not have matching shapes. """ if x_kwargs is None: x_kwargs = {} if y_kwargs is None: y_kwargs = {} x_data = y_data = if x_data.ndim != 1: raise ValueError( f"Expected 1D array for 'x_data', got {x_data.ndim:d}D array") if y_data.ndim != 1: raise ValueError( f"Expected 1D array for 'y_data', got {y_data.ndim:d}D array") if x_data.shape != y_data.shape: raise ValueError( f"Expected identical shapes for 'x_data' and 'y_data', got " f"{x_data.shape} and {y_data.shape}, respectively") aux_coord = iris.coords.AuxCoord(x_data, **x_kwargs) cube = iris.cube.Cube(y_data, aux_coords_and_dims=[(aux_coord, 0)], **y_kwargs) return cube
[docs] def get_absolute_time_units(units): """Convert time reference units to absolute ones. This function converts reference time units (like ``'days since YYYY'``) to absolute ones (like ``'days'``). Parameters ---------- units : cf_units.Unit Time units to convert. Returns ------- cf_units.Unit Absolute time units. Raises ------ ValueError If conversion failed (e.g. input units are not time units). """ if units.is_time_reference(): units = Unit(units.symbol.split()[0]) if not units.is_time(): raise ValueError( f"Cannot convert units '{units}' to reasonable time units") return units
[docs] def get_alias(dataset): """Get alias for dataset. Parameters ---------- dataset : dict Dataset metadata. Returns ------- str Alias. """ alias = f"{dataset['project']} dataset {dataset['dataset']}" additional_info = [] for key in ('mip', 'exp', 'ensemble'): if key in dataset: additional_info.append(dataset[key]) if additional_info: alias += f" ({', '.join(additional_info)})" if 'start_year' in dataset and 'end_year' in dataset: alias += f" from {dataset['start_year']:d} to {dataset['end_year']:d}" return alias
[docs] def get_all_weights(cube, area_weighted=True, time_weighted=True, landsea_fraction_weighted=None, normalize=False): """Get all desired weights for a cube. Parameters ---------- cube : iris.cube.Cube Input cube. area_weighted : bool, optional (default: True) Use area weights calculated from grid cell areas using :func:`iris.analysis.cartography.area_weights`. Only works for regular grids. time_weighted : bool, optional (default: True) Use time weights calculated from time bounds. landsea_fraction_weighted : str, optional If given, use land/sea fraction weights calculated from Natural Earth files. Must be one of ``'land'``, ``'sea'``. Only works for regular grids. normalize : bool, optional (default: False) Normalize weights with total area and total time range. Returns ------- numpy.ndarray Area weights. Raises ------ iris.exceptions.CoordinateMultiDimError Dimension of ``latitude`` or ``longitude`` coordinate is greater than 1. iris.exceptions.CoordinateNotFoundError Cube does not contain the coordinates ``latitude`` and ``longitude`` (if used with ``area_weighted`` or ``landsea_fraction_weighted``) or cube does not contain the coordinate ``time`` (if used with ``time_weighted``). ValueError ``landsea_fraction_weighted`` is not one of ``None``, ``'land'``, ``'sea'`` or coordinates ``latitude`` and ``longitude`` share dimensions. """ logger.debug("Calculating all weights of cube %s", cube.summary(shorten=True)) weights = np.ones(cube.shape) # Horizontal weights horizontal_weights = get_horizontal_weights( cube, area_weighted=area_weighted, landsea_fraction_weighted=landsea_fraction_weighted, normalize=normalize) weights *= horizontal_weights # Time weights if time_weighted: time_weights = get_time_weights(cube, normalize=normalize) weights *= time_weights return weights
[docs] def get_area_weights(cube, normalize=False): """Get area weights calculated from grid cell areas. Note ---- Only works for regular grids. Uses :func:`iris.analysis.cartography.area_weights` for an approximate calculation of the grid cell areas. Parameters ---------- cube : iris.cube.Cube Input cube. normalize : bool, optional (default: False) Normalize weights with total area. Returns ------- numpy.ndarray Area weights. Raises ------ iris.exceptions.CoordinateNotFoundError Cube does not contain the coordinates ``latitude`` and ``longitude``. """ logger.debug("Calculating area weights") _check_coords(cube, ['latitude', 'longitude'], 'area weights') area_weights = iris.analysis.cartography.area_weights(cube, normalize=normalize) return area_weights
[docs] def get_horizontal_weights(cube, area_weighted=True, landsea_fraction_weighted=None, normalize=False): """Get horizontal (latitude/longitude) weights of cube. Parameters ---------- cube : iris.cube.Cube Input cube. area_weighted : bool, optional (default: True) Use area weights calculated from grid cell areas using :func:`iris.analysis.cartography.area_weights`. Only works for regular grids. landsea_fraction_weighted : str, optional If given, use land/sea fraction weights calculated from Natural Earth files. Must be one of ``'land'``, ``'sea'``. Only works for regular grids. normalize : bool, optional (default: False) Normalize weights with sum of weights over latitude and longitude (i.e. if only ``area_weighted`` is given, this is equal to the total area). Returns ------- numpy.ndarray Horizontal (latitude/longitude) weights. Raises ------ iris.exceptions.CoordinateMultiDimError Dimension of ``latitude`` or ``longitude`` coordinate is greater than 1. iris.exceptions.CoordinateNotFoundError Cube does not contain the coordinates ``latitude`` and ``longitude``. ValueError ``landsea_fraction_weighted`` is not one of ``'land'``, ``'sea'``. """ logger.debug("Calculating horizontal weights") weights = np.ones(cube.shape) if not (area_weighted or landsea_fraction_weighted): return weights # Get weights if area_weighted: weights *= get_area_weights(cube, normalize=False) if landsea_fraction_weighted is not None: weights *= get_landsea_fraction_weights( cube, landsea_fraction_weighted, normalize=False) # No normalization if not normalize: return weights # Get horizontal dimensions horizontal_dims = [] if cube.coord_dims('latitude'): horizontal_dims.append(cube.coord_dims('latitude')[0]) if cube.coord_dims('longitude'): horizontal_dims.append(cube.coord_dims('longitude')[0]) # Normalization horizontal_dims = tuple(horizontal_dims) if not horizontal_dims: norm = np.ravel(weights)[0] else: norm = np.ravel(np.sum(weights, axis=horizontal_dims))[0] return weights / norm
[docs] def get_input_data(cfg, pattern=None, check_mlr_attributes=True, ignore=None): """Get input data and check MLR attributes if desired. Use ``input_data`` and ancestors to get all relevant input files. Parameters ---------- cfg : dict Recipe configuration. pattern : str, optional Pattern matched against ancestor file names. check_mlr_attributes : bool, optional (default: True) If ``True``, only returns datasets with valid MLR attributes. If ``False``, returns all found datasets. ignore : list of dict, optional Ignore specific datasets by specifying multiple :obj:`dict`s of metadata. By setting an attribute to ``None``, ignore all datasets which do not have that attribute. Returns ------- list of dict List of input datasets. Raises ------ ValueError No input data found or at least one dataset has invalid attributes. """ logger.debug("Extracting input files") input_data = list(cfg['input_data'].values()) input_data.extend(io.netcdf_to_metadata(cfg, pattern=pattern)) input_data = deepcopy(input_data) if ignore is not None: valid_data = [] ignored_datasets = []"Ignoring files with %s", ignore) for kwargs in ignore: ignored_datasets.extend(_get_datasets(input_data, **kwargs)) for dataset in input_data: if dataset not in ignored_datasets: valid_data.append(dataset) else: valid_data = input_data if not valid_data: raise ValueError("No input data found") if check_mlr_attributes: if not datasets_have_mlr_attributes(valid_data, log_level='error'): raise ValueError("At least one input dataset does not have valid " "MLR attributes") valid_data = sorted_metadata(valid_data, ['var_type', 'tag', 'dataset']) logger.debug("Found files:") logger.debug(pformat([d['filename'] for d in valid_data])) return valid_data
[docs] def get_landsea_fraction_weights(cube, area_type, normalize=False): """Get land/sea fraction weights calculated from Natural Earth files. Note ---- The implementation of this feature is not optimal. For large cubes, calculating the land/sea fraction weights might be very slow. Only works for regular grids. Parameters ---------- cube : iris.cube.Cube Input cube. area_type : str Area type. Must be one of ``'land'`` (land fraction weighting) or ``'sea'`` (sea fraction weighting). normalize : bool, optional (default: False) Normalize weights with total land/sea fraction. Raises ------ iris.exceptions.CoordinateMultiDimError Dimension of ``latitude`` or ``longitude`` coordinate is greater than 1. iris.exceptions.CoordinateNotFoundError Cube does not contain the coordinates ``latitude`` and ``longitude``. ValueError ``area_type`` is not one of ``'land'``, ``'sea'`` or coordinates ``latitude`` and ``longitude`` share dimensions. """ allowed_types = ('land', 'sea') if area_type not in allowed_types: raise ValueError( f"Expected one of {allowed_types} for 'area_type' of land/sea " f"fraction weighting, got '{area_type}'") logger.debug("Calculating %s fraction weights", area_type) _check_coords(cube, ['latitude', 'longitude'], f'{area_type} fraction weights') lat_coord = cube.coord('latitude') lon_coord = cube.coord('longitude') for coord in (lat_coord, lon_coord): if coord.ndim > 1: raise iris.exceptions.CoordinateMultiDimError( f"Calculating {area_type} fraction weights for " f"multidimensional coordinate '{}' is not supported") if cube.coord_dims(lat_coord) != (): if cube.coord_dims(lat_coord) == cube.coord_dims(lon_coord): raise ValueError( f"1D latitude and longitude coordinates share dimensions " f"(this usually happens with unstructured grids) - " f"calculating {area_type} fraction weights for latitude and " "longitude that share dimensions is not possible") # Calculate land fractions on coordinate grid of cube ne_land_mask_cube = _get_ne_land_mask_cube() land_fraction = np.empty((lat_coord.shape[0], lon_coord.shape[0]), dtype=np.float64) for lat_idx in range(lat_coord.shape[0]): for lon_idx in range(lon_coord.shape[0]): lat_bounds = lat_coord.bounds[lat_idx] lon_bounds = lon_coord.bounds[lon_idx] submask = ne_land_mask_cube.intersection(latitude=lat_bounds, longitude=lon_bounds) land_fraction[lat_idx, lon_idx] = ( / if area_type == 'sea': fraction_weights = 1.0 - land_fraction else: fraction_weights = land_fraction if normalize: fraction_weights /= # Broadcast to original shape coord_dims = [] if cube.coord_dims(lon_coord): coord_dims.append(cube.coord_dims(lon_coord)[0]) else: fraction_weights = np.squeeze(fraction_weights, axis=1) if cube.coord_dims(lat_coord): coord_dims.insert(0, cube.coord_dims(lat_coord)[0]) else: fraction_weights = np.squeeze(fraction_weights, axis=0) fraction_weights = iris.util.broadcast_to_shape(fraction_weights, cube.shape, tuple(coord_dims)) return fraction_weights
[docs] def get_new_path(cfg, old_path): """Convert old path to new diagnostic path. Parameters ---------- cfg : dict Recipe configuration. old_path : str Old path. Returns ------- str New diagnostic path. """ basename = os.path.splitext(os.path.basename(old_path))[0] new_path = get_diagnostic_filename(basename, cfg) return new_path
[docs] def get_squared_error_cube(ref_cube, error_datasets): """Get array of squared errors. Parameters ---------- ref_cube : iris.cube.Cube Reference cube (determines mask, coordinates and attributes of output). error_datasets : list of dict List of metadata dictionaries where each dictionary represents a single dataset. Returns ------- iris.cube.Cube Cube containing squared errors. Raises ------ ValueError Shape of a dataset does not match shape of reference cube. """ squared_error_cube = ref_cube.copy() # Fill cube with zeros = np.full(squared_error_cube.shape, 0.0),, ) # Adapt cube metadata if 'error' in squared_error_cube.attributes.get('var_type', ''): if not squared_error_cube.attributes.get('squared'): squared_error_cube.var_name += '_squared' squared_error_cube.long_name += ' (squared)' squared_error_cube.units = units_power(squared_error_cube.units, 2) else: if squared_error_cube.attributes.get('squared'): squared_error_cube.var_name += '_error' squared_error_cube.long_name += ' (error)' else: squared_error_cube.var_name += '_squared_error' squared_error_cube.long_name += ' (squared error)' squared_error_cube.units = units_power(squared_error_cube.units, 2) squared_error_cube.attributes['squared'] = 1 squared_error_cube.attributes['var_type'] = 'prediction_output_error' # Aggregate errors filenames = [] for dataset in error_datasets: path = dataset['filename'] cube = iris.load_cube(path) filenames.append(path) # Check shape if cube.shape != ref_cube.shape: raise ValueError( f"Expected shape {ref_cube.shape} for error cubes, got " f"{cube.shape} for dataset '{path}'") # Add squared error new_data = if not cube.attributes.get('squared'): new_data **= 2 += new_data logger.debug("Added '%s' to squared error datasets", path) squared_error_cube.attributes['filename'] = '|'.join(filenames) return squared_error_cube
[docs] def get_time_weights(cube, normalize=False): """Get time weights of cube calculated from time bounds. Parameters ---------- cube : iris.cube.Cube Input cube. normalize : bool, optional (default: False) Normalize weights with total time range. Returns ------- numpy.ndarray Time weights. Raises ------ iris.exceptions.CoordinateNotFoundError Cube does not contain the coordinate ``time``. """ logger.debug("Calculating time weights") _check_coords(cube, ['time'], 'time weights') coord = cube.coord('time') time_weights = coord.bounds[:, 1] - coord.bounds[:, 0] time_weights = time_weights.squeeze() if normalize: time_weights /= if time_weights.shape == (): time_weights = np.broadcast_to(time_weights, cube.shape) else: time_weights = iris.util.broadcast_to_shape(time_weights, cube.shape, cube.coord_dims('time')) return time_weights
[docs] def ignore_warnings(): """Ignore warnings given by ``WARNINGS_TO_IGNORE``.""" for warning_kwargs in WARNINGS_TO_IGNORE: warning_kwargs.setdefault('action', 'ignore') warnings.filterwarnings(**warning_kwargs)
[docs] def square_root_metadata(cube): """Take the square root of the cube metadata. Parameters ---------- cube : iris.cube.Cube Cube (will be modified in-place). """ if 'squared_' in cube.var_name: cube.var_name = cube.var_name.replace('squared_', '') elif '_squared' in cube.var_name: cube.var_name = cube.var_name.replace('_squared', '') else: cube.var_name = 'root_' + cube.var_name if 'squared ' in cube.long_name: cube.long_name = cube.long_name.replace('squared ', '') elif 'Squared ' in cube.long_name: cube.long_name = cube.long_name.replace('Squared ', '') elif ' squared' in cube.long_name: cube.long_name = cube.long_name.replace(' squared', '') elif ' Squared' in cube.long_name: cube.long_name = cube.long_name.replace(' Squared', '') elif ' (squared)' in cube.long_name: cube.long_name = cube.long_name.replace(' (squared)', '') elif ' (Squared)' in cube.long_name: cube.long_name = cube.long_name.replace(' (Squared)', '') else: cube.long_name = 'Root ' + cube.long_name cube.units = cube.units.root(2) if cube.attributes.get('squared'): cube.attributes.pop('squared')
[docs] def units_power(units, power): """Raise a :class:`cf_units.Unit` to given power preserving symbols. Raise :class:`cf_units.Unit` to given power without expanding it first. For example, using ``units_power(Unit('J'), 2)`` gives ``Unit('J2')``. In contrast, simply using ``Unit('J')**2`` would yield ``'kg2 m4 s-4'``. Parameters ---------- units : cf_units.Unit Input units. power : int Desired exponent. Returns ------- cf_units.Unit Input units raised to given power. Raises ------ TypeError Argument ``power`` is not :obj:`int`-like. ValueError Invalid unit given. """ if round(power) != power: raise TypeError( f"Expected integer-like power for units exponentiation, got " f"{power}") power = int(power) if any([units.is_no_unit(), units.is_unknown()]): raise ValueError( f"Cannot raise units '{}' to power {power:d}") if units.origin is None: logger.warning( "Symbol-preserving exponentiation of units '%s' is not " "supported, origin is not given", units) return units**power if units.origin.isdigit(): return units**power if units.origin.split()[0][0].isdigit(): logger.warning( "Symbol-preserving exponentiation of units '%s' is not " "supported yet because of leading numbers", units) return units**power new_units_list = [] for split in units.origin.split(): for elem in split.split('.'): if elem[-1].isdigit(): exp = [int(d) for d in re.findall(r'-?\d+', elem)][0] val = ''.join(list(re.findall(r'[A-Za-z]', elem))) new_units_list.append(f'{val}{exp * power}') else: new_units_list.append(f'{elem}{power}') new_units = ' '.join(new_units_list) return Unit(new_units)