"""Area operations on data cubes.
Allows for selecting data subsets using certain latitude and longitude
bounds; selecting geographical regions; constructing area averages; etc.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, Literal, Optional
import fiona
import iris
import numpy as np
import shapely
import shapely.ops
from dask import array as da
from iris.coords import AuxCoord
from iris.cube import Cube, CubeList
from iris.exceptions import CoordinateNotFoundError
from esmvalcore.preprocessor._shared import (
apply_mask,
get_dims_along_axes,
get_iris_aggregator,
get_normalized_cube,
preserve_float_dtype,
try_adding_calculated_cell_area,
update_weights_kwargs,
)
from esmvalcore.preprocessor._supplementary_vars import (
add_ancillary_variable,
add_cell_measure,
register_supplementaries,
remove_supplementary_variables,
)
if TYPE_CHECKING:
from esmvalcore.config import Session
logger = logging.getLogger(__name__)
SHAPE_ID_KEYS: tuple[str, ...] = ("name", "NAME", "Name", "id", "ID")
def _extract_irregular_region(
cube, start_longitude, end_longitude, start_latitude, end_latitude
):
"""Extract a region from a cube on an irregular grid."""
# Convert longitudes to valid range
if start_longitude != 360.0:
start_longitude %= 360.0
if end_longitude != 360.0:
end_longitude %= 360.0
# Select coordinates inside the region
lats = cube.coord("latitude").points
lons = (cube.coord("longitude").points + 360.0) % 360.0
if start_longitude <= end_longitude:
select_lons = (lons >= start_longitude) & (lons <= end_longitude)
else:
select_lons = (lons >= start_longitude) | (lons <= end_longitude)
if start_latitude <= end_latitude:
select_lats = (lats >= start_latitude) & (lats <= end_latitude)
else:
select_lats = (lats >= start_latitude) | (lats <= end_latitude)
selection = select_lats & select_lons
# Crop the selection, but keep rectangular shape
i_range, j_range = selection.nonzero()
if i_range.size == 0:
raise ValueError("No data points available in selected region")
i_min, i_max = i_range.min(), i_range.max()
j_min, j_max = j_range.min(), j_range.max()
i_slice, j_slice = slice(i_min, i_max + 1), slice(j_min, j_max + 1)
cube = cube[..., i_slice, j_slice]
selection = selection[i_slice, j_slice]
# Mask remaining coordinates outside region
horizontal_dims = get_dims_along_axes(cube, ["X", "Y"])
cube.data = apply_mask(~selection, cube.core_data(), horizontal_dims)
return cube
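
# A minimal sketch of the selection logic above on a toy 2x3 curvilinear
# grid (pure NumPy, independent of iris; the coordinate values are
# illustrative):
#
#     import numpy as np
#
#     lats = np.array([[10.0, 10.0, 10.0], [20.0, 20.0, 20.0]])
#     lons = np.array([[350.0, 0.0, 10.0], [350.0, 0.0, 10.0]])
#     # A region crossing the 0/360 boundary (start > end), hence "|":
#     select_lons = (lons >= 340.0) | (lons <= 5.0)
#     select_lats = (lats >= 5.0) & (lats <= 15.0)
#     selection = select_lats & select_lons
#     # Only the points at (lat, lon) = (10, 350) and (10, 0) are selected.
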
@preserve_float_dtype
def zonal_statistics(
cube: Cube,
operator: str,
normalize: Optional[Literal["subtract", "divide"]] = None,
**operator_kwargs,
) -> Cube:
"""Compute zonal statistics.
Parameters
----------
cube:
Input cube.
operator:
The operation. Used to determine the :class:`iris.analysis.Aggregator`
object used to calculate the statistics. Allowed options are given in
:ref:`this table <supported_stat_operator>`.
normalize:
If given, do not return the statistics cube itself, but rather, the
input cube, normalized with the statistics cube. Can either be
`subtract` (statistics cube is subtracted from the input cube) or
`divide` (input cube is divided by the statistics cube).
**operator_kwargs:
Optional keyword arguments for the :class:`iris.analysis.Aggregator`
object defined by `operator`.
Returns
-------
iris.cube.Cube
Zonal statistics cube or input cube normalized by statistics cube (see
`normalize`).
Raises
------
ValueError
Error raised if computation on irregular grids is attempted.
Zonal statistics not yet implemented for irregular grids.
"""
if cube.coord("longitude").points.ndim >= 2:
raise ValueError(
"Zonal statistics on irregular grids not yet implemented"
)
(agg, agg_kwargs) = get_iris_aggregator(operator, **operator_kwargs)
result = cube.collapsed("longitude", agg, **agg_kwargs)
if normalize is not None:
result = get_normalized_cube(cube, result, normalize)
return result
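
# A hedged usage sketch for `zonal_statistics` (assumes `cube` is an
# `iris.cube.Cube` with a 1D longitude coordinate; the file name is purely
# illustrative):
#
#     import iris
#
#     cube = iris.load_cube("tas.nc")  # hypothetical input file
#     zonal_mean = zonal_statistics(cube, "mean")
#     # Operators may take keyword arguments, e.g. a percentile:
#     zonal_p90 = zonal_statistics(cube, "percentile", percent=90)
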
@preserve_float_dtype
def meridional_statistics(
cube: Cube,
operator: str,
normalize: Optional[Literal["subtract", "divide"]] = None,
**operator_kwargs,
) -> Cube:
"""Compute meridional statistics.
Parameters
----------
cube:
Input cube.
operator:
The operation. Used to determine the :class:`iris.analysis.Aggregator`
object used to calculate the statistics. Allowed options are given in
:ref:`this table <supported_stat_operator>`.
normalize:
If given, do not return the statistics cube itself, but rather, the
input cube, normalized with the statistics cube. Can either be
`subtract` (statistics cube is subtracted from the input cube) or
`divide` (input cube is divided by the statistics cube).
**operator_kwargs:
Optional keyword arguments for the :class:`iris.analysis.Aggregator`
object defined by `operator`.
Returns
-------
iris.cube.Cube
Meridional statistics cube.
Raises
------
ValueError
Error raised if computation on irregular grids is attempted.
Zonal statistics not yet implemented for irregular grids.
"""
if cube.coord("latitude").points.ndim >= 2:
raise ValueError(
"Meridional statistics on irregular grids not yet implemented"
)
(agg, agg_kwargs) = get_iris_aggregator(operator, **operator_kwargs)
result = cube.collapsed("latitude", agg, **agg_kwargs)
if normalize is not None:
result = get_normalized_cube(cube, result, normalize)
return result
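
# Sketch of the `normalize` option: instead of the collapsed cube, return
# the input cube normalized by the statistic, e.g. anomalies with respect
# to the meridional mean (assumes `cube` has a 1D latitude coordinate):
#
#     anomalies = meridional_statistics(cube, "mean", normalize="subtract")
#     # `anomalies` has the same shape as `cube`, with the meridional mean
#     # subtracted at every point.
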
@register_supplementaries(
variables=["areacella", "areacello"],
required="prefer_at_least_one",
)
@preserve_float_dtype
def area_statistics(
cube: Cube,
operator: str,
normalize: Optional[Literal["subtract", "divide"]] = None,
**operator_kwargs,
) -> Cube:
"""Apply a statistical operator in the horizontal plane.
We assume that the horizontal directions are ['longitude', 'latitude'].
:ref:`This table <supported_stat_operator>` shows a list of supported
operators. All operators that support weights are by default weighted with
the grid cell areas. Note that for area-weighted sums, the units of the
resulting cube will be multiplied by m :math:`^2`.
Parameters
----------
cube:
Input cube. The input cube should have a
:class:`iris.coords.CellMeasure` named ``'cell_area'``, unless it has
regular 1D latitude and longitude coordinates so the cell areas can be
computed using :func:`iris.analysis.cartography.area_weights`.
operator:
The operation. Used to determine the :class:`iris.analysis.Aggregator`
object used to calculate the statistics. Allowed options are given in
:ref:`this table <supported_stat_operator>`.
normalize:
If given, do not return the statistics cube itself, but rather, the
input cube, normalized with the statistics cube. Can either be
`subtract` (statistics cube is subtracted from the input cube) or
`divide` (input cube is divided by the statistics cube).
**operator_kwargs:
Optional keyword arguments for the :class:`iris.analysis.Aggregator`
object defined by `operator`.
Returns
-------
iris.cube.Cube
Collapsed cube.
Raises
------
iris.exceptions.CoordinateMultiDimError
Cube has irregular or unstructured grid but supplementary variable
`cell_area` is not available.
"""
has_cell_measure = bool(cube.cell_measures("cell_area"))
# Get aggregator and correct kwargs (incl. weights)
(agg, agg_kwargs) = get_iris_aggregator(operator, **operator_kwargs)
agg_kwargs = update_weights_kwargs(
agg, agg_kwargs, "cell_area", cube, try_adding_calculated_cell_area
)
result = cube.collapsed(["latitude", "longitude"], agg, **agg_kwargs)
if normalize is not None:
result = get_normalized_cube(cube, result, normalize)
# Make sure input cube has not been modified
if not has_cell_measure and cube.cell_measures("cell_area"):
cube.remove_cell_measure("cell_area")
return result
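
# Usage sketch for `area_statistics` (assumes regular 1D latitude and
# longitude coordinates with bounds, so cell areas can be computed on the
# fly; otherwise attach a `cell_area` cell measure first):
#
#     global_mean = area_statistics(cube, "mean")
#     # Area-weighted sums multiply the units by m2, e.g. W m-2 -> W:
#     global_total = area_statistics(cube, "sum")
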
def _crop_cube(
cube: Cube,
start_longitude: float,
start_latitude: float,
end_longitude: float,
end_latitude: float,
cmor_coords: bool = True,
) -> Cube:
"""Crop cubes on a regular grid."""
lon_coord = cube.coord(axis="X")
lat_coord = cube.coord(axis="Y")
if lon_coord.ndim == 1 and lat_coord.ndim == 1:
        # Add a padding of one cell around the cropped cube
lon_bound = lon_coord.core_bounds()[0]
lon_step = lon_bound[1] - lon_bound[0]
start_longitude -= lon_step
if not cmor_coords:
if start_longitude < -180.0:
start_longitude = -180.0
else:
if start_longitude < 0:
start_longitude = 0
end_longitude += lon_step
if not cmor_coords:
if end_longitude > 180.0:
end_longitude = 180.0
else:
if end_longitude > 360:
end_longitude = 360.0
lat_bound = lat_coord.core_bounds()[0]
lat_step = lat_bound[1] - lat_bound[0]
start_latitude -= lat_step
if start_latitude < -90:
start_latitude = -90.0
end_latitude += lat_step
if end_latitude > 90.0:
end_latitude = 90.0
cube = extract_region(
cube, start_longitude, end_longitude, start_latitude, end_latitude
)
return cube
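
# Numeric sketch of the one-cell padding above, assuming a regular 2.5
# degree grid with CMOR-style [0, 360] longitudes:
#
#     lon_step = 2.5
#     start_longitude = 30.0 - lon_step  # 27.5
#     end_longitude = 60.0 + lon_step  # 62.5 (clamped to 360.0 at most)
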
def _select_representative_point(
shape,
lon: np.ndarray,
lat: np.ndarray,
) -> np.ndarray:
"""Get mask to select a representative point."""
representative_point = shape.representative_point()
points = shapely.geometry.MultiPoint(
np.stack((np.ravel(lon), np.ravel(lat)), axis=1)
)
nearest_point = shapely.ops.nearest_points(points, representative_point)[0]
nearest_lon, nearest_lat = nearest_point.coords[0]
mask = (lon == nearest_lon) & (lat == nearest_lat)
return mask
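
# Self-contained sketch of the representative-point fallback (pure shapely
# and NumPy; the tiny triangle and grid are illustrative):
#
#     import numpy as np
#     import shapely.geometry
#
#     lon, lat = np.meshgrid([0.0, 1.0, 2.0], [0.0, 1.0, 2.0])
#     triangle = shapely.geometry.Polygon(
#         [(0.4, 0.4), (0.6, 0.4), (0.5, 0.6)]
#     )
#     mask = _select_representative_point(triangle, lon, lat)
#     # Exactly one grid point (the one closest to the polygon's
#     # representative point) is True in `mask`.
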
def _correct_coords_from_shapefile(
cube: Cube,
cmor_coords: bool,
pad_north_pole: bool,
pad_hawaii: bool,
) -> tuple[np.ndarray, np.ndarray]:
"""Get correct lat and lon from shapefile."""
lon = cube.coord(axis="X").points
lat = cube.coord(axis="Y").points
if cube.coord(axis="X").ndim < 2:
lon, lat = np.meshgrid(lon, lat, copy=False)
if not cmor_coords:
# Wrap around longitude coordinate to match data
        lon = lon.copy()  # avoids "ValueError: assignment destination is read-only"
lon[lon >= 180.0] -= 360.0
    # The Natural Earth (NE) masks may not have points at x = -180 and
    # y = +/-90, so shift such coordinates to (-179, -89, 89) before
    # applying the mask
if pad_hawaii:
lon = np.where(lon == -180.0, lon + 1.0, lon)
if pad_north_pole:
lat_0 = np.where(lat == -90.0, lat + 1.0, lat)
lat = np.where(lat_0 == 90.0, lat_0 - 1.0, lat_0)
return lon, lat
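
# Sketch of the longitude wrap-around above: [0, 360) coordinates are
# mapped to [-180, 180) to match the shapefile convention (pure NumPy):
#
#     import numpy as np
#
#     lon = np.array([0.0, 90.0, 180.0, 270.0])
#     lon[lon >= 180.0] -= 360.0
#     # -> array([   0.,   90., -180.,  -90.])
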
def _process_ids(geometries, ids: list | dict | None) -> tuple:
"""Read requested IDs and ID keys."""
    # If `ids` is a dict, it must have exactly one entry, and every geometry
    # must have the requested attribute key
if isinstance(ids, dict):
if len(ids) != 1:
raise ValueError(
f"If `ids` is given as dict, it needs exactly one entry, got "
f"{ids}"
)
key = list(ids.keys())[0]
for geometry in geometries:
if key not in geometry["properties"]:
raise ValueError(
f"Geometry {dict(geometry['properties'])} does not have "
f"requested attribute {key}"
)
id_keys: tuple[str, ...] = (key,)
ids = ids[key]
# Otherwise, use SHAPE_ID_KEYS to get ID
else:
id_keys = SHAPE_ID_KEYS
# IDs should be strings or None
if not ids:
ids = None
if ids is not None:
ids = [str(id_) for id_ in ids]
return (id_keys, ids)
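
# Sketch of the two accepted forms of `ids` (the attribute name "FID" is
# illustrative and must exist in all geometries for the dict form):
#
#     _process_ids(geometries, {"FID": [1, 2]})  # (("FID",), ["1", "2"])
#     _process_ids(geometries, ["Russia"])  # (SHAPE_ID_KEYS, ["Russia"])
#     _process_ids(geometries, None)  # (SHAPE_ID_KEYS, None)
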
def _get_requested_geometries(
geometries,
ids: list | dict | None,
shapefile: Path,
) -> dict[str, dict]:
"""Return requested geometries."""
(id_keys, ids) = _process_ids(geometries, ids)
# Iterate through all geometries and select matching elements
requested_geometries = {}
for reading_order, geometry in enumerate(geometries):
for key in id_keys:
if key in geometry["properties"]:
geometry_id = str(geometry["properties"][key])
break
# If none of the attributes are available in the geometry, use reading
# order as last resort
else:
geometry_id = str(reading_order)
logger.debug("Found shape '%s'", geometry_id)
# Select geometry if its ID is requested or all IDs are requested
# (i.e., ids=None)
if ids is None or geometry_id in ids:
requested_geometries[geometry_id] = geometry
# Check if all requested IDs have been found
if ids is not None:
missing = set(ids) - set(requested_geometries.keys())
if missing:
raise ValueError(
f"Requested shapes {missing} not found in shapefile "
f"{shapefile}"
)
return requested_geometries
def _get_masks_from_geometries(
geometries: dict[str, dict],
lon: np.ndarray,
lat: np.ndarray,
method: str = "contains",
decomposed: bool = False,
) -> dict[str, np.ndarray]:
"""Get cube masks from requested regions."""
    if method not in {"contains", "representative"}:
        raise ValueError(
            "Invalid value for `method`. Choose from 'contains' or "
            "'representative'."
        )
masks = {}
for id_, geometry in geometries.items():
masks[id_] = _get_single_mask(lon, lat, method, geometry)
if not decomposed and len(masks) > 1:
return _merge_masks(masks, lat.shape)
return masks
def _get_bounds(
geometries: dict[str, dict],
) -> tuple[float, float, float, float]:
"""Get bounds from given geometries.
Parameters
----------
geometries: fiona.collection.Collection
Fiona collection of shapes (geometries).
Returns
-------
lat_min, lon_min, lat_max, lon_max
Coordinates deliminating bounding box for shape ids.
"""
all_bounds = np.vstack(
[fiona.bounds(geom) for geom in geometries.values()]
)
lon_max, lat_max = all_bounds[:, 2:].max(axis=0)
lon_min, lat_min = all_bounds[:, :2].min(axis=0)
return lon_min, lat_min, lon_max, lat_max
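
# Worked example of the bounds reduction above: `fiona.bounds` returns
# (lon_min, lat_min, lon_max, lat_max) per geometry, so column-wise
# min/max over the stacked array gives the combined bounding box:
#
#     import numpy as np
#
#     all_bounds = np.array([[0.0, 10.0, 20.0, 30.0],
#                            [5.0, -5.0, 25.0, 15.0]])
#     all_bounds[:, :2].min(axis=0)  # -> lon_min, lat_min = 0.0, -5.0
#     all_bounds[:, 2:].max(axis=0)  # -> lon_max, lat_max = 25.0, 30.0
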
def _get_single_mask(
lon: np.ndarray,
lat: np.ndarray,
method: str,
geometry: dict,
) -> np.ndarray:
"""Get single mask from one region."""
shape = shapely.geometry.shape(geometry["geometry"])
if method == "contains":
mask = shapely.vectorized.contains(shape, lon, lat)
if method == "representative" or not mask.any():
mask = _select_representative_point(shape, lon, lat)
return mask
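
# Sketch of the "contains" test (shapely.vectorized.contains evaluates
# point-in-polygon element-wise; the unit square is illustrative):
#
#     import numpy as np
#     import shapely.geometry
#     import shapely.vectorized
#
#     square = shapely.geometry.box(0.0, 0.0, 1.0, 1.0)
#     lon, lat = np.meshgrid([-0.5, 0.5, 1.5], [-0.5, 0.5, 1.5])
#     shapely.vectorized.contains(square, lon, lat)
#     # -> True only at the centre point (0.5, 0.5)
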
def _merge_masks(
masks: dict[str, np.ndarray],
shape: tuple,
) -> dict[str, np.ndarray]:
"""Merge masks into one."""
merged_mask = np.zeros(shape, dtype=bool)
for mask in masks.values():
merged_mask |= mask
return {"0": merged_mask}
def fix_coordinate_ordering(cube: Cube) -> Cube:
"""Transpose the cube dimensions.
This is done such that the order of dimension is in standard order, i.e.:
[time] [shape_id] [other_coordinates] latitude longitude
where dimensions between brackets are optional.
Parameters
----------
cube:
Input cube.
Returns
-------
iris.cube.Cube
Cube with dimensions transposed to standard order
"""
try:
time_dim = cube.coord_dims("time")
except CoordinateNotFoundError:
time_dim = ()
try:
shape_dim = cube.coord_dims("shape_id")
except CoordinateNotFoundError:
shape_dim = ()
other = list(range(len(cube.shape)))
for dim in [time_dim, shape_dim]:
for i in dim:
other.remove(i)
other_dims = tuple(other)
order = time_dim + shape_dim + other_dims
cube.transpose(new_order=order)
return cube
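
# Worked example of the reordering above: for a cube with dimensions
# (depth, time, shape_id, lat, lon), time_dim == (1,) and shape_dim == (2,),
# so the remaining dims are (0, 3, 4) and
#
#     order = (1,) + (2,) + (0, 3, 4)  # == (1, 2, 0, 3, 4)
#
# which transposes the cube to (time, shape_id, depth, lat, lon).
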
def _update_shapefile_path(
shapefile: str | Path,
session: Optional[Session] = None,
) -> Path:
"""Update path to shapefile."""
shapefile = str(shapefile)
shapefile_path = Path(shapefile)
# Try absolute path
logger.debug("extract_shape: Looking for shapefile %s", shapefile_path)
if shapefile_path.exists():
return shapefile_path
# Try path relative to auxiliary_data_dir if session is given
if session is not None:
shapefile_path = session["auxiliary_data_dir"] / shapefile
logger.debug("extract_shape: Looking for shapefile %s", shapefile_path)
if shapefile_path.exists():
return shapefile_path
# Try path relative to esmvalcore/preprocessor/shapefiles/
shapefile_path = Path(__file__).parent / "shapefiles" / shapefile
logger.debug("extract_shape: Looking for shapefile %s", shapefile_path)
if shapefile_path.exists():
return shapefile_path
# As final resort, add suffix '.shp' and try path relative to
# esmvalcore/preprocessor/shapefiles/ again
# Note: this will find "special" shapefiles like 'ar6'
shapefile_path = (
Path(__file__).parent / "shapefiles" / f"{shapefile.lower()}.shp"
)
if shapefile_path.exists():
return shapefile_path
# If no valid shapefile has been found, return original input (an error
# will be raised at a later stage)
return Path(shapefile)
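
# Resolution-order sketch (all paths are illustrative):
#
#     _update_shapefile_path("/data/my.shp")  # absolute path, used as-is
#     _update_shapefile_path("my.shp", session=session)
#     # -> <auxiliary_data_dir>/my.shp, if it exists
#     _update_shapefile_path("ar6")
#     # -> <esmvalcore>/preprocessor/shapefiles/ar6.shp (suffix added)
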
def _mask_cube(cube: Cube, masks: dict[str, np.ndarray]) -> Cube:
"""Mask input cube."""
cubelist = CubeList()
for id_, mask in masks.items():
_cube = cube.copy()
remove_supplementary_variables(_cube)
_cube.add_aux_coord(
AuxCoord(id_, units="no_unit", long_name="shape_id")
)
horizontal_dims = get_dims_along_axes(cube, axes=["X", "Y"])
_cube.data = apply_mask(~mask, _cube.core_data(), horizontal_dims)
cubelist.append(_cube)
result = fix_coordinate_ordering(cubelist.merge_cube())
for measure in cube.cell_measures():
        # Cell measures that are time-dependent, with four dimensions and
        # an original shape of (time, depth, lat, lon), need to be
        # broadcast to the result cube with five dimensions and shape
        # (time, shape_id, depth, lat, lon)
if measure.ndim > 3 and result.ndim > 4:
data = measure.core_data()
if result.has_lazy_data():
# Make the cell measure lazy if the result is lazy.
cube_chunks = cube.lazy_data().chunks
chunk_dims = cube.cell_measure_dims(measure)
data = da.asarray(
data,
chunks=tuple(cube_chunks[i] for i in chunk_dims),
)
chunks = result.lazy_data().chunks
else:
chunks = None
dim_map = get_dims_along_axes(result, ["T", "Z", "Y", "X"])
data = iris.util.broadcast_to_shape(
data,
result.shape,
dim_map=dim_map,
chunks=chunks,
)
measure = iris.coords.CellMeasure(
data,
standard_name=measure.standard_name,
long_name=measure.long_name,
units=measure.units,
measure=measure.measure,
var_name=measure.var_name,
attributes=measure.attributes,
)
add_cell_measure(result, measure, measure.measure)
for ancillary_variable in cube.ancillary_variables():
add_ancillary_variable(result, ancillary_variable)
return result
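
# Shape sketch for the cell-measure broadcast above: a measure with shape
# (time, depth, lat, lon) == (12, 5, 90, 180) attached to a decomposed
# result of shape (12, 3, 5, 90, 180) (three shape_ids) is broadcast along
# the new shape_id dimension using dim_map == (0, 2, 3, 4), i.e. the T, Z,
# Y and X dimensions of the result.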