"""Convenience functions for emergent constraints diagnostics."""
import logging
import os
from copy import deepcopy
from pprint import pformat
import iris
import iris.pandas
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy
import seaborn as sns
import yaml
from packaging.version import Version
from scipy import integrate
from scipy.stats import linregress
if Version(scipy.__version__) < Version('1.14.0'):
from scipy.integrate import simps as simpson
else:
from scipy.integrate import simpson
from esmvaltool.diag_scripts.shared import (
ProvenanceLogger,
get_diagnostic_filename,
get_plot_filename,
io,
)
logger = logging.getLogger(__name__)
ALLOWED_VAR_TYPES = [
'feature',
'label',
'prediction_input',
'prediction_input_error',
]
COLOR_COMBINED_GROUPS = 'gray'
LEGEND_KWARGS = {
'loc': 'center left',
'bbox_to_anchor': [1.05, 0.5],
'borderaxespad': 0.0,
}
PANDAS_PRINT_OPTIONS = [
'display.max_rows', None,
'display.max_colwidth', None,
]
def _check_x_y_arrays(x_array, y_array):
"""Ensure that the X and Y arrays have correct shapes."""
x_array = np.ma.array(x_array)
y_array = np.ma.array(y_array)
# Check shapes
if x_array.ndim != 1:
raise ValueError(
f"Expected 1D array for X training data, got {x_array.ndim:d}D "
f"array")
if y_array.ndim != 1:
raise ValueError(
f"Expected 1D array for Y training data, got {y_array.ndim:d}D "
f"array")
if x_array.shape != y_array.shape:
raise ValueError(
f"Expected identical shapes for X and Y training data, got "
f"{x_array.shape} and {y_array.shape}, respectively")
# Remove masked values
mask = np.ma.getmaskarray(x_array)
mask |= np.ma.getmaskarray(y_array)
x_array = np.array(x_array[~mask])
y_array = np.array(y_array[~mask])
return (x_array, y_array)
def _add_column(data_frame, series, column_name):
"""Add column to :class:`pandas.DataFrame` (expands index if necessary)."""
rows_to_add = [
pd.Series(name=row, dtype=np.float64).to_frame().T for row in
series.index.difference(data_frame.index)
]
data_frame = pd.concat([data_frame] + rows_to_add)
if column_name in data_frame.columns:
for row in series.index:
if np.isnan(data_frame.loc[row, column_name]):
data_frame.loc[row, column_name] = series.loc[row]
else:
if not np.isclose(data_frame.loc[row, column_name],
series.loc[row]):
raise ValueError(
f"Got duplicate data for tag '{column_name}' of "
f"'{row}': {series.loc[row]:e} and "
f"{data_frame.loc[row, column_name]:e}")
else:
data_frame[column_name] = series
return data_frame
def _crop_data_frame(data_frame, ref_data_frame, data_name, ref_data_name):
"""Crop columns of a data_frame so that it matches a given reference."""
diff_not_in_data_frame = list(
ref_data_frame.columns.difference(data_frame.columns))
if diff_not_in_data_frame:
raise ValueError(
f"No '{data_name}' given for tags {diff_not_in_data_frame}")
diff_not_in_ref = list(
data_frame.columns.difference(ref_data_frame.columns))
if diff_not_in_ref:
logger.warning(
"Ignoring '%s' of tags %s: no corresponding '%s' data available",
data_name, diff_not_in_ref, ref_data_name)
data_frame = data_frame[ref_data_frame.columns]
return data_frame
def _check_data_frames(features, label, pred_input, pred_input_err):
"""Check indexes and columns of the input data."""
if not list(features.columns):
raise ValueError("Expected at least one feature")
if len(label.columns) != 1:
raise ValueError(
f"Expected exactly 1 'label' variable, got {len(label.columns):d}")
# Compare features and label
if list(features.index) != list(label.index):
raise ValueError(
f"Expected identical datasets (climate models; independent "
f"observations) for 'feature' and 'label', got "
f"{features.index.values} and {label.index.values}")
if len(features.index) < 2:
raise ValueError("Expected at least two training points for features")
# Compare features and prediction input data
pred_input = _crop_data_frame(pred_input, features, 'prediction_input',
'feature')
pred_input_err = _crop_data_frame(pred_input_err, features,
'prediction_input_error', 'feature')
# Compare prediction_input and prediction_input_error
if not list(pred_input.index):
raise ValueError("Expected at least one prediction input point")
if list(pred_input.index) != list(pred_input_err.index):
raise ValueError(
f"Expected identical training points for 'prediction_input' and "
f"'prediction_input_error', got {pred_input.index.values} "
f"and {pred_input_err.index.values}")
return (features, label, pred_input, pred_input_err)
def _combine_dicts(old_dict, new_dict):
"""Combine two :obj:`dict` by adding values for identical keys to lists."""
old_dict = deepcopy(old_dict)
new_dict = deepcopy(new_dict)
for (key, val) in new_dict.items():
if key not in old_dict:
old_dict[key] = val
continue
if isinstance(old_dict[key], list):
if not isinstance(val, list):
val = [val]
old_dict[key] = list(set([*old_dict[key], *val]))
else:
if not isinstance(val, list):
if not np.array_equal(val, old_dict[key]):
old_dict[key] = [old_dict[key], val]
else:
old_dict[key] = list(set([old_dict[key], *val]))
return old_dict
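# Illustration of the merge semantics above (sketch, not executed): values
# for keys present in both dicts are collected into flat lists, e.g.
#   _combine_dicts({'a': 1}, {'a': 2, 'b': 3}) -> {'a': [1, 2], 'b': 3}
#   _combine_dicts({'a': 1}, {'a': 1})         -> {'a': 1}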
def _get_additional_data(additional_data, recipe):
"""Get :class:`iris.cube.CubeList` from additional data."""
if additional_data is None:
return iris.cube.CubeList()
cubes = _metadata_list_to_cube_list(additional_data, 'additional_data')
for cube in cubes:
cube.attributes['filename'] = recipe
return cubes
def _get_attributes(cubes):
"""Extract attributes for features and labels."""
attributes = {}
# Extract attributes
for cube in cubes:
cube_attrs = cube.attributes
tag = cube_attrs['tag']
attributes.setdefault(tag, {})
if cube_attrs['var_type'] in ('feature', 'label'):
attributes[tag] = _combine_dicts(attributes[tag],
_metadata_to_dict(cube.metadata))
elif cube_attrs['var_type'] in ('prediction_input',
'prediction_input_error'):
attributes[tag] = _combine_dicts(
attributes[tag], {'filename': cube_attrs['filename']})
else:
raise ValueError(
f"File '{cube_attrs['filename']}' has invalid var_type "
f"'{cube_attrs['var_type']}'")
# Set default attributes and remove lengthy 'provenance' entry
for tag in attributes:
attributes[tag].pop('provenance', None)
attributes[tag].setdefault('plot_title', f"Emergent constraint {tag}")
if 'units' in attributes[tag]:
axis_label = f"{tag} [{attributes[tag]['units']}]"
attributes[tag].setdefault('plot_xlabel', axis_label)
attributes[tag].setdefault('plot_ylabel', axis_label)
else:
attributes[tag].setdefault('plot_xlabel', tag)
attributes[tag].setdefault('plot_ylabel', tag)
attributes[tag].setdefault('plot_xlim', None)
attributes[tag].setdefault('plot_ylim', None)
return attributes
def _get_cube_list(input_files,
recipe,
additional_data=None,
external_file=None,
merge_identical_pred_input=True):
"""Get :class:`iris.cube.CubeList` of input files."""
cubes = iris.cube.CubeList()
# Input files
for filename in input_files:
logger.info("Loading '%s'", filename)
cube = _load_cube_with_dataset_coord(filename)
cube.attributes['filename'] = filename
(feature_cube,
prediction_cube) = _split_cube(cube, merge_identical_pred_input)
if feature_cube is not None:
cubes.append(feature_cube)
if prediction_cube is not None:
cubes.append(prediction_cube)
# Additional data
cubes.extend(_get_additional_data(additional_data, recipe))
# External file
cubes.extend(_get_external_cube_list(external_file))
# Check metadata of cubes
for cube in cubes:
check_metadata(cube.attributes)
return cubes
def _get_external_cube_list(external_file):
"""Get external :class:`iris.cube.CubeList`."""
if external_file is None:
return iris.cube.CubeList()
with open(external_file, 'r') as infile:
metadata_list = yaml.safe_load(infile)
cubes = _metadata_list_to_cube_list(metadata_list, external_file)
for cube in cubes:
cube.attributes['filename'] = external_file
return cubes
def _get_external_file(filepath, auxiliary_data_dir):
"""Get full path to external file (if available)."""
if not filepath:
return None
filepath = os.path.expanduser(os.path.expandvars(filepath))
if not os.path.isabs(filepath):
filepath = os.path.join(auxiliary_data_dir, filepath)
if not os.path.isfile(filepath):
raise FileNotFoundError(
f"Desired external file '{filepath}' does not exist")
logger.info("Found external file '%s'", filepath)
return filepath
def _get_data_frame(var_type, cubes, label_all_data, group_by=None):
"""Extract :class:`pandas.DataFrame` for a given ``var_type``."""
data_frame = pd.DataFrame()
for cube in cubes:
cube_attrs = cube.attributes
if var_type != cube_attrs['var_type']:
continue
if var_type in ('feature', 'label'):
if group_by is not None and group_by not in cube_attrs:
raise AttributeError(
f"Group attribute '{group_by}' not available in input "
f"file '{cube_attrs['filename']}'")
group = cube_attrs.get(group_by, label_all_data)
index = pd.MultiIndex.from_product(
[[group], cube.coord('dataset').points],
names=[group_by, 'dataset'])
else:
index = cube.coord('dataset').points
series = pd.Series(data=cube.data, index=index)
data_frame = _add_column(data_frame, series, cube_attrs['tag'])
return data_frame
def _metadata_to_dict(metadata):
"""Convert :class:`iris.cube.CubeMetadata` to :obj:`dict`."""
new_dict = dict(metadata.attributes)
other_keys = [
'standard_name',
'long_name',
'var_name',
'units',
'cell_methods',
]
for key in other_keys:
new_dict[key] = getattr(metadata, key)
return new_dict
def _split_cube(cube, merge_identical_pred_input=True):
"""Split cube in features and prediction_input."""
if not cube.attributes.get('reference_dataset'):
return (cube, None)
# Get feature and prediction_input datasets
features_datasets = []
    prediction_datasets = []
references = cube.attributes['reference_dataset']
for dataset in cube.coord('dataset').points:
if dataset in references:
            prediction_datasets.append(dataset)
logger.info(
"Using dataset '%s' as prediction_input for variable '%s' "
"with index %d", dataset, cube.var_name,
                len(prediction_datasets) - 1)
else:
features_datasets.append(dataset)
# Extract cubes
feature_cube = cube.extract(iris.Constraint(dataset=features_datasets))
    prediction_cube = cube.extract(
        iris.Constraint(dataset=prediction_datasets))
feature_cube.attributes['var_type'] = 'feature'
prediction_cube.attributes['var_type'] = 'prediction_input'
# Merge identical prediction_input if desired
if merge_identical_pred_input:
(_, unique_idx) = np.unique(prediction_cube.data, return_index=True)
diff = len(prediction_cube.coord('dataset').points) - len(unique_idx)
if diff > 0:
prediction_cube = prediction_cube[unique_idx]
logger.info(
"Removed %d identical prediction_input points for variable "
"'%s'", diff, prediction_cube.var_name)
# Set new index for prediction input
prediction_cube.coord('dataset').points = np.arange(
len(prediction_cube.coord('dataset').points))
return (feature_cube, prediction_cube)
def _cube_to_dataset_coord(cube):
"""Convert :class:`iris.cube.Cube` to :class:`iris.coords.AuxCoord`."""
if cube.ndim == 1:
datasets = cube.data
elif cube.ndim == 2:
cube.data = cube.data.astype(str, casting='same_kind')
datasets = [''.join(d.compressed()) for d in cube.data]
else:
raise ValueError(
f"Only 1D and 2D cubes supported, got {cube.ndim:d}D cube")
return iris.coords.AuxCoord(datasets,
var_name='dataset',
long_name='dataset')
def _get_first_cube_with_coord(cubes, accepted_coord_names):
"""Load single cube of :class:`iris.cube.CubeList` with specific coords."""
returned_cube = None
returned_coord = None
for cube in cubes:
for coord_name in accepted_coord_names:
try:
coord = cube.coord(coord_name)
returned_cube = cube
returned_coord = coord
break
except iris.exceptions.CoordinateNotFoundError:
pass
if returned_cube is not None:
break
else:
raise ValueError(
f"No cube of {cubes} contains 'dataset' coordinate (i.e. one of "
f"{accepted_coord_names})")
return (returned_cube, returned_coord)
def _load_cube_with_dataset_coord(filename):
"""Load cube with single ``dataset``-like coordinate.
Files created by NCL cannot be read using a simple
:func:`iris.load_cube`.
"""
cubes = iris.load(filename)
accepted_coord_names = ('dataset', 'model')
# Handle single cube
if len(cubes) == 1:
(cube, coord) = _get_first_cube_with_coord(cubes, accepted_coord_names)
if cube.ndim != 1:
raise ValueError(
f"Only 1D cubes supported, got {cube.ndim:d}D cube in file "
f"'{filename}'")
coord.var_name = 'dataset'
coord.standard_name = None
coord.long_name = 'dataset'
return cube
# At most two cubes are supported
if len(cubes) > 2:
raise ValueError(
f"Loading NCL file '{filename}' failed, at most 2 cubes are "
f"supported, got {len(cubes):d}")
# Get 'model' or 'dataset' cube
dataset_cube = None
for cube in cubes:
if cube.var_name in accepted_coord_names:
logger.debug("Found coordinate cube '%s'", cube.var_name)
dataset_cube = cube
else:
data_cube = cube
if dataset_cube is None:
raise ValueError(
f"No 'dataset' coordinate (one of {accepted_coord_names}) in "
f"file '{filename}' available")
# Create new coordinate
if data_cube.ndim != 1:
raise ValueError(
f"Only 1D cubes supported, got {data_cube.ndim:d}D cube in file "
f"'{filename}'")
if data_cube.shape[0] != dataset_cube.shape[0]:
raise ValueError(
f"Got differing sizes for first dimension of data cube "
f"({data_cube.shape[0]:d}) and dataset cube "
f"({dataset_cube.shape[0]:d}) in file '{filename}'")
aux_coord = _cube_to_dataset_coord(dataset_cube)
data_cube.add_aux_coord(aux_coord, 0)
return data_cube
def _create_scatterplot(x_data,
y_data,
numbers_as_markers=True,
plot_regression_line_mean=False,
axes=None,
**kwargs):
"""Create single scatterplot including regression line."""
if axes is None:
(_, axes) = plt.subplots()
# Scatterplots
scatter_kwargs = dict(kwargs)
scatter_kwargs.pop('label', None)
    # Use positional indexing; the index usually holds dataset names
    for idx in range(len(x_data)):
        if numbers_as_markers:
            axes.text(x_data.iloc[idx],
                      y_data.iloc[idx],
                      x_data.index.get_level_values(-1)[idx],
                      size=7,
                      **scatter_kwargs)
        else:
            axes.scatter(x_data.iloc[idx], y_data.iloc[idx],
                         **scatter_kwargs)
# Regression line
line_kwargs = {**kwargs, 'linestyle': '-'}
fill_between_kwargs = {**kwargs, 'alpha': 0.3}
fill_between_kwargs.pop('label', None)
if plot_regression_line_mean:
mean_kwargs = {**kwargs, 'marker': 'o'}
mean_kwargs.pop('label', None)
mean_kwargs.pop('linestyle', None)
else:
mean_kwargs = None
axes = _create_regplot(x_data,
y_data,
axes=axes,
line_kwargs=line_kwargs,
fill_between_kwargs=fill_between_kwargs,
mean_kwargs=mean_kwargs)
return axes
def _create_pred_input_plot(x_pred,
x_pred_error,
axes,
vline_kwargs=None,
vspan_kwargs=None):
"""Create plot for prediction input data (vertical lines)."""
if vline_kwargs is None:
vline_kwargs = {'color': 'k', 'linestyle': ':', 'label': 'Observation'}
if vspan_kwargs is None:
vspan_kwargs = {'color': 'k', 'alpha': 0.1}
    # Accept scalars or (1D) arrays; only the first prediction point is shown
    x_pred = np.ravel(x_pred)[0]
    x_pred_error = np.ravel(x_pred_error)[0]
    axes.axvline(x_pred, **vline_kwargs)
    axes.axvspan(x_pred - x_pred_error, x_pred + x_pred_error, **vspan_kwargs)
return axes
def _create_pred_output_plot(x_data,
y_data,
x_pred,
x_pred_error,
axes,
hline_kwargs=None):
"""Create plot for prediction input data (vertical lines)."""
if hline_kwargs is None:
hline_kwargs = {'color': 'k', 'linestyle': ':'}
(_, y_mean, _) = get_constraint(x_data, y_data, x_pred, x_pred_error)
axes.axhline(y_mean, **hline_kwargs)
return axes
def _create_regplot(x_data,
y_data,
axes=None,
line_kwargs=None,
fill_between_kwargs=None,
mean_kwargs=None):
"""Create single regression line plot."""
if axes is None:
(_, axes) = plt.subplots()
if line_kwargs is None:
line_kwargs = {'linestyle': '-', 'label': 'Linear regression'}
if fill_between_kwargs is None:
fill_between_kwargs = {'alpha': 0.3}
# Create regression line
reg = regression_line(x_data, y_data)
# Add R2 and p-value to label if possible
text = rf"$R^2={reg['rvalue']**2:.2f}, p={reg['pvalue']:.4f}$"
if 'label' in line_kwargs:
line_kwargs['label'] += rf' ({text})'
else:
if reg['rvalue'] < 0.0:
axes.text(0.62, 0.93, text, transform=axes.transAxes)
else:
axes.text(0.02, 0.93, text, transform=axes.transAxes)
    # Plot regression line and uncertainty range
axes.plot(reg['x'], reg['y'], **line_kwargs)
axes.fill_between(reg['x'], reg['y_minus_err'], reg['y_plus_err'],
**fill_between_kwargs)
# Plot means if desired
if mean_kwargs is not None:
x_mean = np.mean(reg['x'])
y_mean = np.mean(reg['y'])
axes.scatter(x_mean, y_mean, **mean_kwargs)
return axes
def _get_pandas_cube(pandas_object):
"""Convert :mod:`pandas` object to cube and fix coordinates."""
cube = iris.pandas.as_cube(pandas_object)
for coord_name in ('index', 'columns'):
try:
names = getattr(pandas_object, coord_name).names
except AttributeError:
continue
coord = cube.coord(coord_name)
if not np.issubdtype(coord.dtype, np.number):
coord.points = coord.points.astype(str)
if coord.bounds is not None:
coord.bounds = coord.bounds.astype(str)
names = [n for n in names if n is not None]
if not names:
continue
new_coord_name = '-'.join(names)
coord.var_name = new_coord_name
coord.long_name = new_coord_name
return cube
def _metadata_list_to_cube_list(metadata_list, source):
"""Convert :obj:`list` of :obj:`dict` to :class:`iris.cube.CubeList`."""
cubes = iris.cube.CubeList()
for metadata in metadata_list:
for attr in ('data', 'dataset'):
if attr not in metadata:
raise AttributeError(
f"Entry {metadata} from source '{source}' does not "
f"contain necessary attribute '{attr}'")
aux_coord = iris.coords.AuxCoord(metadata.pop('dataset'),
var_name='dataset',
long_name='dataset')
data_of_cube = metadata.pop('data')
if data_of_cube is None:
data_of_cube = np.nan
cube = iris.cube.Cube(data_of_cube,
aux_coords_and_dims=[(aux_coord, ())])
for key in ('var_name', 'standard_name', 'long_name', 'units'):
if key in metadata:
setattr(cube, key, metadata.pop(key))
cube.attributes = metadata
cubes.append(cube)
return cubes
def _gaussian_pdf(x_val, x_mean, x_std):
"""Return Gaussian probability density."""
norm = np.sqrt(2.0 * np.pi * x_std**2)
return np.exp(-(x_val - x_mean)**2 / 2.0 / x_std**2) / norm
def _get_target_pdf(x_data,
y_data,
obs_mean,
obs_std,
n_points=1000,
necessary_p_value=None):
"""Get PDF of target variable including linear regression information."""
(x_data, y_data) = _check_x_y_arrays(x_data, y_data)
spe = standard_prediction_error(x_data, y_data)
reg = linregress(x_data, y_data)
# Get evenly spaced range of y
y_range = 1.5 * (np.max(y_data) - np.min(y_data))
y_lin = np.linspace(
np.min(y_data) - y_range,
np.max(y_data) + y_range, n_points)
    # Fall back to unconstrained PDF if relationship is not significant
if necessary_p_value is not None:
if reg.pvalue > necessary_p_value:
y_pdf = _gaussian_pdf(y_lin, np.mean(y_data), np.std(y_data))
return (y_lin, y_pdf, reg)
# Helper functions for calculation of constrained target variable
def obs_pdf(x_new):
"""Return PDF of observations P(x)."""
return _gaussian_pdf(x_new, obs_mean, obs_std)
def cond_pdf(x_new, y_new):
"""Return conditional PDF P(y|x)."""
y_pred = reg.slope * x_new + reg.intercept
y_std = spe(x_new)
return _gaussian_pdf(y_new, y_pred, y_std)
def comb_pdf(x_new, y_new):
"""Return combined PDF P(y,x)."""
return obs_pdf(x_new) * cond_pdf(x_new, y_new)
    # PDF of target variable: P(y) = int P(x) P(y|x) dx, evaluated by
    # numerical quadrature within +/- 3 std of the observational mean
x_range = 3 * obs_std
y_pdf = [
integrate.quad(comb_pdf,
obs_mean - x_range,
obs_mean + x_range,
args=(y, ))[0] for y in y_lin
]
return (y_lin, np.array(y_pdf), reg)
def get_xy_data_without_nans(data_frame, feature, label):
"""Get (X, Y) data for ``(feature, label)`` combination without nans.
Parameters
----------
data_frame : pandas.DataFrame
Training data.
feature : str
Name of the feature data.
label : str
Name of the label data.
Returns
-------
tuple
Tuple containing a :class:`pandas.DataFrame` for the X axis (feature)
and a :class:`pandas.DataFrame` for the Y axis (label) without
missing values.
"""
idx_slice = pd.IndexSlice[:, [feature, label]]
data_frame_xy = data_frame.loc[:, idx_slice]
data_frame_xy.columns = data_frame_xy.columns.droplevel()
data_frame_xy = data_frame_xy.dropna()
x_data = data_frame_xy[feature]
y_data = data_frame_xy[label]
return (x_data, y_data)
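# Minimal usage sketch for get_xy_data_without_nans (synthetic data; the
# names 'feat'/'lab' are hypothetical). The frame carries a two-level column
# index whose last level holds the variable names:
#
#     import numpy as np
#     import pandas as pd
#     columns = pd.MultiIndex.from_tuples([('x', 'feat'), ('y', 'lab')])
#     df = pd.DataFrame([[1.0, 2.0], [np.nan, 3.0], [2.0, 4.0]],
#                       index=['M1', 'M2', 'M3'], columns=columns)
#     (x_data, y_data) = get_xy_data_without_nans(df, 'feat', 'lab')
#     # row 'M2' is dropped because its feature value is missing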
def combine_groups(groups):
"""Combine :obj:`list` of groups to a single :obj:`str`.
Parameters
----------
groups : list of str
List of group names.
Returns
-------
str
Combined :obj:`str`.
"""
new_str = ', '.join(groups)
return new_str
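# Example (follows directly from the implementation):
#   combine_groups(['CMIP5', 'CMIP6']) -> 'CMIP5, CMIP6'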
def pandas_object_to_cube(pandas_object,
index_droplevel=None,
columns_droplevel=None,
**kwargs):
"""Convert pandas object to :class:`iris.cube.Cube`.
Parameters
----------
pandas_object : pandas.DataFrame or pandas.Series
Data to convert.
index_droplevel : int or list of int, optional
Drop levels of index if not ``None``.
columns_droplevel : int or list of int, optional
Drop levels of columns if not ``None``. Can only be used if
``pandas_object`` is a :class:`pandas.DataFrame`.
**kwargs : Keyword arguments
Keyword arguments used for the cube metadata, e.g. ``standard_name``,
``var_name``, etc.
Returns
-------
iris.cube.Cube
Data cube.
Raises
------
TypeError
``columns_droplevel`` is used when ``pandas_object`` is not a
:class:`pandas.DataFrame`.
"""
pandas_object = pandas_object.copy()
if index_droplevel is not None:
pandas_object.index = pandas_object.index.droplevel(index_droplevel)
if columns_droplevel is not None:
try:
pandas_object.columns = pandas_object.columns.droplevel(
columns_droplevel)
except AttributeError:
raise TypeError(
f"'columns_droplevel' only supported for pandas.DataFrame "
f"object, got {type(pandas_object)}")
cube = _get_pandas_cube(pandas_object)
for (key, val) in kwargs.items():
setattr(cube, key, val)
return cube
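# Usage sketch for pandas_object_to_cube (hypothetical names; the metadata
# kwargs are applied to the cube via setattr in the loop above):
#
#     import pandas as pd
#     series = pd.Series([273.0, 274.5], index=['M1', 'M2'])
#     cube = pandas_object_to_cube(series, var_name='tas', units='K')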
def set_plot_appearance(axes, attributes, **kwargs):
"""Set appearance of a plot.
Parameters
----------
axes : matplotlib.axes.Axes
Matplotlib Axes object which contains the plot.
attributes : dict
Plot attributes.
**kwargs : Keyword arguments
Keyword arguments of the form ``plot_option=tag`` where ``plot_option``
is something like ``plot_title``, ``plot_xlabel``, ``plot_xlim``, etc.
and ``tag`` a key for the plot attributes :obj:`dict` that describes
which attributes should be considered for that ``plot_option``.
"""
for (plot_option, tag) in kwargs.items():
plot_func = plot_option.replace('plot_', 'set_')
value = attributes[tag][plot_option]
getattr(axes, plot_func)(value)
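# Usage sketch: each plot_option maps to the corresponding Axes setter (e.g.
# plot_title -> set_title) and is fed with attributes[tag][plot_option]; the
# tags 'tas' and 'ecs' below are hypothetical:
#
#     set_plot_appearance(axes, attributes, plot_title='tas',
#                         plot_xlabel='tas', plot_ylim='ecs')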
def get_caption(attributes, feature, label, group=None):
"""Construct caption from plotting attributes for (feature, label) pair.
Parameters
----------
attributes : dict
Plot attributes.
feature : str
Feature.
label : str
Label.
group : str, optional
Group.
Returns
-------
str
Caption.
Raises
------
KeyError
``attributes`` does not include necessary keys.
"""
group_str = '' if group is None else f' ({group})'
if feature not in attributes:
raise KeyError(
f"Attributes do not include necessary key for feature '{feature}'")
if label not in attributes:
raise KeyError(
f"Attributes do not include necessary key for label '{label}'")
feature_attrs = attributes[feature]
label_attrs = attributes[label]
if 'plot_title' not in feature_attrs:
raise KeyError(
f"Attributes for feature '{feature}' does not include necessary "
f"key 'plot_title'")
if 'plot_xlabel' not in feature_attrs:
raise KeyError(
f"Attributes for feature '{feature}' does not include necessary "
f"key 'plot_xlabel'")
if 'plot_ylabel' not in label_attrs:
raise KeyError(
f"Attributes for label '{label}' does not include necessary "
f"key 'plot_ylabel'")
caption = (f"{attributes[feature]['plot_title']}: "
f"{attributes[label]['plot_ylabel']} vs. "
f"{attributes[feature]['plot_xlabel']}{group_str}.")
return caption
def get_provenance_record(attributes, tags, **kwargs):
"""Get provenance record.
Parameters
----------
attributes : dict
Plot attributes. All provenance keys need to start with
``'provenance_'``.
tags : list of str
Tags used to retrieve data from the ``attributes`` :obj:`dict`, i.e.
features and/or label.
**kwargs : Keyword arguments
Additional ``key:value`` pairs directly passed to the provenance record
:obj:`dict`. All values may include the format strings ``{feature}``
and ``{label}``.
Returns
-------
dict
Provenance record.
"""
record = {}
for tag in tags:
for (key, value) in attributes[tag].items():
if key.startswith('provenance_'):
key = key.replace('provenance_', '')
record.setdefault(key, [])
if isinstance(value, str):
record[key].append(value)
else:
record[key].extend(value)
record.setdefault('ancestors', [])
if key == 'filename':
if isinstance(value, str):
record['ancestors'].append(value)
else:
record['ancestors'].extend(value)
for (key, value) in record.items():
if isinstance(value, list):
record[key] = list(set(value))
record.update(kwargs)
return record
def get_colors(cfg, groups=None):
"""Get color palette.
Parameters
----------
cfg : dict
Recipe configuration.
groups : list, optional
Use to check whether color for combining groups has to be added.
Returns
-------
list
List of colors that can be used for :mod:`matplotlib`.
"""
palette = cfg.get('seaborn_settings', {}).get('palette')
colors = sns.color_palette(palette=palette)
if groups is None:
return colors
if len(groups) > 1 and cfg.get('combine_groups', False):
return [COLOR_COMBINED_GROUPS] + colors
return colors
def get_groups(training_data, add_combined_group=False):
"""Extract groups from training data.
Parameters
----------
training_data : pandas.DataFrame
Training data (features, label).
add_combined_group : bool, optional (default: False)
Add combined group of all other groups at the beginning of the
returned :obj:`list`.
Returns
-------
list of str
Groups.
"""
groups = list(set(training_data.index.get_level_values(0)))
groups.sort()
if add_combined_group and len(groups) > 1:
groups.insert(0, combine_groups(groups))
return groups
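# Example with hypothetical data (the first index level holds the groups):
#
#     import pandas as pd
#     index = pd.MultiIndex.from_tuples(
#         [('CMIP5', 'M1'), ('CMIP6', 'M2')], names=['project', 'dataset'])
#     data = pd.DataFrame({'tas': [1.0, 2.0]}, index=index)
#     get_groups(data)                           # ['CMIP5', 'CMIP6']
#     get_groups(data, add_combined_group=True)
#     # -> ['CMIP5, CMIP6', 'CMIP5', 'CMIP6']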
def plot_individual_scatterplots(training_data, pred_input_data, attributes,
basename, cfg):
"""Plot individual scatterplots for the different groups.
    Plot scatterplots for all pairs of ``(feature, label)`` data (separate
    plot for each group).
Parameters
----------
training_data : pandas.DataFrame
Training data (features, label).
pred_input_data : pandas.DataFrame
Prediction input data (mean and error).
attributes : dict
Plot attributes for the different features and the label data.
basename : str
Basename for the name of the file.
cfg : dict
Recipe configuration.
"""
logger.info("Plotting individual scatterplots")
label = training_data.y.columns[0]
groups = get_groups(training_data,
add_combined_group=cfg.get('combine_groups', False))
# Iterate over features
for feature in training_data.x.columns:
(x_data, y_data) = get_xy_data_without_nans(training_data, feature,
label)
# Individual plots
colors = get_colors(cfg, groups=groups)
for (idx, group) in enumerate(groups):
try:
x_sub_data = x_data.loc[group]
y_sub_data = y_data.loc[group]
index_droplevel = 1
except KeyError:
x_sub_data = x_data
y_sub_data = y_data
index_droplevel = [0, 2]
axes = _create_scatterplot(
x_sub_data,
y_sub_data,
numbers_as_markers=cfg.get('numbers_as_markers', False),
plot_regression_line_mean=cfg.get('plot_regression_line_mean',
False),
color=colors[idx],
label=group)
axes = _create_pred_input_plot(
pred_input_data['mean'][feature].values,
pred_input_data['error'][feature].values, axes)
axes = _create_pred_output_plot(
x_sub_data,
y_sub_data,
pred_input_data['mean'][feature].values,
pred_input_data['error'][feature].values,
axes,
hline_kwargs={
'color': colors[idx],
'linestyle': ':'
},
)
set_plot_appearance(axes,
attributes,
plot_title=feature,
plot_xlabel=feature,
plot_ylabel=label,
plot_xlim=feature,
plot_ylim=label)
plt.legend(**LEGEND_KWARGS)
filename = (f"scatterplot_{basename}_{feature}_"
f"{group.replace(', ', '-')}")
plot_path = get_plot_filename(filename, cfg)
plt.savefig(plot_path,
**cfg.get('savefig_kwargs', {}))
logger.info("Wrote %s", plot_path)
plt.close()
# Provenance
provenance_record = get_provenance_record(
attributes, [feature, label],
caption=get_caption(attributes, feature, label, group=group),
plot_type='scatter')
with ProvenanceLogger(cfg) as provenance_logger:
provenance_logger.log(plot_path, provenance_record)
# Write netCDF file
cubes = iris.cube.CubeList([
pandas_object_to_cube(
x_sub_data,
index_droplevel=index_droplevel,
var_name=feature,
long_name=attributes[feature]['plot_xlabel'],
units=attributes[feature]['units']),
pandas_object_to_cube(
y_sub_data,
index_droplevel=index_droplevel,
var_name=label,
long_name=attributes[label]['plot_ylabel'],
units=attributes[label]['units']),
])
netcdf_path = get_diagnostic_filename(filename, cfg)
io.iris_save(cubes, netcdf_path)
with ProvenanceLogger(cfg) as provenance_logger:
provenance_logger.log(netcdf_path, provenance_record)
def plot_merged_scatterplots(training_data, pred_input_data, attributes,
basename, cfg):
"""Plot merged scatterplots (all groups in one plot).
Plot scatterplots for all pairs of ``(feature, label)`` data (all groups in
one plot).
Parameters
----------
training_data : pandas.DataFrame
Training data (features, label).
pred_input_data : pandas.DataFrame
Prediction input data (mean and error).
attributes : dict
Plot attributes for the different features and the label data.
basename : str
Basename for the name of the file.
cfg : dict
Recipe configuration.
"""
logger.info("Plotting merged scatterplots")
label = training_data.y.columns[0]
groups = get_groups(training_data,
add_combined_group=cfg.get('combine_groups', False))
numbers_as_markers = cfg.get('numbers_as_markers', False)
plot_regression_line_mean = cfg.get('plot_regression_line_mean', False)
colors = get_colors(cfg)
# Iterate over features
for feature in training_data.x.columns:
(x_data, y_data) = get_xy_data_without_nans(training_data, feature,
label)
(_, axes) = plt.subplots()
if len(groups) > 1 and cfg.get('combine_groups', False):
axes = _create_regplot(
x_data,
y_data,
axes=axes,
line_kwargs={
'color': COLOR_COMBINED_GROUPS,
'label': groups[0],
'linestyle': '-'
},
fill_between_kwargs={
'color': COLOR_COMBINED_GROUPS,
'alpha': 0.3
},
mean_kwargs=(None
if not cfg.get('plot_regression_line_mean') else {
'color': COLOR_COMBINED_GROUPS,
'marker': 'o'
}),
)
axes = _create_pred_output_plot(
x_data,
y_data,
pred_input_data['mean'][feature].values,
pred_input_data['error'][feature].values,
axes,
hline_kwargs={
'color': COLOR_COMBINED_GROUPS,
'linestyle': ':'
},
)
for (idx, group) in enumerate(groups[1:]):
axes = _create_scatterplot(
x_data.loc[group],
y_data.loc[group],
numbers_as_markers=numbers_as_markers,
plot_regression_line_mean=plot_regression_line_mean,
axes=axes,
color=colors[idx],
label=group,
)
axes = _create_pred_output_plot(
x_data.loc[group],
y_data.loc[group],
pred_input_data['mean'][feature].values,
pred_input_data['error'][feature].values,
axes,
hline_kwargs={
'color': colors[idx],
'linestyle': ':'
},
)
else:
for (idx, group) in enumerate(groups):
axes = _create_scatterplot(
x_data.loc[group],
y_data.loc[group],
numbers_as_markers=numbers_as_markers,
plot_regression_line_mean=plot_regression_line_mean,
axes=axes,
color=colors[idx],
label=group,
)
axes = _create_pred_output_plot(
x_data.loc[group],
y_data.loc[group],
pred_input_data['mean'][feature].values,
pred_input_data['error'][feature].values,
axes,
hline_kwargs={
'color': colors[idx],
'linestyle': ':'
},
)
axes = _create_pred_input_plot(
pred_input_data['mean'][feature].values,
pred_input_data['error'][feature].values, axes)
set_plot_appearance(axes,
attributes,
plot_title=feature,
plot_xlabel=feature,
plot_ylabel=label,
plot_xlim=feature,
plot_ylim=label)
plt.legend(**LEGEND_KWARGS)
filename = f'scatterplot_merged_{basename}_{feature}'
plot_path = get_plot_filename(filename, cfg)
plt.savefig(plot_path,
**cfg.get('savefig_kwargs', {}))
logger.info("Wrote %s", plot_path)
plt.close()
# Provenance
provenance_record = get_provenance_record(attributes, [feature, label],
caption=get_caption(
attributes, feature,
label),
plot_type='scatter')
with ProvenanceLogger(cfg) as provenance_logger:
provenance_logger.log(plot_path, provenance_record)
# Write netCDF file
cubes = iris.cube.CubeList([
pandas_object_to_cube(x_data,
index_droplevel=[0, 2],
var_name=feature,
long_name=attributes[feature]['plot_xlabel'],
units=attributes[feature]['units']),
pandas_object_to_cube(y_data,
index_droplevel=[0, 2],
var_name=label,
long_name=attributes[label]['plot_ylabel'],
units=attributes[label]['units']),
])
netcdf_path = get_diagnostic_filename(filename, cfg)
io.iris_save(cubes, netcdf_path)
with ProvenanceLogger(cfg) as provenance_logger:
provenance_logger.log(netcdf_path, provenance_record)
def create_simple_scatterplot(x_data, y_data, obs_mean, obs_std):
"""Create simple scatterplot of an emergent relationship (without saving).
Parameters
----------
x_data : numpy.ndarray
X data of the emergent constraint.
y_data : numpy.ndarray
Y data of the emergent constraint.
obs_mean : float
Mean of observational data.
obs_std : float
        Standard deviation of observational data.
    Returns
    -------
    tuple
        Figure and axes of the plot.
    """
logger.debug("Plotting simple scatterplot")
(fig, axes) = plt.subplots()
axes.scatter(x_data, y_data, color='k', marker='o')
line_kwargs = {'color': 'C1', 'linestyle': '-'}
fill_between_kwargs = {**line_kwargs, 'alpha': 0.3}
axes = _create_regplot(x_data, y_data, axes=axes, line_kwargs=line_kwargs,
fill_between_kwargs=fill_between_kwargs)
axes = _create_pred_input_plot(obs_mean, obs_std, axes)
return (fig, axes)
def plot_target_distributions(training_data, pred_input_data, attributes,
basename, cfg):
"""Plot distributions of target variable for every feature.
Parameters
----------
training_data : pandas.DataFrame
Training data (features, label).
pred_input_data : pandas.DataFrame
Prediction input data (mean and error).
attributes : dict
Plot attributes for the different features and the label data.
basename : str
Basename for the name of the file.
cfg : dict
Recipe configuration.
"""
logger.info("Plotting distributions of target variable")
label = training_data.y.columns[0]
groups = get_groups(training_data,
add_combined_group=cfg['combine_groups'])
summary_columns = pd.MultiIndex.from_product(
[groups, ['best estimate', 'range', 'min', 'max']])
summaries = []
# Iterate over features
for feature in training_data.x.columns:
(x_data, y_data) = get_xy_data_without_nans(training_data, feature,
label)
colors = get_colors(cfg, groups=groups)
summary_for_feature = pd.Series(
index=summary_columns, name=feature, dtype=np.float64
)
# Iterate over groups
for (idx, group) in enumerate(groups):
try:
x_sub_data = x_data.loc[group]
y_sub_data = y_data.loc[group]
except KeyError:
x_sub_data = x_data
y_sub_data = y_data
(y_lin, y_pdf) = target_pdf(
x_sub_data,
y_sub_data,
pred_input_data['mean'][feature].values,
pred_input_data['error'][feature].values,
)
# Plots
axes = sns.histplot(y_sub_data,
bins=7,
stat='density',
color=colors[idx],
alpha=0.4)
axes.plot(y_lin,
y_pdf,
color=colors[idx],
linestyle='-',
label=group)
# Print results
(y_min, y_mean, y_max) = get_constraint(
x_sub_data,
y_sub_data,
pred_input_data['mean'][feature].values,
pred_input_data['error'][feature].values,
confidence_level=cfg['confidence_level'],
)
y_error = np.max([y_max - y_mean, y_mean - y_min])
reg = linregress(x_sub_data.values, y_sub_data.values)
logger.info(
"Constrained %s for feature '%s' and group '%s': %.2f ± %.2f "
"(%i%% confidence level), R2 = %f, p = %f", label,
feature, group, y_mean, y_error,
int(100.0 * cfg['confidence_level']), reg.rvalue**2,
reg.pvalue)
# Save results of group
summary_for_feature[(group, 'best estimate')] = y_mean
summary_for_feature[(group, 'range')] = y_max - y_min
summary_for_feature[(group, 'min')] = y_min
summary_for_feature[(group, 'max')] = y_max
# Save results for feature
summaries.append(summary_for_feature.to_frame().T)
# Plot appearance
set_plot_appearance(axes, attributes, plot_title=feature)
axes.set_xlabel(attributes[label]['plot_ylabel'])
axes.set_ylabel('Probability density')
if attributes[label]['plot_ylim'] is not None:
axes.set_xlim(attributes[label]['plot_ylim'])
axes.set_ylim([0.0, 1.0])
plt.legend(loc='best')
# Save plot
plot_path = get_plot_filename(
f'target_distribution_{basename}_{feature}', cfg)
plt.savefig(plot_path,
**cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
plt.close()
# Provenance
provenance_record = get_provenance_record(
attributes, [feature, label],
caption=(f"{attributes[feature]['plot_title']}: Probability "
f"densitiy of {label}."),
plot_type='probability')
with ProvenanceLogger(cfg) as provenance_logger:
provenance_logger.log(plot_path, provenance_record)
# Print mean results
summary = pd.concat(summaries)
with pd.option_context(*PANDAS_PRINT_OPTIONS):
logger.info("Constrained ranges:\n%s", summary)
summary = summary.mean(axis=0)
logger.info("Mean of constrained ranges:\n%s", summary)
def export_csv(data_frame, attributes, basename, cfg, tags=None):
"""Export CSV file.
Parameters
----------
data_frame : pandas.DataFrame
Data to export.
attributes : dict
Plot attributes for the different features and the label data. Used to
retrieve provenance information.
basename : str
Basename for the name of the file.
cfg : dict
Recipe configuration.
tags : iterable of str, optional
Tags for which provenance information should be retrieved (using
``attributes``). If not specified, use (last level of) columns of the
given ``data_frame``.
Returns
-------
str
Path to the new CSV file.
"""
logger.info("Exporting CSV file for '%s'", basename)
csv_path = get_diagnostic_filename(basename, cfg).replace('.nc', '.csv')
data_frame.to_csv(csv_path)
logger.info("Wrote %s", csv_path)
if tags is None:
tags = data_frame.columns.get_level_values(-1)
provenance_record = get_provenance_record(attributes,
tags,
caption=basename)
with ProvenanceLogger(cfg) as provenance_logger:
provenance_logger.log(csv_path, provenance_record)
return csv_path
def standard_prediction_error(x_data, y_data):
"""Return a function to calculate standard prediction error.
The standard prediction error of a linear regression is the error when
predicting a data point which was not used to fit the regression line in
the first place.
Parameters
----------
x_data : numpy.ndarray
X data used to fit the linear regression.
y_data : numpy.ndarray
Y data used to fit the linear regression.
Returns
-------
callable
Function that takes a :obj:`float` as single argument (representing the
X value of a new data point) and returns the standard prediction error
for that.
"""
(x_data, y_data) = _check_x_y_arrays(x_data, y_data)
reg = linregress(x_data, y_data)
y_pred = reg.slope * x_data + reg.intercept
n_data = x_data.shape[0]
see = np.sqrt(np.sum(np.square(y_data - y_pred)) / (n_data - 2))
x_mean = np.mean(x_data)
ssx = np.sum(np.square(x_data - x_mean))
def spe(x_new):
"""Return standard prediction error."""
return see * np.sqrt(1.0 + 1.0 / n_data + (x_new - x_mean)**2 / ssx)
return spe
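# The returned function implements
#   SPE(x_new) = SEE * sqrt(1 + 1/n + (x_new - mean(x))^2 / SSX),
# i.e. the error is smallest near the center of the training data. Minimal
# sketch with synthetic data (values purely illustrative):
#
#     import numpy as np
#     rng = np.random.default_rng(0)
#     x = np.arange(10.0)
#     y = 2.0 * x + rng.normal(scale=0.5, size=10)
#     spe = standard_prediction_error(x, y)
#     spe(4.5)   # close to SEE near mean(x)
#     spe(20.0)  # grows far away from the training data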
def regression_line(x_data, y_data, n_points=1000):
"""Return x and y coordinates of the regression line (mean and error).
Parameters
----------
x_data : numpy.ndarray
X data used to fit the linear regression.
y_data : numpy.ndarray
Y data used to fit the linear regression.
n_points : int, optional (default: 1000)
Number of points for the regression lines.
Returns
-------
dict
:class:`numpy.ndarray` s for the keys ``'x'``, ``'y'``,
``'y_minus_err'``, ``'y_plus_err'``, ``'slope'``, ``'intercept'``,
``'pvalue'`` and ``'rvalue'``.
"""
(x_data, y_data) = _check_x_y_arrays(x_data, y_data)
spe = np.vectorize(standard_prediction_error(x_data, y_data))
out = {}
reg = linregress(x_data, y_data)
x_range = np.max(x_data) - np.min(x_data)
x_lin = np.linspace(
np.min(x_data) - x_range,
np.max(x_data) + x_range, n_points)
out['x'] = x_lin
out['y'] = reg.slope * x_lin + reg.intercept
out['y_minus_err'] = out['y'] - spe(x_lin)
out['y_plus_err'] = out['y'] + spe(x_lin)
out['slope'] = reg.slope
out['intercept'] = reg.intercept
out['pvalue'] = reg.pvalue
out['rvalue'] = reg.rvalue
return out
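# Usage sketch (x, y as in the sketch above): the returned line spans one
# extra data range on each side of the training data, as constructed above.
#
#     reg = regression_line(x, y)
#     # e.g. plot with matplotlib:
#     #   axes.plot(reg['x'], reg['y'])
#     #   axes.fill_between(reg['x'], reg['y_minus_err'], reg['y_plus_err'])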
def target_pdf(x_data,
y_data,
obs_mean,
obs_std,
n_points=1000,
necessary_p_value=None):
"""Calculate probability density function (PDF) for target variable.
Parameters
----------
x_data : numpy.ndarray
X data of the emergent constraint.
y_data : numpy.ndarray
Y data of the emergent constraint.
obs_mean : float
Mean of observational data.
obs_std : float
Standard deviation of observational data.
n_points : int, optional (default: 1000)
Number of sampled points for PDF of target variable.
necessary_p_value : float, optional
If given, return unconstrained PDF (using Gaussian distribution with
unconstrained mean and standard deviation) when `p`-value of emergent
relationship is greater than the given necessary `p`-value.
Returns
-------
tuple of numpy.ndarray
x and y values for the PDF.
"""
(y_lin, y_pdf, _) = _get_target_pdf(x_data,
y_data,
obs_mean,
obs_std,
n_points=n_points,
necessary_p_value=necessary_p_value)
return (y_lin, y_pdf)
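# The PDF follows by marginalizing over the feature x,
#   P(y) = int P_obs(x) P(y|x) dx,
# with P_obs(x) Gaussian around the observations and P(y|x) Gaussian around
# the regression line (see _get_target_pdf). Sketch with synthetic data:
#
#     import numpy as np
#     rng = np.random.default_rng(0)
#     x = np.linspace(0.0, 1.0, 20)
#     y = 3.0 * x + 1.0 + rng.normal(scale=0.2, size=20)
#     (y_lin, y_pdf) = target_pdf(x, y, obs_mean=0.5, obs_std=0.1)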
def cdf(data, pdf):
"""Calculate cumulative distribution function for a 1-dimensional PDF.
Parameters
----------
data : numpy.ndarray
Data points (1D array).
pdf : numpy.ndarray
Corresponding probability density function (PDF).
Returns
-------
numpy.ndarray
Corresponding cumulative distribution function (CDF).
"""
idx_range = range(1, len(data) + 1)
cum_dens = [simpson(pdf[:idx], x=data[:idx]) for idx in idx_range]
return np.array(cum_dens)
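# Sanity-check sketch: for a normalized PDF on a sufficiently wide grid, the
# CDF approaches 1 at the right edge.
#
#     import numpy as np
#     data = np.linspace(-5.0, 5.0, 1001)
#     pdf = np.exp(-data**2 / 2.0) / np.sqrt(2.0 * np.pi)
#     cdf(data, pdf)[-1]  # approximately 1.0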
def constraint_info_array(x_data,
y_data,
obs_mean,
obs_std,
n_points=1000,
necessary_p_value=None):
"""Get array with all relevant parameters of emergent constraint.
Parameters
----------
x_data : numpy.ndarray
X data of the emergent constraint.
y_data : numpy.ndarray
Y data of the emergent constraint.
obs_mean : float
Mean of observational data.
obs_std : float
Standard deviation of observational data.
n_points : int, optional (default: 1000)
Number of sampled points for PDF of target variable.
necessary_p_value : float, optional
If given, replace constrained mean and standard deviation with
unconstrained values when `p`-value of emergent relationship is greater
than the given necessary `p`-value.
Returns
-------
numpy.ndarray
Array of shape (8,) with the elements:
0. Constrained mean of target variable.
1. Constrained standard deviation of target variable.
2. Unconstrained mean of target variable.
3. Unconstrained standard deviation of target variable.
4. Slope of emergent relationship.
5. Intercept of emergent relationship.
6. Correlation coefficient `r` of emergent relationship.
7. `p`-value of emergent relationship.
"""
(y_lin, y_pdf, reg) = _get_target_pdf(x_data,
y_data,
obs_mean,
obs_std,
n_points=n_points,
necessary_p_value=necessary_p_value)
norm = np.sum(y_pdf)
y_mean = np.sum(y_lin * y_pdf) / norm
y_std = np.sqrt(np.sum((y_lin - y_mean)**2 * y_pdf) / norm)
info = [
y_mean, y_std,
np.ma.mean(y_data),
np.ma.std(y_data), reg.slope, reg.intercept, reg.rvalue, reg.pvalue
]
return np.array(info)
def get_constraint(x_data, y_data, obs_mean, obs_std, confidence_level=0.66):
"""Get constraint on target variable.
Parameters
----------
x_data : numpy.ndarray
X data of the emergent constraint.
y_data : numpy.ndarray
Y data of the emergent constraint.
obs_mean : float
Mean of observational data.
obs_std : float
Standard deviation of observational data.
confidence_level : float, optional (default: 0.66)
        Confidence level to estimate the range of the target variable.
Returns
-------
tuple of float
Lower confidence limit, best estimate and upper confidence limit of
target variable.
"""
(x_data, y_data) = _check_x_y_arrays(x_data, y_data)
(y_lin, y_pdf) = target_pdf(x_data, y_data, obs_mean, obs_std)
y_mean = np.sum(y_lin * y_pdf) / np.sum(y_pdf)
y_cdf = cdf(y_lin, y_pdf)
y_index_range = np.nonzero((y_cdf >= (1.0 - confidence_level) / 2.0)
& (y_cdf <= (1.0 + confidence_level) / 2.0))
y_range = y_lin[y_index_range]
return (np.min(y_range), y_mean, np.max(y_range))
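# End-to-end sketch (synthetic data, purely illustrative): for a tight
# emergent relationship y = 3x + 1 and an observation x = 0.5 +/- 0.1, the
# best estimate of y lies near 2.5 with a narrow 66% range.
#
#     import numpy as np
#     rng = np.random.default_rng(0)
#     x = np.linspace(0.0, 1.0, 20)
#     y = 3.0 * x + 1.0 + rng.normal(scale=0.2, size=20)
#     (y_min, y_mean, y_max) = get_constraint(x, y, 0.5, 0.1)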
def get_constraint_from_df(training_data,
pred_input_data,
confidence_level=0.66):
"""Get constraint on target variable from :class:`pandas.DataFrame`.
Parameters
----------
training_data : pandas.DataFrame
Training data (features, label).
pred_input_data : pandas.DataFrame
Prediction input data (mean and error).
confidence_level : float, optional (default: 0.66)
        Confidence level to estimate the range of the target variable.
Returns
-------
tuple of float
Lower confidence limit, best estimate and upper confidence limit of
target variable.
"""
if len(training_data.columns) != 2:
raise ValueError(
f"Expected exactly two columns for training data (feature and "
f"label), got {len(training_data.columns):d}")
if len(pred_input_data.columns) != 2:
raise ValueError(
f"Expected exactly two columns for prediction input data (mean "
f"and error, got {len(pred_input_data.columns):d}")
# Extract data
label = training_data.y.columns[0]
feature = training_data.x.columns[0]
(x_data, y_data) = get_xy_data_without_nans(training_data, feature, label)
x_pred = pred_input_data['mean'][feature].values[0]
x_pred_error = pred_input_data['error'][feature].values[0]
# Calculate constraint
constraint = get_constraint(x_data,
y_data,
x_pred,
x_pred_error,
confidence_level=confidence_level)
return constraint
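# Sketch of the expected frame layout (inferred from the accessors above:
# top-level columns 'x'/'y' for the training data, 'mean'/'error' for the
# prediction input; all names hypothetical):
#
#     import pandas as pd
#     train_cols = pd.MultiIndex.from_tuples([('x', 'feat'), ('y', 'lab')])
#     training_data = pd.DataFrame(
#         [[0.1, 1.2], [0.5, 2.4], [0.9, 3.9]],
#         index=['M1', 'M2', 'M3'], columns=train_cols)
#     pred_cols = pd.MultiIndex.from_tuples([('mean', 'feat'),
#                                            ('error', 'feat')])
#     pred_input_data = pd.DataFrame([[0.5, 0.1]], columns=pred_cols)
#     get_constraint_from_df(training_data, pred_input_data)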