"""Base class for MLR models.
Example recipe
--------------
The :ref:`MLR main diagnostic script<api.esmvaltool.diag_scripts.mlr.main>`
provides an interface for using MLR models in recipes. The following recipe
shows a typical example of how to set up MLR recipes/diagnostics with the
following properties:

#. Set up an MLR model with target variable ``y`` (using the tag ``Y``) and
   three predictors ``x1``, ``x2`` and ``latitude`` (with tags ``X1``, ``X2``
   and ``latitude``, respectively). The target variable needs the attribute
   ``var_type: label``; the predictors ``x1`` and ``x2`` need the attribute
   ``var_type: feature``. The coordinate feature ``latitude`` is added via
   the option ``coords_as_features: [latitude]``.

#. Suppose ``y`` and ``x1`` are 3D fields (pressure, latitude, longitude) and
   ``x2`` is a 2D field (latitude, longitude). Thus, it is necessary to add
   the attribute ``broadcast_from: [1, 2]`` to it (see the ``dim_map``
   parameter in :func:`iris.util.broadcast_to_shape` for details). In order
   to consider multiple climate models (``A``, ``B`` and ``C``) at once, the
   option ``group_datasets_by_attributes: [dataset]`` is necessary.
   Otherwise, the diagnostic will complain about duplicate data.

#. For the prediction, data from dataset ``D`` is used (with
   ``var_type: prediction_input``). For the feature ``X1``, an additional
   input error (with ``var_type: prediction_input_error``) is given.

   .. code-block:: yaml

      diag_feature_x1:
        variables:
          feature:
            ...  # specify project, mip, start_year, end_year, etc.
            short_name: x1
            var_type: feature
            tag: X1
            additional_datasets:
              - {dataset: A, ...}
              - {dataset: B, ...}
              - {dataset: C, ...}
          prediction_input:
            ...  # specify project, mip, start_year, end_year, etc.
            short_name: x1
            var_type: prediction_input
            tag: X1
            additional_datasets:
              - {dataset: D, ...}
          prediction_input_error:
            ...  # specify project, mip, start_year, end_year, etc.
            short_name: x1Stderr
            var_type: prediction_input_error
            tag: X1
            additional_datasets:
              - {dataset: D, ...}
        scripts:
          null

      diag_feature_x2:
        variables:
          feature:
            ...  # specify project, mip, start_year, end_year, etc.
            short_name: x2
            var_type: feature
            broadcast_from: [1, 2]
            tag: X2
            additional_datasets:
              - {dataset: A, ...}
              - {dataset: B, ...}
              - {dataset: C, ...}
          prediction_input:
            ...  # specify project, mip, start_year, end_year, etc.
            short_name: x2
            var_type: prediction_input
            broadcast_from: [1, 2]
            tag: X2
            additional_datasets:
              - {dataset: D, ...}
        scripts:
          null

      diag_label:
        variables:
          label:
            ...  # specify project, mip, start_year, end_year, etc.
            short_name: y
            var_type: label
            tag: Y
            additional_datasets:
              - {dataset: A, ...}
              - {dataset: B, ...}
              - {dataset: C, ...}
        scripts:
          null

#. In this example, a `GBRT model
   <https://scikit-learn.org/stable/modules/ensemble.html#gradient-tree-boosting>`_
   (with ``mlr_model_type: gbr_sklearn``) is used. Parameters for this are
   specified via ``parameters_final_regressor``. Apart from the best-estimate
   prediction, the estimated MLR model error (``save_mlr_model_error: test``)
   and the propagated prediction input error (``save_propagated_errors:
   true``) are returned.

#. With ``postprocess.py``, the global mean of the best-estimate prediction
   and the corresponding errors (MLR model error + propagated input error)
   are calculated.

   .. code-block:: yaml

      diag_mlr_gbrt:
        scripts:
          mlr:
            script: mlr/main.py
            ancestors: [
              'diag_label/y',
              'diag_feature_*/*',
            ]
            coords_as_features: [latitude]
            group_datasets_by_attributes: [dataset]
            mlr_model_name: GBRT
            mlr_model_type: gbr_sklearn
            parameters_final_regressor:
              learning_rate: 0.1
              n_estimators: 100
            save_mlr_model_error: test
            save_propagated_errors: true
          postprocess:
            script: mlr/postprocess.py
            ancestors: ['diag_mlr_gbrt/mlr']
            ignore:
              - {var_type: null}
            mean: [pressure, latitude, longitude]

#. Plots of the global distribution (latitude, longitude) are created with
   ``plot.py`` after calculating the mean over the pressure coordinate using
   ``preprocess.py``.

   .. code-block:: yaml

      diag_plot:
        scripts:
          preprocess:
            script: mlr/preprocess.py
            ancestors: ['diag_mlr_gbrt/mlr']
            collapse: [pressure]
            ignore:
              - {var_type: null}
          plot:
            script: mlr/plot.py
            ancestors: ['diag_plot/preprocess']
            plot_map:
              plot_kwargs:
                cbar_label: 'Y'
                cbar_ticks: [0, 1, 2, 3]
                vmin: 0
                vmax: 3

All datasets must have the attribute ``var_type``, which specifies the type of
the dataset. Possible values are ``feature`` (independent variables used for
training/testing), ``label`` (dependent variables, y-axis),
``prediction_input`` (independent variables used for prediction of dependent
variables, usually observational data), ``prediction_input_error`` (standard
error of the ``prediction_input`` data, optional) or ``prediction_reference``
(`true` values for the ``prediction_input`` data, optional). In addition, all
datasets must have the attribute ``tag``, which specifies the name of the
variable/diagnostic. All datasets can be converted to new units in the loading
step by specifying the key ``convert_units_to`` in the respective dataset(s).

Training data
-------------
All groups (specified in ``group_datasets_by_attributes``, if desired) given
for ``label`` datasets must also be given for the ``feature`` datasets. Within
these groups, all ``feature`` and ``label`` datasets must have the same shape,
unless the attribute ``broadcast_from`` is set to a list of suitable
coordinate indices that map this dataset to the regular datasets (see the
parameter ``dim_map`` in :func:`iris.util.broadcast_to_shape`).
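
The following minimal sketch (with made-up array sizes) illustrates the kind
of broadcasting that ``broadcast_from: [1, 2]`` requests for a 2D field that
has to be mapped to the 3D shape of the regular datasets:

.. code-block:: python

   import numpy as np
   from iris.util import broadcast_to_shape

   # 2D field (latitude, longitude), e.g. the feature ``x2`` from the example
   # recipe above
   x_2d = np.arange(6.0).reshape(2, 3)

   # Broadcast to the 3D shape (pressure, latitude, longitude) of the regular
   # datasets; dim_map=(1, 2) states that the two dimensions of the 2D field
   # correspond to dimensions 1 and 2 of the target shape
   x_3d = broadcast_to_shape(x_2d, (4, 2, 3), (1, 2))
   print(x_3d.shape)  # (4, 2, 3)
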

Prediction data
---------------
All ``tag`` values specified for ``prediction_input`` datasets must also be
given for the ``feature`` datasets (unless ``allow_missing_features`` is set
to ``True``). Multiple predictions can be specified via ``prediction_name``.
Within these predictions, all ``prediction_input`` datasets must have the same
shape, unless the attribute ``broadcast_from`` is given. Errors in the
prediction input data can be specified via ``prediction_input_error``
datasets. If given, these errors are used to calculate errors in the final
prediction using linear error propagation given by `LIME
<https://arxiv.org/abs/1602.04938>`_. Additionally, `true` values for
``prediction_input`` can be specified with ``prediction_reference`` datasets
(together with the respective ``prediction_name``). This allows an evaluation
of the performance of the MLR model by calculating residuals (`true` minus
predicted values).

Available MLR models
--------------------
MLR models are subclasses of this base class. A list of all available MLR
models can be found :ref:`here <availableMLRModels>`. To add a new MLR model,
create a new file in ``esmvaltool/diag_scripts/mlr/models/`` with a child class
of :class:`esmvaltool.diag_scripts.mlr.models.MLRModel` decorated with
:meth:`esmvaltool.diag_scripts.mlr.models.MLRModel.register_mlr_model`.
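
A minimal sketch of such a file is shown below (the model type ``my_linear``,
the class name and the choice of :class:`sklearn.linear_model.LinearRegression`
as final regressor are only examples):

.. code-block:: python

   # Hypothetical file esmvaltool/diag_scripts/mlr/models/my_linear.py
   from sklearn.linear_model import LinearRegression

   from esmvaltool.diag_scripts.mlr.models import MLRModel


   @MLRModel.register_mlr_model('my_linear')
   class MyLinearModel(MLRModel):
       # Final regressor class that is used in the last pipeline step
       _CLF_TYPE = LinearRegression
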
.. _MLRModeloptionalparameters:
Optional parameters for class initialization
--------------------------------------------
accept_only_scalar_data: bool (default: False)
If set to ``True``, only accept scalar input data. Should be used together
with the option ``group_datasets_by_attributes``.
allow_missing_features: bool (default: False)
Allow missing features in the training data.
cache_intermediate_results: bool (default: True)
Cache the intermediate results of the pipeline's transformers.
categorical_features: list of str
Names of features which are interpreted as categorical features (in
contrast to numerical features).
coords_as_features: list of str
If given, specify a list of coordinates which should be used as features.
dtype: str (default: 'float64')
Internal data type which is used for all calculations, see
`<https://docs.scipy.org/doc/numpy/user/basics.types.html>`_ for a list of
allowed values.
fit_kwargs: dict
Optional keyword arguments for the pipeline's ``fit()`` function. These
arguments have to be given for each step of the pipeline separated by two
underscores, i.e. ``s__p`` is the parameter ``p`` for step ``s``.
group_datasets_by_attributes: list of str
List of dataset attributes which are used to group input data for
``feature`` s and ``label`` s. For example, this is necessary if the MLR
model should consider multiple climate models in the training phase. If
this option is not given, specifying multiple datasets with identical
``var_type`` and ``tag`` entries results in an error. If given, all the
input data is first grouped by the given attributes and then checked for
uniqueness within this group. After that, all groups are stacked to form a
single set of training data.
imputation_strategy: str (default: 'remove')
Strategy for the imputation of missing values in the features. Must be one
of ``'remove'``, ``'mean'``, ``'median'``, ``'most_frequent'`` or
``'constant'``.
log_level: str (default: 'info')
Verbosity for the logger. Must be one of ``'debug'``, ``'info'``,
``'warning'`` or ``'error'``.
mlr_model_name: str
    Human-readable name of the MLR model instance (e.g. used for labels).
n_jobs: int (default: 1)
Maximum number of jobs spawned by this class. Use ``-1`` to use all
processors. More details are given `here
<https://scikit-learn.org/stable/glossary.html#term-n-jobs>`_.
output_file_type: str (default: 'png')
File type for the plots.
parameters: dict
Parameters used for the whole pipeline. Have to be given for each step of
the pipeline separated by two underscores, i.e. ``s__p`` is the parameter
``p`` for step ``s``. ``random_state`` parameters are explicitly allowed
here (in contrast to ``parameters_final_regressor``).
parameters_final_regressor: dict
Parameters used for the **final** regressor. If these parameters are
updated using the function :meth:`update_parameters`, the new names have to
be given for each step of the pipeline separated by two underscores, i.e.
``s__p`` is the parameter ``p`` for step ``s``. Note: to pass an argument
for ``random_state``, use the option ``random_state`` of this class.
pca: bool (default: False)
Preprocess numerical input features using PCA. Parameters for this pipeline
step can be given via the ``parameters`` argument.
plot_dir: str (default: ~/plots)
Root directory to save plots.
plot_units: dict
Replace specific units (keys) with other text (values) in plots.
random_state: int or None (default: None)
Random seed for :class:`numpy.random.RandomState` that is used by all
functionalities of this class that require randomness (e.g., probabilistic
ML algorithms like Gradient Boosting Regression models, random train test
splits, etc.). If ``None``, use a random seed. Use an :obj:`int` to get
reproducible results. See `<https://scikit-learn.org/stable/
common_pitfalls.html#controlling-randomness>`__ for more details.
savefig_kwargs: dict
Keyword arguments for :func:`matplotlib.pyplot.savefig`.
seaborn_settings: dict
Options for :func:`seaborn.set_theme` (affects all plots).
standardize_data: bool (default: True)
Linearly standardize numerical input data by removing mean and scaling to
unit variance.
sub_dir: str
Create additional subdirectory for output in ``work_dir`` and ``plot_dir``.
test_size: float (default: 0.25)
If given, randomly exclude the desired fraction of input data from training
and use it as test data.
weighted_samples: dict
If specified, use weighted samples in the loss function used for the
training of the MLR model. The given keyword arguments are directly passed
to :func:`esmvaltool.diag_scripts.mlr.get_all_weights` to calculate the
sample weights. By default, no weights are used. Raises errors if the
desired weights cannot be calculated for the data, e.g., when
``time_weighted=True`` is used but the data does not contain a dimension
``time``.
work_dir: str (default: ~/work)
Root directory to save all other files (mainly ``*.nc`` files).
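
The following minimal sketch shows how an MLR model could be created and used
directly in Python (assuming ``input_datasets`` is a list of dataset metadata
dictionaries, e.g. the values of a diagnostic's ``cfg['input_data']``; all
other options shown here are examples):

.. code-block:: python

   from esmvaltool.diag_scripts.mlr.models import MLRModel

   model = MLRModel.create(
       'gbr_sklearn',  # mlr_model_type
       input_datasets,
       mlr_model_name='GBRT',
       parameters_final_regressor={'learning_rate': 0.1, 'n_estimators': 100},
       random_state=42,
       test_size=0.25,
   )
   model.fit()
   model.print_regression_metrics()
   model.predict(save_mlr_model_error='test')
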
"""
import importlib
import logging
import os
import warnings
from copy import deepcopy
from inspect import getfullargspec
from pprint import pformat
import iris
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from cf_units import Unit
from joblib import Parallel, delayed
from lime.lime_tabular import LimeTabularExplainer
from matplotlib.ticker import ScalarFormatter
from scipy.stats import shapiro
from sklearn import metrics
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.exceptions import NotFittedError
from sklearn.impute import SimpleImputer
from sklearn.inspection import PartialDependenceDisplay
from sklearn.model_selection import (
GridSearchCV,
LeaveOneGroupOut,
LeaveOneOut,
train_test_split,
)
from sklearn.preprocessing import StandardScaler
from esmvaltool.diag_scripts import mlr
from esmvaltool.diag_scripts.mlr.custom_sklearn import (
AdvancedPipeline,
AdvancedRFECV,
AdvancedTransformedTargetRegressor,
cross_val_score_weighted,
get_rfecv_transformer,
perform_efecv,
)
from esmvaltool.diag_scripts.shared import (
ProvenanceLogger,
group_metadata,
io,
select_metadata,
)
logger = logging.getLogger(os.path.basename(__file__))
class MLRModel():
"""Base class for MLR models."""
_CLF_TYPE = None
_MODELS = {}
_MLR_MODEL_TYPE = None
@staticmethod
def _load_mlr_models():
"""Load MLR models from :mod:`esmvaltool.diag_scripts.mlr.models`."""
current_path = os.path.dirname(os.path.realpath(__file__))
models_path = os.path.join(current_path)
for (root, _, model_files) in os.walk(models_path):
for model_file in model_files:
rel_path = ('' if root == models_path else os.path.relpath(
root, models_path))
module = os.path.join(rel_path,
os.path.splitext(model_file)[0])
try:
importlib.import_module(
f"esmvaltool.diag_scripts.mlr.models."
f"{module.replace(os.sep, '.')}"
)
except ImportError:
pass
@classmethod
def register_mlr_model(cls, mlr_model_type):
"""Add MLR model (subclass of this class) (decorator)."""
logger.debug("Found available MLR model '%s'", mlr_model_type)
def decorator(subclass):
"""Decorate subclass."""
subclass._MLR_MODEL_TYPE = mlr_model_type
cls._MODELS[mlr_model_type] = subclass
return subclass
return decorator
@classmethod
def create(cls, mlr_model_type, *args, **kwargs):
"""Create desired MLR model subclass (factory method)."""
cls._load_mlr_models()
if not cls._MODELS:
raise NotImplementedError(
f"Cannot initialize new MLR model with type "
f"'{mlr_model_type}', no MLR models found. Please add "
f"subclasses of {cls} in new files under 'esmvaltool/"
f"diag_scripts/mlr/models/' decorated by 'esmvaltool."
f"diag_scripts.mlr.models.{cls.__name__}."
f"register_mlr_model()'")
if mlr_model_type not in cls._MODELS:
raise NotImplementedError(
f"MLR model type '{mlr_model_type}' not found in 'esmvaltool/"
f"diag_scripts/mlr/models/'")
subclass = cls._MODELS[mlr_model_type]
logger.info(
"Initialized MLR model with type '%s' and final regressor %s",
mlr_model_type, subclass._CLF_TYPE)
return subclass(*args, **kwargs)
def __init__(self, input_datasets, **kwargs):
"""Initialize class members.
Parameters
----------
input_datasets : list of dict
List of dataset metadata used as data for the MLR model.
**kwargs
Optional keyword arguments, see next sections.
Raises
------
NotImplementedError
Class is initialized directly without the use of its factory
function ``create()``.
ValueError
Invalid data given.
"""
self._check_clf()
# Private attributes
self._cfg = deepcopy(kwargs)
self._clf = None
self._lime_explainer = None
self._data = {}
self._data['pred'] = {}
self._datasets = {}
self._classes = {}
self._parameters = {}
# Set default settings
self._set_default_settings()
# Random state
self._random_state = np.random.RandomState(self._cfg['random_state'])
# Seaborn
sns.set_theme(**self._cfg.get('seaborn_settings', {}))
# Adapt output directories
self._cfg['mlr_work_dir'] = os.path.join(self._cfg['work_dir'],
self._cfg['sub_dir'])
self._cfg['mlr_plot_dir'] = os.path.join(self._cfg['plot_dir'],
self._cfg['sub_dir'])
if not os.path.exists(self._cfg['mlr_work_dir']):
os.makedirs(self._cfg['mlr_work_dir'])
logger.info("Created %s", self._cfg['mlr_work_dir'])
if not os.path.exists(self._cfg['mlr_plot_dir']):
os.makedirs(self._cfg['mlr_plot_dir'])
logger.info("Created %s", self._cfg['mlr_plot_dir'])
# Load datasets, classes and training data
self._load_input_datasets(input_datasets)
self._load_classes()
self._load_data()
# Create pipeline (with all preprocessor steps and final regressor)
self.reset_pipeline()
if self._cfg['parameters']:
logger.debug("Using parameter(s): %s", self._cfg['parameters'])
self.update_parameters(**self._cfg['parameters'])
# Log successful initialization
logger.info("Initialized MLR model (using at most %i processes)",
self._cfg['n_jobs'])
logger.debug("With parameters")
logger.debug(pformat(self.parameters))
@property
def categorical_features(self):
"""numpy.ndarray: Categorical features."""
return self.features[self._classes['features'].categorical]
@property
def data(self):
"""dict: Input data of the MLR model."""
return self._data
@property
def features(self):
"""numpy.ndarray: Features of the input data."""
return self._classes['features'].index.values
@property
def features_after_preprocessing(self):
"""numpy.ndarray: Features of the input data after preprocessing."""
x_train = self.data['train'].x
y_train = self.get_y_array('train')
try:
self._check_fit_status('Calculating features after preprocessing')
except NotFittedError:
self._clf.fit_transformers_only(x_train, y_train,
**self.fit_kwargs)
x_trans = self._clf.transform_only(x_train)
features = self.features
n_features_may_drop = False
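        # Features may have been dropped by feature selection (rfecv/efecv)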
if 'feature_selection' in self._clf.named_steps:
support = self._clf.named_steps['feature_selection'].support
features = features[support]
n_features_may_drop = True
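        # PCA replaces the numerical features by principal components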
if 'pca' in self._clf.named_steps:
categorical_features = np.array([
f for f in features if f in self.categorical_features])
n_numerical_features = x_trans.shape[1] - categorical_features.size
features = [
f'Principal component {idx}'
for idx in range(n_numerical_features)
]
features.extend(categorical_features)
n_features_may_drop = True
if not n_features_may_drop and x_trans.shape[1] != self.features.size:
logger.warning(
"Number of features decreased from %i to %i during "
"preprocessing for unknown reasons (neither feature selection "
"using recursive feature elimination nor PCA is performed)",
self.features.size, x_trans.shape[1])
features = [
f'Unknown feature {idx}' for idx in range(x_trans.shape[1])
]
return np.array(features, dtype='str')
@property
def features_types(self):
"""pandas.Series: Types of the features."""
return self._classes['features'].types
@property
def features_units(self):
"""pandas.Series: Units of the features."""
return self._classes['features'].units
@property
def fit_kwargs(self):
"""dict: Keyword arguments for :meth:`fit`."""
fit_kwargs = self._cfg['fit_kwargs']
fit_kwargs = self._update_fit_kwargs(fit_kwargs)
verbosity_kwargs = self._get_verbosity_parameters(self._clf.fit)
for (key, val) in verbosity_kwargs.items():
fit_kwargs.setdefault(key, val)
return fit_kwargs
@property
def group_attributes(self):
"""numpy.ndarray: Group attributes of the input data."""
return self._classes['group_attributes']
@property
def label(self):
"""str: Label of the input data."""
return self._classes['label'].index.values[0]
@property
def label_units(self):
"""str: Units of the label."""
return self._classes['label'].units.values[0]
@property
def mlr_model_type(self):
"""str: MLR model type."""
return self._MLR_MODEL_TYPE
@property
def numerical_features(self):
"""numpy.ndarray: Numerical features."""
return self.features[~self._classes['features'].categorical]
@property
def parameters(self):
"""dict: Parameters of the complete MLR model pipeline."""
return self._parameters
@property
def random_state(self):
"""numpy.random.RandomState: Random state instance."""
return self._random_state
def efecv(self, **kwargs):
"""Perform exhaustive feature elimination using cross-validation.
Parameters
----------
**kwargs : keyword arguments, optional
            Additional options for
            :func:`esmvaltool.diag_scripts.mlr.custom_sklearn.cross_val_score_weighted`.
"""
logger.info(
"Performing exhaustive feature elimination using cross-validation "
"with final regressor %s on %i training points (thiy may take a "
"while...)", self._CLF_TYPE,
len(self.data['train'].index))
# Get fit parameters
fit_kwargs = deepcopy(self.fit_kwargs)
keys_to_remove = []
for key in fit_kwargs:
if key.endswith('eval_set'):
keys_to_remove.append(key)
for key in keys_to_remove:
logger.warning(
"Fit parameter '%s' is not supported for efecv()", key)
fit_kwargs.pop(key)
# Get other keyword arguments
kwargs = deepcopy(kwargs)
verbosity_kwargs = self._get_verbosity_parameters(
cross_val_score_weighted)
for (key, val) in verbosity_kwargs.items():
kwargs.setdefault(key, val)
kwargs.setdefault('n_jobs', self._cfg['n_jobs'])
kwargs['fit_params'] = fit_kwargs
kwargs['sample_weights'] = self._get_sample_weights('train')
if kwargs.get('cv') == 'logo':
kwargs.update(self._get_logo_cv_kwargs())
# Exhaustive feature selection
(self._clf, transformer) = perform_efecv(
self._clf, self.data['train'].x, self.get_y_array('train'),
**kwargs)
self._clf.steps.insert(0, ('feature_selection', transformer))
# Log results
new_features = self.features[transformer.support]
logger.info(
"Exhaustive feature elimination was successful, %i of the %i "
"features remain", new_features.size, self.features.size)
logger.info("Old features: %s", self.features)
logger.info("New features: %s", new_features)
logger.info("Successfully fitted MLR model on %i training point(s)",
len(self.data['train'].index))
logger.debug("Pipeline steps:")
logger.debug(pformat(list(self._clf.named_steps.keys())))
logger.debug("Parameters:")
logger.debug(pformat(self.parameters))
# LIME
self._load_lime_explainer()
def export_prediction_data(self, filename=None):
"""Export all prediction data contained in `self._data`.
Parameters
----------
filename : str, optional (default: '{data_type}_{pred_name}.csv')
Name of the exported files.
"""
for pred_name in self.data['pred']:
self._save_csv_file('pred', filename, pred_name=pred_name)
def export_training_data(self, filename=None):
"""Export all training data contained in `self._data`.
Parameters
----------
filename : str, optional (default: '{data_type}.csv')
Name of the exported files.
"""
for data_type in ('all', 'train', 'test'):
self._save_csv_file(data_type, filename)
def fit(self):
"""Fit MLR model.
Note
----
Specifying keyword arguments for this function is not allowed here
since :attr:`features_after_preprocessing` might be altered by
that. Use the keyword argument ``fit_kwargs`` during class
initialization instead.
"""
logger.info(
"Fitting MLR model with final regressor %s on %i training "
"point(s)", self._CLF_TYPE, len(self.data['train'].index))
# Create MLR model with desired parameters and fit it
self._clf.fit(self.data['train'].x, self.data['train'].y,
**self.fit_kwargs)
self._parameters = self._get_clf_parameters()
logger.info("Successfully fitted MLR model on %i training point(s)",
len(self.data['train'].index))
logger.debug("Pipeline steps:")
logger.debug(pformat(list(self._clf.named_steps.keys())))
logger.debug("Parameters:")
logger.debug(pformat(self.parameters))
# LIME
self._load_lime_explainer()
def get_ancestors(self, label=True, features=None, prediction_names=None,
prediction_reference=False):
"""Return ancestor files.
Parameters
----------
label : bool, optional (default: True)
Return ``label`` files.
features : list of str, optional (default: None)
Features for which files should be returned. If ``None``, return
files for all features.
prediction_names : list of str, optional (default: None)
Prediction names for which files should be returned. If ``None``,
return files for all prediction names.
prediction_reference : bool, optional (default: False)
Return ``prediction_reference`` files if available for given
``prediction_names``.
Returns
-------
list of str
Ancestor files.
Raises
------
ValueError
Invalid ``feature`` or ``prediction_name`` given.
"""
ancestors = []
# Label files
if label:
ancestors.extend([d['filename'] for d in self._datasets['label']])
# Feature files
if features is None:
features = self.features
for feature in features:
if feature not in self.features:
raise ValueError(
f"Got invalid feature '{feature}', expected one of "
f"{self.features}")
ancestors.extend(
[d['filename'] for d in self._datasets['feature']
if d['tag'] == feature]
)
# Prediction files
available_pred_names = list(self._datasets['prediction_input'].keys())
if prediction_names is None:
prediction_names = available_pred_names
for pred_name in prediction_names:
if pred_name not in available_pred_names:
raise ValueError(
f"Got invalid prediction name '{pred_name}', expected one "
f"of {available_pred_names}")
ancestors.extend(
[d['filename'] for d in
self._datasets['prediction_input'][pred_name]]
)
ancestors.extend(
[d['filename'] for d in
self._datasets['prediction_input_error'].get(pred_name, [])]
)
if prediction_reference:
ancestors.extend(
[d['filename'] for d in
self._datasets['prediction_reference'].get(pred_name, [])]
)
return ancestors
def get_data_frame(self, data_type, impute_nans=False):
"""Return data frame of specified type.
Parameters
----------
data_type : str
Data type to be returned. Must be one of ``'all'``, ``'train'`` or
``'test'``.
impute_nans : bool, optional (default: False)
Impute nans if desired.
Returns
-------
pandas.DataFrame
Desired data.
Raises
------
TypeError
``data_type`` is invalid or data does not exist (e.g. test data is
not set).
"""
allowed_types = ('all', 'train', 'test')
if data_type not in allowed_types:
raise TypeError(
f"'{data_type}' is not an allowed type, specify one of "
f"'{allowed_types}'")
if data_type not in self.data:
raise TypeError(f"No '{data_type}' data available")
data_frame = self.data[data_type]
if impute_nans:
data_frame = self._impute_nans(data_frame)
return data_frame
def get_x_array(self, data_type, impute_nans=False):
"""Return x data of specific type.
Parameters
----------
data_type : str
Data type to be returned. Must be one of ``'all'``, ``'train'`` or
``'test'``.
impute_nans : bool, optional (default: False)
Impute nans if desired.
Returns
-------
numpy.ndarray
Desired data.
Raises
------
TypeError
``data_type`` is invalid or data does not exist (e.g. test data is
not set).
"""
data_frame = self.get_data_frame(data_type, impute_nans=impute_nans)
return data_frame.x.values
def get_y_array(self, data_type, impute_nans=False):
"""Return y data of specific type.
Parameters
----------
data_type : str
Data type to be returned. Must be one of ``'all'``, ``'train'`` or
``'test'``.
impute_nans : bool, optional (default: False)
Impute nans if desired.
Returns
-------
numpy.ndarray
Desired data.
Raises
------
TypeError
``data_type`` is invalid or data does not exist (e.g. test data is
not set).
"""
data_frame = self.get_data_frame(data_type, impute_nans=impute_nans)
return data_frame.y.squeeze().values
def grid_search_cv(self, param_grid, **kwargs):
"""Perform exhaustive parameter search using cross-validation.
Parameters
----------
param_grid : dict or list of dict
Parameter names (keys) and ranges (values) for the search. Have to
be given for each step of the pipeline separated by two
underscores, i.e. ``s__p`` is the parameter ``p`` for step ``s``.
**kwargs : keyword arguments, optional
Additional options for
:class:`sklearn.model_selection.GridSearchCV`.
Raises
------
ValueError
Final regressor does not supply the attributes ``best_estimator_``
or ``best_params_``.
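
        Examples
        --------
        Minimal sketch of a typical call (the parameter name used in
        ``param_grid`` is hypothetical; valid parameter names for the current
        pipeline can be inspected via the ``parameters`` property):

        >>> model.grid_search_cv(  # doctest: +SKIP
        ...     param_grid={'final__regressor__n_estimators': [50, 100, 200]},
        ...     cv=5,
        ... )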
"""
logger.info(
"Performing exhaustive grid search cross-validation with final "
"regressor %s and parameter grid %s on %i training points",
self._CLF_TYPE, param_grid, len(self.data['train'].index))
# Get keyword arguments
(cv_kwargs, fit_kwargs) = self._get_cv_estimator_kwargs(GridSearchCV,
**kwargs)
# Create and fit GridSearchCV instance
clf = GridSearchCV(self._clf, param_grid, **cv_kwargs)
clf.fit(self.data['train'].x, self.data['train'].y, **fit_kwargs)
# Try to find best estimator
if hasattr(clf, 'best_estimator_'):
self._clf = clf.best_estimator_
elif hasattr(clf, 'best_params_'):
self.update_parameters(**clf.best_params_)
self._clf.fit(self.data['train'].x, self.data['train'].y,
**fit_kwargs)
else:
raise ValueError(
"GridSearchCV not successful, cannot determine best estimator "
"(neither using 'best_estimator_' nor 'best_params_'), "
"adapt keyword arguments accordingly (see "
"https://scikit-learn.org/stable/modules/generated/"
"sklearn.model_selection.GridSearchCV.html for more help)")
self._parameters = self._get_clf_parameters()
logger.info(
"Exhaustive grid search successful, found best parameter(s) %s",
clf.best_params_)
logger.debug("CV results:")
logger.debug(pformat(clf.cv_results_))
logger.info("Successfully fitted MLR model on %i training point(s)",
len(self.data['train'].index))
logger.debug("Pipeline steps:")
logger.debug(pformat(list(self._clf.named_steps.keys())))
logger.debug("Parameters:")
logger.debug(pformat(self.parameters))
# LIME
self._load_lime_explainer()
def plot_1d_model(self, filename=None, n_points=1000):
"""Plot lineplot that represents the MLR model.
Note
----
This only works for a model with a single feature.
Parameters
----------
filename : str, optional (default: '1d_mlr_model')
Name of the plot file.
n_points : int, optional (default: 1000)
Number of sampled points for the single feature (using linear
spacing between minimum and maximum value).
Raises
------
sklearn.exceptions.NotFittedError
MLR model is not fitted.
ValueError
MLR model is built from more than 1 feature.
"""
if not self._is_ready_for_plotting():
return
n_features = self.features.size
if n_features > 1:
raise ValueError(
f"Plotting lineplot of MLR model using 'plot_1d_model' is not "
f"possible, MLR model {self._cfg['mlr_model_name']} contains "
f"more than one feature ({n_features:d} features: "
f"{self.features})")
feature = self.features[0]
logger.info("Plotting 1D MLR model (sampling %i points for single "
"feature '%s')", n_points, feature)
if filename is None:
filename = '1d_mlr_model'
(_, axes) = plt.subplots()
# Get available datasets
data_to_plot = ['train']
if 'test' in self.data:
data_to_plot.append('test')
# Plot training and test data (if available)
for data_type in data_to_plot:
x_data = self.data[data_type].x[feature].values
y_data = self.get_y_array(data_type)
axes.scatter(
x_data, y_data,
**self._get_plot_kwargs(data_type, plot_type='scatter'))
# Plot MLR model
x_lin = pd.DataFrame.from_dict(
{feature: np.linspace(self.data['all'].x[feature].values.min(),
self.data['all'].x[feature].values.max(),
n_points)}
)
y_pred = self._clf.predict(x_lin)
x_lin_1d = x_lin.values[:, 0]
axes.plot(x_lin_1d, y_pred, color='k', linewidth=2,
label=self._cfg['mlr_model_name'])
# Plot appearance
title = (f"Predicted {self.label} by MLR model "
f"{self._cfg['mlr_model_name']}")
axes.set_title(title)
axes.set_xlabel(self._get_plot_feature(feature))
axes.set_ylabel(self._get_plot_label())
axes.legend(loc='best')
# Save plot
plot_path = os.path.join(
self._cfg['mlr_plot_dir'],
filename + '.' + self._cfg['output_file_type'],
)
plt.savefig(plot_path, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
plt.close()
# Save provenance
cube = mlr.get_1d_cube(
x_lin_1d,
y_pred,
x_kwargs={'var_name': feature,
'long_name': feature,
'units': self.features_units[feature]},
y_kwargs={'var_name': self.label,
'long_name': title,
'units': self.label_units,
'attributes': {'project': '', 'dataset': ''}},
)
self._write_plot_provenance(
cube, plot_path, ancestors=self.get_ancestors(prediction_names=[]),
caption=title + '.', plot_types=['line'])
def plot_partial_dependences(self, filename=None):
"""Plot partial dependences for every feature.
Parameters
----------
filename : str, optional (default: 'partial_dependece_{feature}')
Name of the plot file.
Raises
------
sklearn.exceptions.NotFittedError
MLR model is not fitted.
"""
if not self._is_ready_for_plotting():
return
logger.info("Plotting partial dependences")
if filename is None:
filename = 'partial_dependece_{feature}'
# Plot for every feature
# Note: Ignore warnings about missing feature names here because they
# are not used.
x_train = self.get_x_array('train', impute_nans=True)
verbosity = self._get_verbosity_parameters(
PartialDependenceDisplay.from_estimator
)
for feature_name in self.features:
logger.debug("Plotting partial dependence of '%s'", feature_name)
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message=('X does not have valid feature names, but '
'SimpleImputer was fitted with feature names'),
category=UserWarning,
module='sklearn',
)
display = PartialDependenceDisplay.from_estimator(
self._clf,
x_train,
features=[feature_name],
feature_names=self.features,
method='brute',
line_kw={'color': 'b'},
random_state=self.random_state,
**verbosity,
)
title = (f"Partial dependence of {self.label} on {feature_name} "
f"for MLR model {self._cfg['mlr_model_name']}")
plt.title(title)
plt.xlabel(self._get_plot_feature(feature_name))
plt.ylabel(self._get_plot_label())
# Save plot
new_filename = (filename.format(feature=feature_name) + '.' +
self._cfg['output_file_type'])
plot_path = os.path.join(self._cfg['mlr_plot_dir'], new_filename)
plt.savefig(plot_path, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
plt.close()
# Save provenance
cube = mlr.get_1d_cube(
display.lines_[0, 0].get_xdata(),
display.lines_[0, 0].get_ydata(),
x_kwargs={'var_name': feature_name,
'long_name': feature_name,
'units': self.features_units[feature_name]},
y_kwargs={'var_name': self.label,
'long_name': self.label,
'units': self.label_units,
'attributes': {'project': '', 'dataset': ''}},
)
self._write_plot_provenance(
cube, plot_path,
ancestors=self.get_ancestors(prediction_names=[]),
caption=title + '.', plot_types=['line'])
def plot_prediction_errors(self, filename=None):
"""Plot predicted vs. true values.
Parameters
----------
filename : str, optional (default: 'prediction_errors')
Name of the plot file.
Raises
------
sklearn.exceptions.NotFittedError
MLR model is not fitted.
"""
if not self._is_ready_for_plotting():
return
logger.info("Plotting prediction errors")
if filename is None:
filename = 'prediction_errors'
(_, axes) = plt.subplots()
# Get available datasets
data_to_plot = ['train']
if 'test' in self.data:
data_to_plot.append('test')
# Create plot
y_pred_all = []
y_true_all = []
data_types = []
for data_type in data_to_plot:
logger.debug("Plotting prediction error of '%s' data", data_type)
x_data = self.data[data_type].x
y_pred = self._clf.predict(x_data)
y_true = self.get_y_array(data_type)
axes.scatter(
y_pred, y_true,
**self._get_plot_kwargs(data_type, plot_type='scatter'))
# Collect data
y_pred_all.append(y_pred)
y_true_all.append(y_true)
data_types.append(np.full(y_pred.shape, data_type))
# Plot appearance
lims = [
np.min([axes.get_xlim(), axes.get_ylim()]),
np.max([axes.get_xlim(), axes.get_ylim()]),
]
axes.plot(lims, lims, linestyle='--', color='k', alpha=0.75)
axes.set_aspect('equal')
axes.set_xlim(lims)
axes.set_ylim(lims)
title = (f"Prediction errors of {self.label} "
f"({self._cfg['mlr_model_name']})")
axes.set_title(title)
axes.set_xlabel(f'Predicted {self._get_plot_label()}')
axes.set_ylabel(f'True {self._get_plot_label()}')
axes.legend(loc='upper left')
# Save plot
plot_path = os.path.join(
self._cfg['mlr_plot_dir'],
filename + '.' + self._cfg['output_file_type'],
)
plt.savefig(plot_path, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
plt.close()
# Save provenance
cube = mlr.get_1d_cube(
np.concatenate(y_pred_all),
np.concatenate(y_true_all),
x_kwargs={'var_name': self.label,
'long_name': f'Predicted {self.label}',
'units': self.label_units},
y_kwargs={'var_name': self.label,
'long_name': f'True {self.label}',
'units': self.label_units,
'attributes': {'project': '', 'dataset': ''}},
)
cube.add_aux_coord(
self._get_data_type_coord(np.concatenate(data_types)), 0)
self._write_plot_provenance(
cube, plot_path, ancestors=self.get_ancestors(prediction_names=[]),
caption=title + '.', plot_types=['scatter'])
def plot_residuals(self, filename=None):
"""Plot residuals of training and test (if available) data.
Parameters
----------
filename : str, optional (default: 'residuals')
Name of the plot file.
Raises
------
sklearn.exceptions.NotFittedError
MLR model is not fitted.
"""
if not self._is_ready_for_plotting():
return
logger.info("Plotting residuals")
if filename is None:
filename = 'residuals'
(_, axes) = plt.subplots()
# Get available datasets
data_to_plot = ['train']
if 'test' in self.data:
data_to_plot.append('test')
# Create plot
y_pred_all = []
y_res_all = []
data_types = []
for data_type in data_to_plot:
logger.debug("Plotting residuals of '%s' data", data_type)
x_data = self.data[data_type].x
y_pred = self._clf.predict(x_data)
y_true = self.get_y_array(data_type)
y_res = self._get_residuals(y_true, y_pred)
axes.scatter(
y_pred, y_res,
**self._get_plot_kwargs(data_type, plot_type='scatter'))
# Collect data
y_pred_all.append(y_pred)
y_res_all.append(y_res)
data_types.append(np.full(y_pred.shape, data_type))
# Plot appearance
axes.axhline(0.0, linestyle='--', color='k', alpha=0.75)
axes.set_aspect('equal')
title = (f"Residuals of {self.label} ({self._cfg['mlr_model_name']})")
axes.set_title(title)
axes.set_xlabel(f'Predicted {self._get_plot_label()}')
axes.set_ylabel(f'Residuals of {self._get_plot_label()}')
self._set_axis_lim_symmetric(axes, 'y')
axes.legend(loc='best')
# Save plot
plot_path = os.path.join(
self._cfg['mlr_plot_dir'],
filename + '.' + self._cfg['output_file_type'],
)
plt.savefig(plot_path, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
plt.close()
# Save provenance
cube = mlr.get_1d_cube(
np.concatenate(y_pred_all),
np.concatenate(y_res_all),
x_kwargs={'var_name': self.label,
'long_name': f'Predicted {self.label}',
'units': self.label_units},
y_kwargs={'var_name': self.label,
'long_name': f'Residuals of {self.label}',
'units': self.label_units,
'attributes': {'project': '', 'dataset': ''}},
)
cube.add_aux_coord(
self._get_data_type_coord(np.concatenate(data_types)), 0)
self._write_plot_provenance(
cube, plot_path, ancestors=self.get_ancestors(prediction_names=[]),
caption=title + '.', plot_types=['scatter'])
def plot_residuals_histogram(self, filename=None):
"""Plot histogram of residuals of training and test data.
Parameters
----------
filename : str, optional (default: 'residuals_histogram')
Name of the plot file.
Raises
------
sklearn.exceptions.NotFittedError
MLR model is not fitted.
"""
if not self._is_ready_for_plotting():
return
logger.info("Plotting residuals histogram")
if filename is None:
filename = 'residuals_histogram'
(_, axes) = plt.subplots()
# Get available datasets
data_to_plot = ['train']
if 'test' in self.data:
data_to_plot.append('test')
# Create plot (centralize bins around the zero)
y_res_all = []
freq_all = []
data_types = []
for data_type in data_to_plot:
logger.debug("Plotting residuals histogram of '%s' data",
data_type)
x_data = self.data[data_type].x
y_pred = self._clf.predict(x_data)
y_true = self.get_y_array(data_type)
y_res = self._get_residuals(y_true, y_pred)
bins = self._get_centralized_bins(y_res, n_bins=20)
hist = axes.hist(y_res, bins=bins,
**self._get_plot_kwargs(data_type))
# Collect data
y_res_all.append(np.convolve(hist[1], (1, 1), 'valid') / 2.0)
freq_all.append(hist[0])
data_types.append(np.full(hist[0].shape, data_type))
# Plot appearance
axes.axvline(0.0, linestyle='--', color='k', alpha=0.75)
title = (f"Histogram for residuals of {self.label} "
f"({self._cfg['mlr_model_name']})")
axes.set_title(title)
axes.set_xlabel(f'Residuals of {self._get_plot_label()}')
axes.set_ylabel('Frequency')
self._set_axis_lim_symmetric(axes, 'x')
axes.legend(loc='best')
# Save plot
plot_path = os.path.join(
self._cfg['mlr_plot_dir'],
filename + '.' + self._cfg['output_file_type'],
)
plt.savefig(plot_path, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
plt.close()
# Save provenance
cube = mlr.get_1d_cube(
np.concatenate(y_res_all),
np.concatenate(freq_all),
x_kwargs={'var_name': self.label,
'long_name': f'Residuals of {self.label}',
'units': self.label_units},
y_kwargs={'var_name': 'frequency',
'long_name': 'Frequency',
'units': '1',
'attributes': {'project': '', 'dataset': ''}},
)
cube.add_aux_coord(
self._get_data_type_coord(np.concatenate(data_types)), 0)
self._write_plot_provenance(
cube, plot_path, ancestors=self.get_ancestors(prediction_names=[]),
caption=title + '.', plot_types=['histogram'])
def plot_residuals_distribution(self, filename=None):
"""Plot distribution of residuals of training and test data (KDE).
Parameters
----------
filename : str, optional (default: 'residuals_distribution')
Name of the plot file.
Raises
------
sklearn.exceptions.NotFittedError
MLR model is not fitted.
"""
if not self._is_ready_for_plotting():
return
logger.info("Plotting residuals distribution")
if filename is None:
filename = 'residuals_distribution'
# Get available datasets
data_to_plot = ['train']
if 'test' in self.data:
data_to_plot.append('test')
# Create plot (centralize bins around the zero)
data_types = []
for data_type in data_to_plot:
logger.debug("Plotting residuals distribution of '%s' data",
data_type)
x_data = self.data[data_type].x
y_pred = self._clf.predict(x_data)
y_true = self.get_y_array(data_type)
y_res = self._get_residuals(y_true, y_pred)
axes = sns.kdeplot(y_res, **self._get_plot_kwargs(data_type))
# Collect data
data_types.append(np.full(axes.lines[-1].get_xdata().shape,
data_type))
# Plot appearance
axes.axvline(0.0, linestyle='--', color='k', alpha=0.75)
title = (f"Probability distribution of residuals of {self.label} "
f"({self._cfg['mlr_model_name']})")
axes.set_title(title)
axes.set_xlabel(f'Residuals of {self._get_plot_label()}')
axes.set_ylabel('Probability density')
self._set_axis_lim_symmetric(axes, 'x')
axes.legend(loc='best')
# Save plot
plot_path = os.path.join(
self._cfg['mlr_plot_dir'],
filename + '.' + self._cfg['output_file_type'],
)
plt.savefig(plot_path, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
plt.close()
# Save provenance
cube = mlr.get_1d_cube(
np.concatenate([line.get_xdata() for line in axes.lines[:-1]]),
np.concatenate([line.get_ydata() for line in axes.lines[:-1]]),
x_kwargs={'var_name': self.label,
'long_name': f'Residuals of {self.label}',
'units': self.label_units},
y_kwargs={'var_name': 'probability_density',
'long_name': 'Probability Density',
'units': '1',
'attributes': {'project': '', 'dataset': ''}},
)
cube.add_aux_coord(
self._get_data_type_coord(np.concatenate(data_types)), 0)
self._write_plot_provenance(
cube, plot_path, ancestors=self.get_ancestors(prediction_names=[]),
caption=title + '.', plot_types=['probability'])
def plot_scatterplots(self, filename=None):
"""Plot scatterplots label vs. feature for every feature.
Parameters
----------
filename : str, optional (default: 'scatterplot_{feature}')
Name of the plot file.
Raises
------
sklearn.exceptions.NotFittedError
MLR model is not fitted.
"""
if not self._is_ready_for_plotting():
return
logger.info("Plotting scatterplots")
if filename is None:
filename = 'scatterplot_{feature}'
# Plot scatterplot for every feature
for feature in self.features:
logger.debug("Plotting scatterplot of '%s'", feature)
(_, axes) = plt.subplots()
# Iterate over group attributes
for group_attr in self.group_attributes:
group_attr = self._group_attr_to_pandas_index_str(group_attr)
axes.plot(self.data['all'].x.loc[group_attr, feature],
self.data['all'].y.loc[group_attr, self.label],
'.', label=group_attr)
# Plot appearance
axes.legend(loc='center left', ncol=2, bbox_to_anchor=[1.05, 0.5],
borderaxespad=0.0)
title = f"Target variable {self.label} vs. feature {feature}"
axes.set_title(title)
axes.set_xlabel(self._get_plot_feature(feature))
axes.set_ylabel(self._get_plot_label())
# Save plot
plot_path = os.path.join(
self._cfg['mlr_plot_dir'],
filename.format(feature=feature) + '.' +
self._cfg['output_file_type'])
plt.savefig(plot_path, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
plt.close()
# Save provenance
cube = mlr.get_1d_cube(
self.data['all'].x.loc[:, feature].values,
self.get_y_array('all'),
x_kwargs={'var_name': feature,
'long_name': feature,
'units': self.features_units[feature]},
y_kwargs={'var_name': self.label,
'long_name': self.label,
'units': self.label_units,
'attributes': {'project': '', 'dataset': ''}},
)
ancestors = self.get_ancestors(features=[feature],
prediction_names=[])
self._write_plot_provenance(
cube, plot_path, ancestors=ancestors, caption=title + '.',
plot_types=['scatter'])
def predict(self,
save_mlr_model_error=None,
save_lime_importance=False,
save_propagated_errors=False,
**kwargs):
"""Perform prediction using the MLR model(s) and write ``*.nc`` files.
Parameters
----------
save_mlr_model_error : str or int, optional
Additionally saves estimated squared MLR model error. This error
represents the uncertainty of the prediction caused by the MLR
model itself and not by errors in the prediction input data (errors
in that will be considered by including datasets with ``var_type``
set to ``prediction_input_error`` and setting
``save_propagated_errors`` to ``True``). If the option is set to
``'test'``, the (constant) error is estimated as RMSEP using a
(hold-out) test data set. Only possible if test data is available,
i.e. the option ``test_size`` is not set to ``False`` during class
initialization. If the option is set to ``'logo'``, the (constant)
error is estimated as RMSEP using leave-one-group-out
cross-validation using the group_attributes. Only possible if
``group_datasets_by_attributes`` is given. If the option is set to
an integer ``n`` (!= 0), the (constant) error is estimated as RMSEP
using n-fold cross-validation.
save_lime_importance : bool, optional (default: False)
Additionally saves local feature importance given by LIME (Local
Interpretable Model-agnostic Explanations).
save_propagated_errors : bool, optional (default: False)
Additionally saves propagated errors from
``prediction_input_error`` datasets. Only possible when these are
available.
**kwargs : keyword arguments, optional
            Additional options for the final regressor's ``predict()``
            function.
Raises
------
RuntimeError
``return_var`` and ``return_cov`` are both set to ``True``.
sklearn.exceptions.NotFittedError
MLR model is not fitted.
ValueError
An invalid value for ``save_mlr_model_error`` is given.
ValueError
``save_propagated_errors`` is ``True`` and no
``prediction_input_error`` data is available.
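
        Examples
        --------
        Minimal sketch of a typical call after fitting the model (assuming
        test data and ``prediction_input_error`` datasets are available):

        >>> model.predict(  # doctest: +SKIP
        ...     save_mlr_model_error='test',
        ...     save_propagated_errors=True,
        ... )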
"""
self._check_fit_status('Prediction')
logger.info("Started prediction")
mlr.check_predict_kwargs(kwargs)
if kwargs:
logger.info(
"Using additional keyword argument(s) %s for predict() "
"function", kwargs)
# Iterate over different predictions
for pred_name in self._datasets['prediction_input']:
logger.info("Predicting '%s'", self._get_name(pred_name))
# Prediction
(x_pred, x_err, y_ref,
x_cube) = self._extract_prediction_input(pred_name)
pred_dict = self._get_prediction_dict(
pred_name, x_pred, x_err, y_ref,
get_mlr_model_error=save_mlr_model_error,
get_lime_importance=save_lime_importance,
get_propagated_errors=save_propagated_errors, **kwargs)
# Save data in class member
y_pred = pd.DataFrame(pred_dict[None],
columns=[self.label],
index=x_pred.index,
dtype=self._cfg['dtype'])
self._data['pred'][pred_name] = pd.concat([x_pred, y_pred],
axis=1,
keys=['x', 'y'])
# Save prediction cubes
self._save_prediction_cubes(pred_dict, pred_name, x_cube)
def print_correlation_matrices(self):
"""Print correlation matrices for all datasets."""
self._check_fit_status('Printing correlation matrices')
for data_type in ('all', 'train', 'test'):
if data_type not in self.data:
continue
logger.info("Correlation matrix for %s data:\n%s", data_type,
self.data[data_type][['x', 'y']].corr())
def print_regression_metrics(self, logo=False):
"""Print all available regression metrics for training data.
Parameters
----------
logo : bool, optional (default: False)
Print regression metrics using
:class:`sklearn.model_selection.LeaveOneGroupOut` cross-validation.
Only possible when `group_datasets_by_attributes` was given during
class initialization.
"""
self._check_fit_status('Printing regression metrics')
regression_metrics = [
'explained_variance_score',
'mean_absolute_error',
'mean_squared_error',
'r2_score',
]
# Metrics on train and test data
for data_type in ('all', 'train', 'test'):
self._print_metrics(regression_metrics, data_type)
logger.info("")
# Metrics on CV data
if logo:
logger.info(
"Evaluating regression metrics using 'LeaveOneGroupOut' "
"cross-validation using group attributes %s on training data",
self._cfg['group_datasets_by_attributes'])
regression_metrics = {
'explained_variance_score': 'explained_variance',
'mean_absolute_error': 'neg_mean_absolute_error',
'root_mean_squared_error': 'neg_root_mean_squared_error',
'r2_score': 'r2',
}
x_data = self.data['train'].x
y_data = self.get_y_array('train')
sample_weights = self._get_sample_weights('train')
for (metric, scoring) in regression_metrics.items():
value = cross_val_score_weighted(
self._clf, x_data, y_data, scoring=scoring,
n_jobs=self._cfg['n_jobs'], fit_params=self.fit_kwargs,
**self._get_verbosity_parameters(cross_val_score_weighted),
**self._get_logo_cv_kwargs())
value = np.mean(value)
if 'neg_' in scoring:
value = -value
logger.info("%s: %s", metric, value)
if sample_weights is None:
return
for (metric, scoring) in regression_metrics.items():
value = cross_val_score_weighted(
self._clf, x_data, y_data, scoring=scoring,
n_jobs=self._cfg['n_jobs'], fit_params=self.fit_kwargs,
sample_weights=sample_weights,
**self._get_verbosity_parameters(cross_val_score_weighted),
**self._get_logo_cv_kwargs())
value = np.mean(value)
if 'neg_' in scoring:
value = -value
logger.info("Weighted %s: %s", metric, value)
def reset_pipeline(self):
"""Reset regressor pipeline."""
steps = []
numerical_features_idx = [
int(np.where(self.features == tag)[0][0])
for tag in self.numerical_features
]
# Imputer
if self._cfg['imputation_strategy'] != 'remove':
imputer = SimpleImputer(strategy=self._cfg['imputation_strategy'])
steps.append(('imputer', imputer))
# Scaler for numerical features
if self._cfg['standardize_data']:
x_scaler = ColumnTransformer(
[('', StandardScaler(), numerical_features_idx)],
remainder='passthrough',
)
steps.append(('x_scaler', x_scaler))
# PCA for numerical features
if self._cfg.get('pca'):
pca = ColumnTransformer(
[('', PCA(random_state=self.random_state),
numerical_features_idx)],
remainder='passthrough',
)
steps.append(('pca', pca))
# Final regressor
final_parameters = self._load_final_parameters()
final_regressor = self._CLF_TYPE(**final_parameters)
# Transformer for labels if desired (if not, add pd to np converter)
if self._cfg['standardize_data']:
y_scaler = StandardScaler()
else:
y_scaler = StandardScaler(with_mean=False, with_std=False)
transformed_target_regressor = AdvancedTransformedTargetRegressor(
transformer=y_scaler, regressor=final_regressor)
steps.append(('final', transformed_target_regressor))
# Final pipeline
if self._cfg['cache_intermediate_results']:
if self._cfg['n_jobs'] is None or self._cfg['n_jobs'] == 1:
memory = self._cfg['mlr_work_dir']
else:
logger.debug(
"Caching intermediate results of Pipeline is not "
"supported for multiple processes (using at most %i "
"processes)", self._cfg['n_jobs'])
memory = None
else:
memory = None
self._clf = AdvancedPipeline(steps, memory=memory)
logger.info("Created pipeline with steps %s",
list(self._clf.named_steps.keys()))
def rfecv(self, **kwargs):
"""Perform recursive feature elimination using cross-validation.
Note
----
This only works for final estimators that provide information about
feature importance either through a ``coef_`` attribute or through a
``feature_importances_`` attribute.
Parameters
----------
**kwargs : keyword arguments, optional
Additional options for :class:`sklearn.feature_selection.RFECV`.
Raises
------
RuntimeError
Final estimator does not provide ``coef_`` or
``feature_importances_`` attribute.
"""
logger.info(
"Performing recursive feature elimination using cross-validation "
"with final regressor %s on %i training points", self._CLF_TYPE,
len(self.data['train'].index))
# Get keyword arguments
(cv_kwargs, fit_kwargs) = self._get_cv_estimator_kwargs(AdvancedRFECV,
**kwargs)
fit_kwargs = deepcopy(fit_kwargs)
keys_to_remove = []
for key in fit_kwargs:
if key.endswith('eval_set'):
keys_to_remove.append(key)
for key in keys_to_remove:
logger.warning(
"Fit parameter '%s' is not supported for rfecv()", key)
fit_kwargs.pop(key)
# Create and fit AdvancedRFECV instance
rfecv = AdvancedRFECV(self._clf, **cv_kwargs)
rfecv.fit(self.data['train'].x, self.get_y_array('train'),
**fit_kwargs)
# Add feature selection step to pipeline
self._clf = rfecv.estimator_
transformer = get_rfecv_transformer(rfecv)
self._clf.steps.insert(0, ('feature_selection', transformer))
# Log results
new_features = self.features[rfecv.support_]
logger.info(
"Recursive feature elimination was successful, %i of the %i "
"features remain", new_features.size, self.features.size)
logger.info("Old features: %s", self.features)
logger.info("New features: %s", new_features)
logger.info("Successfully fitted MLR model on %i training point(s)",
len(self.data['train'].index))
logger.debug("Pipeline steps:")
logger.debug(pformat(list(self._clf.named_steps.keys())))
logger.debug("Parameters:")
logger.debug(pformat(self.parameters))
# LIME
self._load_lime_explainer()
def test_normality_of_residuals(self):
"""Perform Shapiro-Wilk test to normality of residuals.
Raises
------
sklearn.exceptions.NotFittedError
MLR model is not fitted.
"""
if not self._is_ready_for_plotting():
return
# Get available datasets
data_to_check = ['train']
if 'test' in self.data:
data_to_check.append('test')
# Perform Shapiro-Wilk test
for data_type in data_to_check:
x_data = self.data[data_type].x
y_pred = self._clf.predict(x_data)
y_true = self.get_y_array(data_type)
y_res = self._get_residuals(y_true, y_pred)
(w_value, p_value) = shapiro(y_res)
logger.info(
"Result of Shapiro-Wilk test for normality of residuals: W = "
"%.5f, p = %.5f", w_value, p_value)
def update_parameters(self, **params):
"""Update parameters of the whole pipeline.
Note
----
Parameter names have to be given for each step of the pipeline
separated by two underscores, i.e. ``s__p`` is the parameter ``p`` for
step ``s``.
Parameters
----------
**params : keyword arguments, optional
Parameters for the pipeline which should be updated.
Raises
------
ValueError
Invalid parameter for pipeline given.
"""
allowed_params = self._get_clf_parameters()
new_params = {}
for (key, val) in params.items():
if key in allowed_params:
new_params[key] = val
else:
raise ValueError(
f"'{key}' is not a valid parameter for the pipeline")
self._clf.set_params(**new_params)
self._parameters = self._get_clf_parameters()
if new_params:
logger.info("Updated pipeline with parameters %s", new_params)
def _calculate_sample_weights(self, cube, var_type, group_attr=None):
"""Calculate sample weights if desired."""
if not self._cfg['weighted_samples']:
return None
if var_type != 'feature':
return None
weights = mlr.get_all_weights(cube, **self._cfg['weighted_samples'])
weights = weights.astype(self._cfg['dtype'], casting='same_kind')
weights = pd.DataFrame(
{'sample_weight': weights.ravel()},
index=self._get_multiindex(cube, group_attr=group_attr),
dtype=self._cfg['dtype'],
)
msg = '' if group_attr is None else f" of '{group_attr}'"
logger.debug(
"Successfully calculated %i sample weights for training data%s "
"using %s", len(weights.index), msg, self._cfg['weighted_samples'])
return weights
def _check_clf(self):
"""Check if valid regressor type is given."""
class_name = self.__class__.__name__
if self._CLF_TYPE is None:
raise NotImplementedError(
f"No MLR model type specified, please use the factory "
f"function 'esmvaltool.diag_scripts.mlr.models.{class_name}."
f"create()' to initialize this class")
def _check_cube_dimensions(self, cube, ref_cube, text=None):
"""Check shape and coordinates of a given cube."""
msg = '' if text is None else f' for {text}'
if self._cfg.get('accept_only_scalar_data'):
allowed_shapes = [(), (1, )]
if cube.shape not in allowed_shapes:
raise ValueError(
f"Expected only cubes with shapes {allowed_shapes} when "
f"option 'accept_only_scalar_data' is set to 'True', got "
f"{cube.shape}{msg}")
else:
if ref_cube is None:
return
if cube.shape != ref_cube.shape:
raise ValueError(
f"Expected cubes with shapes {ref_cube.shape}{msg}, got "
f"{cube.shape}. Consider regridding, pre-selecting data "
f"at class initialization (argument 'input_datasets') or "
f"the options 'broadcast_from' or 'group_datasets_by_"
f"attributes'")
cube_coords = cube.coords(dim_coords=True)
ref_coords = ref_cube.coords(dim_coords=True)
cube_coords_str = [
f'{coord.name()}, shape {coord.shape}' for coord in cube_coords
]
ref_coords_str = [
f'{coord.name()}, shape {coord.shape}' for coord in ref_coords
]
if cube_coords_str != ref_coords_str:
logger.warning(
"Cube coordinates differ, expected %s%s, got %s. Check "
"input cubes", ref_coords_str, msg, cube_coords_str)
return
for (idx, cube_coord) in enumerate(cube_coords):
ref_coord = ref_coords[idx]
if not np.allclose(cube_coord.points, ref_coord.points):
logger.warning(
"'%s' coordinate for different cubes does not "
"match, got %s%s, expected %s (values differ by "
"more than allowed tolerance, check input cubes)",
cube_coord.name(), cube_coord.points, msg,
ref_coord.points)
def _check_dataset(self, datasets, var_type, tag, text=None):
"""Check if datasets exist and are valid."""
datasets = select_metadata(datasets, tag=tag, var_type=var_type)
msg = '' if text is None else text
if not datasets:
if var_type == 'prediction_input_error':
return None
if var_type == 'prediction_reference':
return None
if var_type == 'label':
raise ValueError(f"Label '{tag}'{msg} not found")
if not self._cfg.get('allow_missing_features'):
raise ValueError(
f"{var_type} '{tag}'{msg} not found, use 'allow_missing_"
f"features' to ignore this")
logger.info(
"Ignored missing %s '%s'%s since 'allow_missing_features' is "
"set to 'True'", var_type, tag, msg)
return None
if len(datasets) > 1:
raise ValueError(
f"{var_type} '{tag}'{msg} not unique, consider adapting the "
f"argument 'input_datasets' at class initialization to "
f"pre-select datasets or specify suitable attributes to group "
f"datasets with the option 'group_datasets_by_attributes'")
if var_type in ('label', 'prediction_reference'):
units = self.label_units
else:
units = self.features_units[tag]
if units != Unit(datasets[0]['units']):
raise ValueError(
f"Expected units '{units}' for {var_type} '{tag}'{msg}, got "
f"'{datasets[0]['units']}'")
return datasets[0]
def _check_fit_status(self, text):
"""Check if MLR model is fitted and raise exception otherwise."""
x_dummy = pd.DataFrame(
np.ones((1, self.features.size), dtype=self._cfg['dtype']),
columns=self.features,
)
try:
self._clf.predict(x_dummy)
except NotFittedError as exc:
raise NotFittedError(
f"{text} not possible, MLR model {self._CLF_TYPE} is not "
f"fitted yet, call fit(), grid_search_cv() or rfecv() "
f"first") from exc
def _estimate_mlr_model_error(self, target_length, strategy):
"""Estimate squared error of MLR model (using CV or test data)."""
logger.info(
"Estimating squared error of MLR model using strategy '%s'",
strategy)
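# Note: 'strategy' corresponds to the recipe option 'save_mlr_model_error';
# 'test' evaluates the error on the hold-out test data, 'logo' uses
# leave-one-group-out CV and an integer k uses k-fold CV.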
# Estimate MLR model error
if strategy == 'test':
if 'test' not in self.data:
raise ValueError(
f"'save_mlr_model_error' using strategy 'test' is not "
f"possible because no test data is available ('test_size' "
f"was set to '{self._cfg['test_size']}' during class "
f"initialization)")
y_pred = self._clf.predict(self.data['test'].x)
error = metrics.mean_squared_error(
self.get_y_array('test'),
y_pred,
sample_weight=self._get_sample_weights('test'),
)
else:
if strategy == 'logo':
cv_kwargs = self._get_logo_cv_kwargs()
elif isinstance(strategy, int):
cv_kwargs = {'cv': strategy}
else:
raise ValueError(
f"Expected 'test', 'logo' or an integer as strategy for "
f"estimating MLR model error (argument "
f"'save_mlr_model_error'), got '{strategy}'")
x_data = self.data['train'].x
y_data = self.get_y_array('train')
error = cross_val_score_weighted(
self._clf, x_data, y_data, scoring='neg_mean_squared_error',
n_jobs=self._cfg['n_jobs'], fit_params=self.fit_kwargs,
sample_weights=self._get_sample_weights('train'),
**self._get_verbosity_parameters(cross_val_score_weighted),
**cv_kwargs)
error = -np.mean(error)
# Reshape error
error_array = np.full(target_length, error, dtype=self._cfg['dtype'])
units = mlr.units_power(self.label_units, 2)
logger.info(
"Estimated squared MLR model error by %s %s using strategy '%s'",
error, units, strategy)
return error_array
def _extract_features_and_labels(self):
"""Extract feature and label data points from training data."""
(x_data, _,
sample_weights) = self._extract_x_data(self._datasets['feature'],
'feature')
y_data = self._extract_y_data(self._datasets['label'], 'label')
# Check number of input points
if not x_data.index.equals(y_data.index):
raise ValueError(
f"Got differing point(s) for features and labels ("
f"{len(x_data.index):d} feature points and "
f"{len(y_data.index):d} label points):\n"
f"{x_data.index.difference(y_data.index)}")
logger.info("Found %i raw input data point(s) with data type '%s'",
len(y_data.index), self._cfg['dtype'])
# Remove missing values in labels
(x_data, y_data,
sample_weights) = self._remove_missing_labels(x_data, y_data,
sample_weights)
# Remove missing values in features (if desired)
(x_data, y_data, sample_weights) = self._remove_missing_features(
x_data, y_data, sample_weights)
return (x_data, y_data, sample_weights)
def _extract_prediction_input(self, prediction_name):
"""Extract prediction input data points for ``prediction_name``."""
(x_pred, x_cube, _) = self._extract_x_data(
self._datasets['prediction_input'][prediction_name],
'prediction_input')
logger.info(
"Found %i raw prediction input data point(s) with data type '%s'",
len(x_pred.index), self._cfg['dtype'])
# Prediction reference
if prediction_name not in self._datasets['prediction_reference']:
y_ref = None
logger.debug(
"No prediction reference for prediction '%s' available",
self._get_name(prediction_name))
else:
y_ref = self._extract_y_data(
self._datasets['prediction_reference'][prediction_name],
'prediction_reference')
if y_ref is not None:
if not x_pred.index.equals(y_ref.index):
raise ValueError(
f"Got differing point(s) for prediction input and "
f"prediction output ({len(x_pred.index):d} "
f"prediction input points and {len(y_ref.index):d} "
f"prediction output points):\n"
f"{x_pred.index.difference(y_ref.index)}")
logger.info(
"Found %i raw prediction output data point(s) with data "
"type '%s'", len(y_ref.index), self._cfg['dtype'])
# Error
if prediction_name not in self._datasets['prediction_input_error']:
x_err = None
logger.debug(
"Propagating prediction input errors for prediction '%s' not "
"possible, no 'prediction_input_error' datasets given",
self._get_name(prediction_name))
else:
(x_err, _, _) = self._extract_x_data(
self._datasets['prediction_input_error'][prediction_name],
'prediction_input_error')
if not x_pred.index.equals(x_err.index):
raise ValueError(
f"Got differing point(s) for prediction input and "
f"prediction input error ({len(x_pred.index):d} "
f"prediction input points and {len(x_err.index):d} "
f"prediction input error points):\n"
f"{x_pred.index.difference(x_err.index)}")
logger.info(
"Found %i raw prediction input error data point(s) with data "
"type '%s'", len(x_err.index), self._cfg['dtype'])
# Remove missing values if necessary
(x_pred, x_err, y_ref,
mask) = self._remove_missing_pred_input(x_pred, x_err, y_ref)
# Create cube with appropriate mask for output
mask = mask.reshape(x_cube.shape)
cube_data = np.empty(mask.shape, dtype=self._cfg['dtype'])
x_cube.data = np.ma.array(cube_data, mask=mask)
return (x_pred, x_err, y_ref, x_cube)
def _extract_x_data(self, datasets, var_type):
"""Extract required x data of type ``var_type`` from ``datasets``."""
allowed_types = ('feature', 'prediction_input',
'prediction_input_error')
if var_type not in allowed_types:
raise ValueError(
f"Excepted one of '{allowed_types}' for 'var_type', got "
f"'{var_type}'")
x_data_for_groups = []
x_cube = None
if self._cfg['weighted_samples'] and var_type == 'feature':
sample_weights_for_groups = []
else:
sample_weights_for_groups = None
# Iterate over datasets
datasets = select_metadata(datasets, var_type=var_type)
if var_type == 'feature':
groups = self.group_attributes
else:
groups = [None]
for group_attr in groups:
group_datasets = select_metadata(datasets,
group_attribute=group_attr)
if group_attr is not None:
logger.info("Loading '%s' data of '%s'", var_type, group_attr)
msg = '' if group_attr is None else f" for '{group_attr}'"
if not group_datasets:
raise ValueError(f"No '{var_type}' data{msg} found")
(group_data, x_cube,
weights) = self._get_x_data_for_group(group_datasets, var_type,
group_attr)
x_data_for_groups.append(group_data)
# Append weights if desired
if sample_weights_for_groups is not None:
sample_weights_for_groups.append(weights)
# Adapt sample_weights if necessary
if sample_weights_for_groups is not None:
sample_weights = pd.concat(sample_weights_for_groups)
sample_weights.index = pd.MultiIndex.from_tuples(
sample_weights.index, names=self._get_multiindex_names())
logger.info(
"Successfully calculated sample weights for training data "
"using %s", self._cfg['weighted_samples'])
if (sample_weights.max().values[0] /
sample_weights.min().values[0]) > 150.0:
logger.warning(
"Sample weights differ by more than a factor of 150, got "
"a minimum value of %e and a maximum value of %e. This "
"might be caused by differing coordinates in the training "
"cubes",
sample_weights.min().values[0],
sample_weights.max().values[0])
else:
sample_weights = None
# Convert index back to MultiIndex
x_data = pd.concat(x_data_for_groups)
x_data.index = pd.MultiIndex.from_tuples(
x_data.index, names=self._get_multiindex_names())
return (x_data, x_cube, sample_weights)
def _extract_y_data(self, datasets, var_type):
"""Extract required y data of type ``var_type`` from ``datasets``."""
allowed_types = ('label', 'prediction_reference')
if var_type not in allowed_types:
raise ValueError(
f"Excepted one of '{allowed_types}' for 'var_type', got "
f"'{var_type}'")
y_data_for_groups = []
# Iterate over datasets
datasets = select_metadata(datasets, var_type=var_type)
if var_type == 'label':
groups = self.group_attributes
else:
groups = [None]
for group_attr in groups:
if group_attr is not None:
logger.info("Loading '%s' data of '%s'", var_type, group_attr)
msg = '' if group_attr is None else f" for '{group_attr}'"
group_datasets = select_metadata(datasets,
group_attribute=group_attr)
dataset = self._check_dataset(group_datasets, var_type, self.label,
msg)
if dataset is None:
return None
cube = self._load_cube(dataset)
text = f"{var_type} '{self.label}'{msg}"
self._check_cube_dimensions(cube, None, text)
cube_data = pd.DataFrame(
self._get_cube_data(cube),
columns=[self.label],
index=self._get_multiindex(cube, group_attr=group_attr),
dtype=self._cfg['dtype'],
)
y_data_for_groups.append(cube_data)
# Convert index back to MultiIndex
y_data = pd.concat(y_data_for_groups)
y_data.index = pd.MultiIndex.from_tuples(
y_data.index, names=self._get_multiindex_names())
return y_data
def _get_broadcasted_cube(self, dataset, ref_cube, text=None):
"""Get broadcasted cube."""
msg = '' if text is None else text
target_shape = ref_cube.shape
cube_to_broadcast = self._load_cube(dataset)
data_to_broadcast = np.ma.filled(cube_to_broadcast.data, np.nan)
logger.info("Broadcasting %s from %s to %s", msg,
data_to_broadcast.shape, target_shape)
broadcasted_data = iris.util.broadcast_to_shape(
data_to_broadcast, target_shape, dataset['broadcast_from'])
new_cube = ref_cube.copy(np.ma.masked_invalid(broadcasted_data))
for idx in dataset['broadcast_from']:
new_coord = new_cube.coord(dimensions=idx)
new_coord.points = cube_to_broadcast.coord(new_coord).points
logger.debug("Added broadcasted %s", msg)
return new_cube
def _get_clf_parameters(self, deep=True):
"""Get parameters of pipeline."""
return self._clf.get_params(deep=deep)
def _get_colors_for_features(self, color_coded=True):
"""Get colors for bars of feature importance plot."""
features = self.features_after_preprocessing
if not color_coded:
colors = dict(zip(features, ['b'] * len(features)))
else:
if not np.array_equal(features, self.features):
raise ValueError(
f"Extracting color-coded feature colors is not possible "
f"since features changed after preprocessing, before: "
f"{self.features}, after: {features}")
colors = {}
corrs = self.data['train'][['x', 'y']].corr()
for feature in features:
corr = corrs.loc[('y', self.label), ('x', feature)]
color = 'r' if corr >= 0.0 else 'b'
colors[feature] = color
return colors
def _get_cv_estimator_kwargs(self, cv_estimator, **kwargs):
"""Get keyword arguments for CV estimator class."""
fit_kwargs = self.fit_kwargs
verbosity = self._get_verbosity_parameters(cv_estimator)
cv_kwargs = {
'n_jobs': self._cfg['n_jobs'],
**verbosity,
}
cv_kwargs.update(kwargs)
logger.info("Using keyword argument(s) %s for class %s", cv_kwargs,
cv_estimator)
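# A string 'cv' option is translated into the corresponding sklearn CV
# splitter below, e.g. cv='loo' -> LeaveOneOut() and cv='logo' ->
# LeaveOneGroupOut() with dataset groups.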
if isinstance(cv_kwargs.get('cv'), str):
if cv_kwargs['cv'].lower() == 'loo':
cv_kwargs['cv'] = LeaveOneOut()
if cv_kwargs['cv'].lower() == 'logo':
cv_kwargs['cv'] = self._get_logo_cv_kwargs()['cv']
fit_kwargs['groups'] = self._get_logo_cv_kwargs()['groups']
return (cv_kwargs, fit_kwargs)
def _get_features(self):
"""Extract all features from the ``prediction_input`` datasets."""
logger.debug("Extracting features from 'prediction_input' datasets")
pred_name = list(self._datasets['prediction_input'].keys())[0]
pred_name_str = self._get_name(pred_name)
datasets = self._datasets['prediction_input'][pred_name]
(units,
types) = self._get_features_of_datasets(datasets, 'prediction_input',
pred_name)
# Mark categorical variables
categorical = {feature: False for feature in types}
for tag in self._cfg.get('categorical_features', []):
if tag in categorical:
logger.debug("Treating '%s' as categorical feature", tag)
categorical[tag] = True
else:
raise ValueError(
f"Cannot treat '{tag}' as categorical variable, feature "
f"not found")
# Check if features were found
if not units:
raise ValueError(
f"No features for 'prediction_input' data for prediction "
f"'{pred_name_str}' found")
# Check for wrong options
if self._cfg.get('accept_only_scalar_data'):
if 'broadcasted' in types.values():
raise TypeError(
"The use of 'broadcast_from' is not possible if "
"'accept_only_scalar_data' is given")
if 'coordinate' in types.values():
raise TypeError(
"The use of 'coords_as_features' is not possible if "
"'accept_only_scalar_data' is given")
# Convert to DataFrame and sort it
units = pd.DataFrame.from_dict(units,
orient='index',
columns=['units'])
types = pd.DataFrame.from_dict(types,
orient='index',
columns=['types'])
categorical = pd.DataFrame.from_dict(categorical,
orient='index',
columns=['categorical'])
features = pd.concat([units, types, categorical], axis=1).sort_index()
# Return features
logger.info(
"Found %i feature(s) (defined in 'prediction_input' data for "
"prediction '%s')", len(features.index), pred_name_str)
for feature in features.index:
logger.debug("'%s' with units '%s' and type '%s'", feature,
features.units.loc[feature],
features.types.loc[feature])
return features
def _get_features_of_datasets(self, datasets, var_type, pred_name):
"""Extract all features (with units and types) of given datasets."""
pred_name_str = self._get_name(pred_name)
units = {}
types = {}
cube = None
ref_cube = None
for (tag, datasets_) in group_metadata(datasets, 'tag').items():
dataset = datasets_[0]
cube = self._load_cube(dataset)
if 'broadcast_from' not in dataset:
ref_cube = cube
units[tag] = Unit(dataset['units'])
if 'broadcast_from' in dataset:
types[tag] = 'broadcasted'
else:
types[tag] = 'regular'
# Check if reference cube was given
if ref_cube is None:
if cube is None:
raise ValueError(
f"Expected at least one '{var_type}' dataset for "
f" prediction '{pred_name_str}'")
raise ValueError(
f"Expected at least one '{var_type}' dataset for prediction "
f"'{pred_name_str}' without the option 'broadcast_from'")
# Coordinate features
for coord_name in self._cfg.get('coords_as_features', []):
try:
coord = ref_cube.coord(coord_name)
except iris.exceptions.CoordinateNotFoundError as exc:
raise iris.exceptions.CoordinateNotFoundError(
f"Coordinate '{coord_name}' given in 'coords_as_features' "
f"not found in '{var_type}' data for prediction "
f"'{pred_name_str}'") from exc
units[coord_name] = coord.units
types[coord_name] = 'coordinate'
return (units, types)
def _get_group_attributes(self):
"""Get all group attributes from ``label`` datasets."""
logger.debug("Extracting group attributes from 'label' datasets")
grouped_datasets = group_metadata(self._datasets['label'],
'group_attribute',
sort=True)
group_attributes = list(grouped_datasets.keys())
if group_attributes == [None]:
logger.debug("No group attributes given")
else:
logger.info(
"Found %i group attribute(s) (defined in 'label' data)",
len(group_attributes))
logger.debug(pformat(group_attributes))
return np.array(group_attributes)
def _get_label(self):
"""Extract label from training data."""
logger.debug("Extracting label from training datasets")
grouped_datasets = group_metadata(self._datasets['label'], 'tag')
labels = list(grouped_datasets.keys())
if len(labels) > 1:
raise ValueError(f"Expected unique label tag, got {labels}")
units = Unit(self._datasets['label'][0]['units'])
logger.info(
"Found label '%s' with units '%s' (defined in 'label' "
"data)", labels[0], units)
label = pd.DataFrame.from_dict({labels[0]: units},
orient='index',
columns=['units'])
return label
def _get_lime_feature_importance(self, x_pred):
"""Get most important feature given by LIME."""
logger.info(
"Calculating local feature importance using LIME (this may take "
"a while...)")
x_pred = self._impute_nans(x_pred)
# Most important feature for single input
def _most_important_feature(x_single_pred, explainer, predict_fn):
"""Get most important feature for single input.
Note
----
Ignore warnings about missing feature names here because they are
not used.
"""
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message=('X does not have valid feature names, but '
'SimpleImputer was fitted with feature names'),
category=UserWarning,
module='sklearn',
)
explanation = explainer.explain_instance(x_single_pred,
predict_fn)
local_exp = explanation.local_exp[1]
sorted_exp = sorted(local_exp, key=lambda elem: elem[0])
norm = sum(abs(elem[1]) for elem in sorted_exp)
return [abs(elem[1]) / norm for elem in sorted_exp]
# Apply on whole input (using multiple processes)
parallel = Parallel(n_jobs=self._cfg['n_jobs'])
lime_feature_importance = parallel(
[
delayed(_most_important_feature)(
x,
explainer=self._lime_explainer,
predict_fn=self._clf.predict,
) for x in x_pred.values
]
)
lime_feature_importance = np.array(lime_feature_importance,
dtype=self._cfg['dtype'])
lime_feature_importance = np.moveaxis(lime_feature_importance, -1, 0)
lime_feature_importance = dict(zip(self.features,
lime_feature_importance))
return lime_feature_importance
def _get_logo_cv_kwargs(self):
"""Get :class:`sklearn.model_selection.LeaveOneGroupOut` CV."""
if not self._cfg['group_datasets_by_attributes']:
raise ValueError(
"Cannot create 'LeaveOneGroupOut' CV splitter, "
"'group_datasets_by_attributes' was not given during "
"class initialization")
kwargs = {
'cv': LeaveOneGroupOut(),
'groups': self.data['train'].y.index.get_level_values(0).values,
}
return kwargs
def _get_mask(self, x_data, data_type):
"""Get mask for missing features."""
x_regular = x_data[self.features[self.features_types == 'regular']]
# Get points where no regular feature is given
mask = x_regular.isnull().all(axis=1).values
logger.debug(
"Removing %i %s point(s) where all regular features are missing",
mask.sum(), data_type)
# Get other missing points if desired
if self._cfg['imputation_strategy'] == 'remove':
mask = x_data.isnull().any(axis=1).values
logger.debug(
"Removing total %i %s point(s) where at least one feature is "
"missing (because imputation_strategy = 'remove')", mask.sum(),
data_type)
return mask
def _get_multiindex(self, ref_cube, group_attr=None):
"""Get :class:`pandas.MultiIndex` for data."""
group_attr = self._group_attr_to_pandas_index_str(group_attr)
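# Illustration (hypothetical group attribute): for group_attr='MODEL_A' and
# a cube with 4 data points this yields the MultiIndex
# [('MODEL_A', 0), ('MODEL_A', 1), ('MODEL_A', 2), ('MODEL_A', 3)].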
index = pd.MultiIndex.from_product(
[[group_attr], np.arange(ref_cube.data.size)],
names=self._get_multiindex_names(),
)
return index
def _get_multiindex_names(self):
"""Get names for :class:`pandas.MultiIndex` for data."""
return ['-'.join(self._cfg['group_datasets_by_attributes']), 'index']
def _get_plot_feature(self, feature):
"""Get :obj:`str` of selected ``feature`` and respective units."""
units = self._get_plot_units(self.features_units[feature])
return f'{feature} [{units}]'
def _get_plot_label(self):
"""Get :obj:`str` of label and respective units."""
return f'{self.label} [{self._get_plot_units(self.label_units)}]'
def _get_plot_units(self, units):
"""Get plot units version of specified ``units``."""
return self._cfg['plot_units'].get(str(units), str(units))
def _get_prediction_dict(self, pred_name, x_pred, x_err, y_ref,
get_mlr_model_error=None,
get_lime_importance=False,
get_propagated_errors=False, **kwargs):
"""Get prediction output in a dictionary."""
logger.info("Predicting %i point(s)", len(x_pred.index))
y_preds = self._clf.predict(x_pred, **kwargs)
pred_dict = self._prediction_to_dict(y_preds, **kwargs)
# Estimate error of MLR model itself
if get_mlr_model_error:
pred_dict['squared_mlr_model_error_estim'] = (
self._estimate_mlr_model_error(len(x_pred.index),
get_mlr_model_error))
# LIME feature importance
if get_lime_importance:
lime_importance = self._get_lime_feature_importance(x_pred)
for (feature, importance) in lime_importance.items():
pred_dict[f'lime_importance___{feature}'] = importance
# Propagate prediction input errors
if get_propagated_errors:
if x_err is None:
raise ValueError(
f"'save_propagated_errors' is not possible because no "
f"'prediction_input_error' data for prediction "
f"'{self._get_name(pred_name)}' is available")
pred_dict['squared_propagated_input_error'] = (
self._propagate_input_errors(x_pred, x_err))
# Calculate residuals relative to reference if possible
if y_ref is not None:
y_ref = y_ref.values
if y_ref.ndim == 2 and y_ref.shape[1] == 1:
y_ref = np.squeeze(y_ref, axis=1)
pred_dict['residual'] = self._get_residuals(y_ref, pred_dict[None])
# Return dictionary
for pred_type in pred_dict:
if pred_type is not None:
logger.debug("Found additional prediction type '%s'",
pred_type)
logger.info(
"Successfully created prediction array(s) with %i point(s)",
pred_dict[None].size)
return pred_dict
def _get_prediction_dtype(self):
"""Get ``dtype`` of the output of final regressor's ``predict()``."""
x_data = self.data['train'].x.iloc[:1]
y_pred = self._clf.predict(x_data)
return y_pred.values.dtype
def _get_prediction_properties(self):
"""Get important properties of prediction input."""
properties = {}
for attr in ('dataset', 'exp', 'project', 'start_year', 'end_year'):
attrs = list(group_metadata(self._datasets['label'], attr).keys())
properties[attr] = attrs[0]
if len(attrs) > 1:
if attr == 'start_year':
properties[attr] = min(attrs)
elif attr == 'end_year':
properties[attr] = max(attrs)
else:
properties[attr] = '|'.join(sorted(attrs))
logger.debug(
"Attribute '%s' of label data is not unique, got values "
"%s, using '%s' for prediction cubes", attr, attrs,
properties[attr])
return properties
def _get_reference_cube(self, datasets, var_type, text=None):
"""Get reference cube for ``datasets``."""
msg = '' if text is None else text
regular_features = self.features[self.features_types == 'regular']
for tag in regular_features:
dataset = self._check_dataset(datasets, var_type, tag, msg)
if dataset is not None:
ref_cube = self._load_cube(dataset)
logger.debug(
"For var_type '%s'%s, use reference cube with tag '%s'",
var_type, msg, tag)
logger.debug(ref_cube.summary(shorten=True))
return ref_cube
raise ValueError(f"No {var_type} data{msg} without the option "
f"'broadcast_from' found")
def _get_sample_weights(self, data_type):
"""Get sample weights of desired data."""
data_frame = self.data[data_type]
if 'sample_weight' not in data_frame:
return None
return data_frame.sample_weight.squeeze().values
def _get_verbosity_parameters(self, function, boolean=False):
"""Get verbosity parameters for class initialization."""
verbosity_params = {
'silent': {
'debug': False,
'info': False,
'default': True,
},
'verbose': {
'debug': 1,
'info': 0,
'default': 0,
},
'verbosity': {
'debug': 2,
'info': 1,
'default': 0,
},
}
parameters = {}
for (param, log_levels) in verbosity_params.items():
all_params = (
getfullargspec(function).args +
getfullargspec(function).kwonlyargs
)
if param in all_params:
parameters[param] = log_levels.get(self._cfg['log_level'],
log_levels['default'])
if boolean:
parameters[param] = bool(parameters[param])
logger.debug("Set verbosity parameter '%s' of %s to '%s'",
param, str(function), parameters[param])
return parameters
def _get_x_data_for_group(self, datasets, var_type, group_attr=None):
"""Get x data for a group of datasets."""
msg = '' if group_attr is None else f" for '{group_attr}'"
ref_cube = self._get_reference_cube(datasets, var_type, msg)
group_data = pd.DataFrame(
columns=self.features,
index=self._get_multiindex(ref_cube, group_attr=group_attr),
dtype=self._cfg['dtype'],
)
sample_weights = self._calculate_sample_weights(ref_cube,
var_type,
group_attr=group_attr)
# Iterate over all features
for tag in self.features:
if self.features_types[tag] != 'coordinate':
dataset = self._check_dataset(datasets, var_type, tag, msg)
# No dataset found
if dataset is None:
if var_type == 'prediction_input_error':
logger.debug(
"Prediction input error of '%s'%s not available, "
"setting it to 0.0", tag, msg)
new_data = 0.0
else:
new_data = np.nan
# Found exactly one dataset
else:
text = f"{var_type} '{tag}'{msg}"
# Broadcast if necessary
if 'broadcast_from' in dataset:
cube = self._get_broadcasted_cube(
dataset, ref_cube, text)
else:
cube = self._load_cube(dataset)
self._check_cube_dimensions(cube, ref_cube, text)
# Do not accept errors for categorical features
if (var_type == 'prediction_input_error'
and tag in self.categorical_features):
raise ValueError(
f"Specifying prediction input error for "
f"categorical feature '{tag}'{msg} is not "
f"possible")
new_data = self._get_cube_data(cube)
# Load coordinate feature data
else:
new_data = self._get_coordinate_data(ref_cube, var_type, tag,
msg)
# Save data
new_data = np.array(new_data)
if new_data.size != ref_cube.data.size:
new_data = np.broadcast_to(new_data, (ref_cube.data.size,))
group_data[tag] = new_data
# Return data and reference cube
logger.debug("Found %i raw '%s' input data points%s",
len(group_data.index), var_type, msg)
return (group_data, ref_cube, sample_weights)
def _group_by_attributes(self, datasets):
"""Group datasets by specified attributes."""
attributes = self._cfg['group_datasets_by_attributes']
if not attributes:
if self._cfg.get('accept_only_scalar_data'):
attributes = ['dataset']
logger.warning("Automatically set 'group_datasets_by_'"
"attributes' to ['dataset'] because 'accept_"
"only_scalar_data' is given")
else:
for dataset in datasets:
dataset['group_attribute'] = None
return datasets
for dataset in datasets:
dataset['group_attribute'] = mlr.create_alias(dataset, attributes)
logger.info("Grouped feature and label datasets by %s", attributes)
return datasets
def _impute_nans(self, data_frame, copy=True):
"""Impute all nans of a given :class:`pandas.DataFrame`."""
if copy:
data_frame = data_frame.copy()
if 'feature_selection' in self._clf.named_steps:
support = self._clf.named_steps['feature_selection'].support
else:
support = None
if 'imputer' in self._clf.named_steps:
transform = self._clf.named_steps['imputer'].transform
if 'x' in data_frame.columns:
if support is not None:
data_frame.x.values[:, support] = transform(
data_frame.x.iloc[:, support])
data_frame = data_frame.fillna(data_frame.mean())
else:
data_frame.x.values[:] = transform(data_frame.x)
else:
if support is not None:
data_frame.values[:, support] = transform(
data_frame.iloc[:, support])
data_frame = data_frame.fillna(data_frame.mean())
else:
data_frame.values[:] = transform(data_frame)
return data_frame
def _is_ready_for_plotting(self):
"""Check if the class is ready for plotting."""
self._check_fit_status('Plotting')
return True
def _load_classes(self):
"""Populate :attribute:`_classes` and check for errors."""
self._classes['group_attributes'] = self._get_group_attributes()
self._classes['features'] = self._get_features()
self._classes['label'] = self._get_label()
def _load_cube(self, dataset):
"""Load iris cube, check data type and convert units if desired."""
logger.debug("Loading %s", dataset['filename'])
cube = iris.load_cube(dataset['filename'])
# Check dtype
if not np.issubdtype(cube.dtype, np.number):
raise TypeError(
f"Data type of cube loaded from '{dataset['filename']}' is "
f"'{cube.dtype}', at the moment only numeric data is "
f"supported")
# Convert dtypes
cube.data = cube.core_data().astype(self._cfg['dtype'],
casting='same_kind')
for coord in cube.coords():
try:
coord.points = coord.points.astype(self._cfg['dtype'],
casting='same_kind')
except TypeError:
logger.debug(
"Cannot convert dtype of coordinate array '%s' from '%s' "
"to '%s'", coord.name(), coord.points.dtype,
self._cfg['dtype'])
# Convert and check units
if dataset.get('convert_units_to'):
self._convert_units_in_cube(cube, dataset['convert_units_to'])
if cube.units != Unit(dataset['units']):
raise ValueError(
f"Units of cube '{dataset['filename']}' for "
f"{dataset['var_type']} '{dataset['tag']}' differ from units "
f"given in dataset list, got '{cube.units}' in cube and "
f"'{dataset['units']}' in dataset list")
return cube
def _load_data(self):
"""Load train/test data (features/labels)."""
(x_all, y_all, sample_weights) = self._extract_features_and_labels()
# Normalize and add sample weights if necessary
objs = [x_all, y_all]
keys = ['x', 'y']
if sample_weights is not None:
sample_weights /= sample_weights.mean()
objs.append(sample_weights)
keys.append('sample_weight')
# Save complete data
self._data['all'] = pd.concat(objs, axis=1, keys=keys)
if len(y_all.index) < 2:
raise ValueError(
f"Need at least 2 data points for MLR training, got only "
f"{len(y_all.index)}")
logger.info("Loaded %i input data point(s)", len(y_all.index))
# Split train/test data if desired
test_size = self._cfg['test_size']
if test_size:
(self._data['train'], self._data['test']) = train_test_split(
self._data['all'].copy(),
test_size=test_size,
random_state=self.random_state,
)
self._data['train'] = self._data['train'].sort_index()
self._data['test'] = self._data['test'].sort_index()
for data_type in ('train', 'test'):
if len(self.data[data_type].index) < 2:
raise ValueError(
f"Need at least 2 datasets for '{data_type}' data, "
f"got {len(self.data[data_type].index)}")
logger.info(
"Using %i%% of the input data as test data (%i point(s))",
int(test_size * 100), len(self.data['test'].index))
logger.info("%i point(s) remain(s) for training",
len(self.data['train'].index))
else:
self._data['train'] = self.data['all'].copy()
logger.info("Using all %i input data point(s) for training",
len(y_all.index))
def _load_final_parameters(self):
"""Load parameters for final regressor."""
parameters = self._cfg.get('parameters_final_regressor', {})
# Update parameters
self._update_random_state_parameter(self._CLF_TYPE, parameters)
verbosity_params = self._get_verbosity_parameters(self._CLF_TYPE)
for (param, verbosity) in verbosity_params.items():
parameters.setdefault(param, verbosity)
logger.debug("Using parameter(s) for final regressor: %s", parameters)
return parameters
def _load_input_datasets(self, input_datasets):
"""Load input datasets."""
input_datasets = deepcopy(input_datasets)
# Catch invalid var_types
if not mlr.datasets_have_mlr_attributes(
input_datasets, log_level='error', mode='only_var_type'):
raise ValueError("Data with invalid 'var_type' given")
# Training datasets
feature_datasets = select_metadata(input_datasets, var_type='feature')
label_datasets = select_metadata(input_datasets, var_type='label')
# Prediction datasets
pred_in_datasets = select_metadata(input_datasets,
var_type='prediction_input')
pred_in_err_datasets = select_metadata(
input_datasets, var_type='prediction_input_error')
pred_ref_datasets = select_metadata(input_datasets,
var_type='prediction_reference')
# Check datasets
msg = ("At least one '{}' dataset does not have necessary MLR "
"attributes")
datasets_to_check = {
'feature': feature_datasets,
'label': label_datasets,
'prediction_input': pred_in_datasets,
'prediction_input_error': pred_in_err_datasets,
'prediction_reference': pred_ref_datasets,
}
for (label, datasets) in datasets_to_check.items():
if not mlr.datasets_have_mlr_attributes(datasets,
log_level='error'):
raise ValueError(msg.format(label))
# Check if data was found
if not feature_datasets:
raise ValueError("No 'feature' data found")
if not label_datasets:
raise ValueError("No 'label' data found")
if not pred_in_datasets:
raise ValueError("No 'prediction_input' data found")
# Convert units
self._convert_units_in_metadata(feature_datasets)
self._convert_units_in_metadata(label_datasets)
self._convert_units_in_metadata(pred_in_datasets)
self._convert_units_in_metadata(pred_in_err_datasets)
self._convert_units_in_metadata(pred_ref_datasets)
# Save datasets
logger.info(
"Found %i 'feature' dataset(s), %i 'label' dataset(s), %i "
"'prediction_input' dataset(s), %i 'prediction_input_error' "
"dataset(s) and %i 'prediction_reference' datasets(s)",
len(feature_datasets), len(label_datasets), len(pred_in_datasets),
len(pred_in_err_datasets), len(pred_ref_datasets))
labeled_datasets = {
'Feature': feature_datasets,
'Label': label_datasets,
'Prediction input': pred_in_datasets,
'Prediction input error': pred_in_err_datasets,
'Prediction output': pred_ref_datasets,
}
for (msg, datasets) in labeled_datasets.items():
logger.debug("%s datasets:", msg)
logger.debug(pformat([d['filename'] for d in datasets]))
self._datasets['feature'] = self._group_by_attributes(feature_datasets)
self._datasets['label'] = self._group_by_attributes(label_datasets)
self._datasets['prediction_input'] = self._group_prediction_datasets(
pred_in_datasets)
self._datasets['prediction_input_error'] = (
self._group_prediction_datasets(pred_in_err_datasets))
self._datasets['prediction_reference'] = (
self._group_prediction_datasets(pred_ref_datasets))
def _load_lime_explainer(self):
"""Load :class:`lime.lime_tabular.LimeTabularExplainer`."""
x_train = self.get_x_array('train', impute_nans=True)
y_train = self.get_y_array('train', impute_nans=True)
verbosity = self._get_verbosity_parameters(LimeTabularExplainer,
boolean=True)
verbosity = {param: False for param in verbosity}
categorical_features_idx = [
int(np.where(self.features == tag)[0][0])
for tag in self.categorical_features
]
self._lime_explainer = LimeTabularExplainer(
x_train,
mode='regression',
training_labels=y_train,
feature_names=self.features,
categorical_features=categorical_features_idx,
discretize_continuous=False,
sample_around_instance=True,
random_state=self.random_state,
**verbosity,
)
logger.debug(
"Loaded %s with new training data", str(LimeTabularExplainer))
def _mask_prediction_array(self, y_pred, ref_cube):
"""Apply mask of reference cube to prediction array."""
mask = np.ma.getmaskarray(ref_cube.data).ravel()
if y_pred.ndim == 1 and y_pred.shape[0] != mask.shape[0]:
new_y_pred = np.empty(mask.shape[0], dtype=self._cfg['dtype'])
new_y_pred[mask] = np.nan
new_y_pred[~mask] = y_pred
else:
new_y_pred = y_pred
return np.ma.masked_invalid(new_y_pred)
def _plot_feature_importance(self, feature_importance_dict, colors,
plot_path):
"""Plot feature importance."""
logger.info("Plotting feature importance")
(_, axes) = plt.subplots()
# Sort data and get position of bars
features = np.array(list(feature_importance_dict.keys()))
feature_importances = np.array(list(feature_importance_dict.values()))
sorted_idx = np.argsort(feature_importances)
pos = np.arange(sorted_idx.shape[0]) + 0.5
# Write cube with feature importance for provenance tracking
ancestors = self.get_ancestors(prediction_names=[])
cube = mlr.get_1d_cube(
features,
feature_importances,
x_kwargs={'var_name': 'feature',
'long_name': 'Feature name',
'units': 'no unit'},
y_kwargs={'var_name': 'feature_importance',
'long_name': 'Relative Feature Importance',
'units': '1',
'attributes': {'project': '', 'dataset': ''}},
)
# Plot
for (idx, importance) in enumerate(feature_importances[sorted_idx]):
feature = features[sorted_idx][idx]
axes.barh(pos[idx], importance, align='center',
color=colors[feature])
# Plot appearance
axes.tick_params(axis='y', which='minor', left=False, right=False)
axes.tick_params(axis='y', which='major', left=True, right=False)
title = f"Global feature importance ({self._cfg['mlr_model_name']})"
axes.set_title(title)
axes.set_xlabel('Relative Importance')
axes.set_yticks(pos)
axes.set_yticklabels(features[sorted_idx])
# Save plot and provenance
plt.savefig(plot_path, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path)
self._write_plot_provenance(cube, plot_path, ancestors=ancestors,
caption=title + '.', plot_types=['bar'])
# Save additional plot with logarithmic X axis
axes.set_xscale('log')
axes.xaxis.set_major_formatter(ScalarFormatter())
ext = os.path.splitext(plot_path)[1]
plot_path_log = plot_path.replace(ext, f'_log{ext}')
plt.savefig(plot_path_log, **self._cfg['savefig_kwargs'])
logger.info("Wrote %s", plot_path_log)
self._write_plot_provenance(cube, plot_path_log, ancestors=ancestors,
caption=title + '.', plot_types=['bar'])
plt.close()
def _prediction_to_dict(self, pred_out, **kwargs):
"""Convert output of final regressor's ``predict()`` to :obj:`dict`."""
if not isinstance(pred_out, (list, tuple)):
pred_out = [pred_out]
idx_to_name = {0: None}
if 'return_var' in kwargs:
idx_to_name[1] = 'var'
elif 'return_cov' in kwargs:
idx_to_name[1] = 'cov'
pred_dict = {}
for (idx, pred) in enumerate(pred_out):
pred = pred.astype(self._cfg['dtype'], casting='same_kind')
if pred.ndim == 2 and pred.shape[1] == 1:
logger.warning(
"Prediction output is 2D and length of second axis is 1, "
"squeezing second axis")
pred = np.squeeze(pred, axis=1)
pred_dict[idx_to_name.get(idx, idx)] = pred
return pred_dict
def _pred_type_to_metadata(self, pred_type, cube):
"""Get correct :mod:`iris.cube.CubeMetadata` of prediction cube."""
standard_name = cube.standard_name
var_name = cube.var_name
long_name = cube.long_name
units = cube.units
attributes = cube.attributes
suffix = '' if pred_type is None else f'_{pred_type}'
error_types = {
'var': ' (variance)',
'cov': ' (covariance)',
'squared_mlr_model_error_estim': (' (squared MLR model error '
'estimation using hold-out test '
'data set)'),
'squared_propagated_input_error': (' (squared propagated error of '
'prediction input estimated by '
'LIME)'),
}
if pred_type is None:
attributes['var_type'] = 'prediction_output'
elif isinstance(pred_type, int):
var_name += f'_{pred_type:d}'
long_name += f' {pred_type:d}'
logger.warning("Got unknown prediction type with index %i",
pred_type)
attributes['var_type'] = 'prediction_output_misc'
elif pred_type in error_types:
var_name += suffix
long_name += error_types[pred_type]
units = mlr.units_power(cube.units, 2)
attributes['var_type'] = 'prediction_output_error'
attributes['squared'] = 1
elif 'lime_importance___' in pred_type:
standard_name = None
feature = pred_type.replace('lime_importance___', '')
var_name = f'importance_of_feature_{feature}'
long_name = (f'Local importance of feature {feature} for '
f'predicting {self.label} given by LIME')
units = Unit('1')
attributes['var_type'] = 'prediction_output_misc'
elif pred_type == 'residual':
var_name += suffix
long_name += ' (residual)'
attributes['residual'] = 'true minus predicted values'
attributes['var_type'] = 'prediction_residual'
else:
raise ValueError(f"Got unknown prediction type '{pred_type}'")
return iris.cube.CubeMetadata(
standard_name=standard_name,
long_name=long_name,
var_name=var_name,
units=units,
attributes=attributes,
cell_methods=cube.cell_methods,
)
def _print_metrics(self, regression_metrics, data_type):
"""Print regression metrics."""
if data_type not in self.data:
return
logger.info("Evaluating regression metrics for %s data", data_type)
x_data = self.data[data_type].x
y_true = self.get_y_array(data_type)
y_pred = self._clf.predict(x_data)
sample_weights = self._get_sample_weights(data_type)
for metric in regression_metrics:
metric_function = getattr(metrics, metric)
value = metric_function(y_true, y_pred)
if 'squared' in metric:
value = np.sqrt(value)
metric = f'root_{metric}'
logger.info("%s: %s", metric, value)
if sample_weights is None:
return
for metric in regression_metrics:
metric_function = getattr(metrics, metric)
value = metric_function(y_true, y_pred,
sample_weight=sample_weights)
if 'squared' in metric:
value = np.sqrt(value)
metric = f'root_{metric}'
logger.info("Weighted %s: %s", metric, value)
def _propagate_input_errors(self, x_pred, x_err):
"""Propagate errors from prediction input."""
logger.info(
"Propagating prediction input errors using LIME (this may take a "
"while...)")
if 'feature_selection' in self._clf.named_steps:
logger.warning(
"Propagating input errors might not work correctly when a "
"'feature_selection' step is present (usually because of "
"calling rfecv())")
x_pred = self._impute_nans(x_pred)
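# Error propagation (sketch): the local LIME coefficients serve as gradient
# estimates in standard Gaussian error propagation,
#     sigma_y^2 ~= sum_i (coef_i * sigma_x_i)^2,
# with coefficients and input errors taken in the scaled feature space;
# categorical features are skipped.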
# Propagated error for single input
def _propagated_error(x_single_pred, x_single_err, explainer,
predict_fn, features, categorical_features):
"""Get propagated prediction input error for single input.
Note
----
Ignore warnings about missing feature names here because they are
not used.
"""
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message=('X does not have valid feature names, but '
'SimpleImputer was fitted with feature names'),
category=UserWarning,
module='sklearn',
)
exp = explainer.explain_instance(x_single_pred, predict_fn)
x_single_err = np.nan_to_num(x_single_err)
x_err_scaled = x_single_err / explainer.scaler.scale_
squared_error = 0.0
for (idx, coef) in exp.local_exp[1]:
if features[idx] in categorical_features:
continue
squared_error += (x_err_scaled[idx] * coef)**2
return squared_error
# Apply on whole input (using multiple processes)
parallel = Parallel(n_jobs=self._cfg['n_jobs'])
errors = parallel(
[delayed(_propagated_error)(
x, x_e, explainer=self._lime_explainer,
predict_fn=self._clf.predict,
features=self.features,
categorical_features=self.categorical_features,
) for (x, x_e) in zip(x_pred.values, x_err.values)]
)
return np.array(errors, dtype=self._cfg['dtype'])
def _remove_missing_features(self, x_data, y_data, sample_weights):
"""Remove missing values in the features data (if desired)."""
mask = self._get_mask(x_data, 'training')
x_data = x_data[~mask]
y_data = y_data[~mask]
if sample_weights is not None:
sample_weights = sample_weights[~mask]
diff = mask.sum()
if diff:
msg = ('Removed %i training point(s) where features were '
'missing')
if self._cfg.get('accept_only_scalar_data'):
removed_groups = self.group_attributes[mask]
msg += f' ({removed_groups})'
self._classes['group_attributes'] = (
self.group_attributes[~mask])
logger.info(msg, diff)
return (x_data, y_data, sample_weights)
def _remove_missing_pred_input(self, x_pred, x_err=None, y_ref=None):
"""Remove missing values in the prediction input data."""
mask = self._get_mask(x_pred, 'prediction input')
x_pred = x_pred[~mask]
if x_err is not None:
x_err = x_err[~mask]
if y_ref is not None:
y_ref = y_ref[~mask]
diff = mask.sum()
if diff:
logger.info(
"Removed %i prediction input point(s) where features were "
"missing", diff)
return (x_pred, x_err, y_ref, mask)
def _save_prediction_cubes(self, pred_dict, pred_name, x_cube):
"""Save (multi-dimensional) prediction output."""
logger.debug("Creating output cubes")
for (pred_type, y_pred) in pred_dict.items():
y_pred = self._mask_prediction_array(y_pred, x_cube)
if y_pred.size == np.prod(x_cube.shape, dtype=np.int64):
pred_cube = x_cube.copy(y_pred.reshape(x_cube.shape))
else:
dim_coords = []
for (dim_idx, dim_size) in enumerate(y_pred.shape):
dim_coords.append((iris.coords.DimCoord(
np.arange(dim_size, dtype=np.float64),
long_name=f'MLR prediction index {dim_idx}',
var_name=f'idx_{dim_idx}'), dim_idx))
pred_cube = iris.cube.Cube(y_pred,
dim_coords_and_dims=dim_coords)
new_path = self._set_prediction_cube_attributes(
pred_cube, pred_type, pred_name=pred_name)
io.iris_save(pred_cube, new_path)
# Save provenance
ancestors = self.get_ancestors(
prediction_names=[pred_name],
prediction_reference=pred_type == 'residual')
record = {
'ancestors': ancestors,
'authors': ['schlund_manuel'],
'caption': (f"{pred_cube.long_name} of MLR model "
f"{self._cfg['mlr_model_name']} for prediction "
f"{pred_name}."),
'references': ['schlund20jgr'],
}
with ProvenanceLogger(self._cfg) as provenance_logger:
provenance_logger.log(new_path, record)
def _save_csv_file(self, data_type, filename, pred_name=None):
"""Save CSV file."""
if data_type not in self.data:
return
if data_type == 'pred':
csv_data = self.data[data_type][pred_name]
else:
csv_data = self.data[data_type]
# Filename and path
if filename is None:
if data_type == 'pred':
filename = '{data_type}_{pred_name}.csv'
format_kwargs = {
'data_type': data_type,
'pred_name': self._get_name(pred_name),
}
else:
filename = '{data_type}.csv'
format_kwargs = {'data_type': data_type}
filename = filename.format(**format_kwargs)
path = os.path.join(self._cfg['mlr_work_dir'], filename)
# Save file
csv_data.to_csv(path, na_rep='nan')
logger.info("Wrote %s", path)
def _set_default_settings(self):
"""Set default (non-``False``) keyword arguments."""
self._cfg.setdefault('weighted_samples', {})
self._cfg.setdefault('cache_intermediate_results', True)
self._cfg.setdefault('dtype', 'float64')
self._cfg.setdefault('fit_kwargs', {})
self._cfg.setdefault('group_datasets_by_attributes', [])
self._cfg.setdefault('imputation_strategy', 'remove')
self._cfg.setdefault('log_level', 'info')
self._cfg.setdefault('mlr_model_name', f'{self._CLF_TYPE} model')
self._cfg.setdefault('n_jobs', 1)
self._cfg.setdefault('output_file_type', 'png')
self._cfg.setdefault('parameters', {})
self._cfg.setdefault('plot_dir',
os.path.expanduser(os.path.join('~', 'plots')))
self._cfg.setdefault('plot_units', {})
self._cfg.setdefault('random_state', None)
self._cfg.setdefault('savefig_kwargs', {
'bbox_inches': 'tight',
'dpi': 300,
'orientation': 'landscape',
})
self._cfg.setdefault('standardize_data', True)
self._cfg.setdefault('sub_dir', '')
self._cfg.setdefault('test_size', 0.25)
self._cfg.setdefault('work_dir',
os.path.expanduser(os.path.join('~', 'work')))
logger.info("Using imputation strategy '%s'",
self._cfg['imputation_strategy'])
if self._cfg['fit_kwargs']:
logger.info(
"Using additional keyword argument(s) %s for fit() function",
self._cfg['fit_kwargs'])
def _set_prediction_cube_attributes(self, cube, pred_type, pred_name=None):
"""Set the attributes of the prediction cube."""
cube.cell_methods = None
cube.attributes = {
'description': 'MLR model prediction',
'mlr_model_name': self._cfg['mlr_model_name'],
'mlr_model_type': self.mlr_model_type,
'final_regressor': str(self._CLF_TYPE),
'prediction_name': self._get_name(pred_name),
'tag': self.label,
}
cube.attributes.update(self._get_prediction_properties())
for (key, val) in self.parameters.items():
cube.attributes[key] = str(val)
cube.attributes['mlr_parameters'] = list(self.parameters.keys())
label_cube = self._load_cube(self._datasets['label'][0])
for attr in ('standard_name', 'var_name', 'long_name', 'units'):
setattr(cube, attr, getattr(label_cube, attr))
# Modify cube metadata depending on prediction type
cube.metadata = self._pred_type_to_metadata(pred_type, cube)
# Get new path
suffix = '' if pred_type is None else f'_{pred_type}'
pred_str = f'_for_prediction_{self._get_name(pred_name)}'
sub_str = ('' if self._cfg['sub_dir'] == '' else
f"_of_group_{self._cfg['sub_dir']}")
filename = (f'{self.mlr_model_type}_{self.label}_prediction{suffix}'
f'{pred_str}{sub_str}.nc')
new_path = os.path.join(self._cfg['mlr_work_dir'], filename)
cube.attributes['filename'] = new_path
return new_path
def _update_fit_kwargs(self, fit_kwargs):
"""Check and update fit kwargs."""
new_fit_kwargs = {}
# Sort out wrong fit kwargs
for (param_name, param_val) in fit_kwargs.items():
step = param_name.split('__')[0]
if step in self._clf.named_steps:
new_fit_kwargs[param_name] = param_val
else:
raise ValueError(
f"Got invalid pipeline step '{step}' in fit parameter "
f"'{param_name}'")
# Add sample weights if possible
allowed_fit_kwargs = (
getfullargspec(self._CLF_TYPE.fit).args +
getfullargspec(self._CLF_TYPE.fit).kwonlyargs
)
for kwarg in ('sample_weight', 'sample_weights'):
if kwarg not in allowed_fit_kwargs:
continue
long_kwarg = f'{self._clf.steps[-1][0]}__regressor__{kwarg}'
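# e.g. (illustrative) 'final__regressor__sample_weight' if the last
# pipeline step is named 'final'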
sample_weights = self._get_sample_weights('train')
new_fit_kwargs[long_kwarg] = sample_weights
if sample_weights is not None:
logger.debug(
"Updated keyword arguments of final regressor's fit() "
"function with '%s'", kwarg)
break
return new_fit_kwargs
def _update_random_state_parameter(self, function, parameters):
"""Update ``random_state`` parameter if necessary."""
all_params = (
getfullargspec(function).args +
getfullargspec(function).kwonlyargs
)
if 'random_state' in all_params:
if 'random_state' in parameters:
logger.warning(
"Parameter 'random_state=%s' is ignored for '%s', use the "
"'random_state' option to initialize the MLRModel class "
"instead",
parameters['random_state'],
self._CLF_TYPE,
)
parameters['random_state'] = self.random_state
logger.debug(
"Updated 'random_state' parameter of '%s' to '%s'",
self._CLF_TYPE,
self.random_state,
)
return parameters
def _write_plot_provenance(self, cube, plot_path, **additional_info):
"""Write provenance information for plots."""
netcdf_path = mlr.get_new_path(self._cfg, plot_path)
io.iris_save(cube, netcdf_path)
record = {
'authors': ['schlund_manuel'],
'references': ['schlund20jgr'],
**additional_info,
}
with ProvenanceLogger(self._cfg) as provenance_logger:
provenance_logger.log(netcdf_path, record)
provenance_logger.log(plot_path, record)
@staticmethod
def _convert_units_in_cube(cube, new_units, power=None, text=None):
"""Convert units of cube if possible."""
msg = '' if text is None else f' of {text}'
if isinstance(new_units, str):
new_units = Unit(new_units)
if power:
logger.debug("Raising target units of cube '%s' by power of %i",
cube.summary(shorten=True), power)
new_units = mlr.units_power(new_units, power)
logger.debug("Converting units%s from '%s' to '%s'", msg, cube.units,
new_units)
try:
cube.convert_units(new_units)
except ValueError as exc:
raise ValueError(
f"Cannot convert units{msg} from '{cube.units}' to "
f"'{new_units}'") from exc
@staticmethod
def _convert_units_in_metadata(datasets):
"""Convert units of datasets if desired."""
for dataset in datasets:
if not dataset.get('convert_units_to'):
continue
units_from = Unit(dataset['units'])
units_to = Unit(dataset['convert_units_to'])
try:
units_from.convert(0.0, units_to)
except ValueError as exc:
raise ValueError(
f"Cannot convert units of {dataset['var_type']} "
f"'{dataset['tag']}' from '{units_from}' to "
f"'{units_to}'") from exc
dataset['units'] = dataset['convert_units_to']
@staticmethod
def _get_centralized_bins(array, n_bins=None, ref=0.0):
"""Get bins for array centralized around a reference value."""
diff = max([ref - array.min(), array.max() - ref])
if n_bins is None:
auto_bins = np.histogram_bin_edges(array)
if len(auto_bins) < 2:
raise ValueError(
f"Expected at least 2 bins, got {len(auto_bins):d}")
delta = auto_bins[1] - auto_bins[0]
n_bins = int(np.ceil(2.0 * diff / delta))
if not n_bins % 2:
n_bins += 1
return np.linspace(ref - diff, ref + diff, n_bins + 1)
@staticmethod
def _get_coordinate_data(ref_cube, var_type, tag, text=None):
"""Get coordinate variable ``ref_cube`` which can be used as x data."""
msg = '' if text is None else text
if var_type == 'prediction_input_error':
logger.debug(
"Prediction input error of coordinate feature '%s'%s is set "
"to 0.0", tag, msg)
return 0.0
try:
coord = ref_cube.coord(tag)
except iris.exceptions.CoordinateNotFoundError as exc:
raise iris.exceptions.CoordinateNotFoundError(
f"Coordinate '{tag}' given in 'coords_as_features' not found "
f"in reference cube for '{var_type}'{msg}") from exc
coord_array = np.ma.filled(coord.points, np.nan)
coord_dims = ref_cube.coord_dims(coord)
if coord_dims == ():
logger.warning(
"Coordinate '%s' is scalar, including it as feature does not "
"add any information to the model (array is constant)", tag)
coord_array = np.broadcast_to(coord_array, ref_cube.shape)
else:
coord_array = iris.util.broadcast_to_shape(coord_array,
ref_cube.shape,
coord_dims)
logger.debug("Added %s coordinate '%s'%s", var_type, tag, msg)
return coord_array.ravel()
@staticmethod
def _get_cube_data(cube):
"""Get data from cube."""
cube_data = np.ma.filled(cube.data, np.nan)
return cube_data.ravel()
@staticmethod
def _get_data_type_coord(data_types):
"""Get :class:`iris.coords.AuxCoord` ``data_type``."""
aux_coord = iris.coords.AuxCoord(data_types,
var_name='data_type',
long_name='Data type',
units='no unit')
return aux_coord
@staticmethod
def _get_name(string):
"""Convert ``None`` to :obj:`str` if necessary."""
return 'unnamed' if string is None else string
@staticmethod
def _get_plot_kwargs(data_type, plot_type=None):
"""Get plot kwargs for a data type."""
plot_kwargs = {
'all': {
'color': 'r',
'label': 'All data',
},
'train': {
'color': 'b',
'label': 'Train data',
},
'test': {
'color': 'g',
'label': 'Test data',
},
}
allowed_data_types = list(plot_kwargs.keys())
if data_type not in allowed_data_types:
raise NotImplementedError(
f"Plot kwargs for data type '{data_type}' not implemented "
f"yet, only {allowed_data_types} are supported yet")
kwargs = deepcopy(plot_kwargs[data_type])
if plot_type == 'scatter':
kwargs.update({'alpha': 0.5, 'marker': 'o', 's': 6})
return kwargs
@staticmethod
def _get_residuals(y_true, y_pred):
"""Calculate residuals (true minus predicted values)."""
logger.debug("Calculating residuals")
return y_true - y_pred
@staticmethod
def _group_attr_to_pandas_index_str(group_attr):
"""Convert group attribute to :obj:`str` used in pandas index."""
if group_attr is None:
return 'none'
return group_attr
@staticmethod
def _group_prediction_datasets(datasets):
"""Group prediction datasets (use ``prediction_name`` key)."""
for dataset in datasets:
dataset['group_attribute'] = None
return group_metadata(datasets, 'prediction_name')
@staticmethod
def _remove_missing_labels(x_data, y_data, sample_weights):
"""Remove missing values in the label data."""
mask = y_data.isnull().values
x_data = x_data[~mask]
y_data = y_data[~mask]
if sample_weights is not None:
sample_weights = sample_weights[~mask]
diff = mask.sum()
if diff:
logger.info(
"Removed %i training point(s) where labels were missing", diff)
return (x_data, y_data, sample_weights)
@staticmethod
def _set_axis_lim_symmetric(axes, axis):
"""Make axis range of plot symmetric around 0."""
if axis == 'x':
getter = getattr(axes, 'get_xlim')
setter = getattr(axes, 'set_xlim')
elif axis == 'y':
getter = getattr(axes, 'get_ylim')
setter = getattr(axes, 'set_ylim')
else:
raise ValueError(f"Expected 'x' or 'y' for axis, got '{axis}'")
maximum = np.max(np.abs(getter()))
setter([-maximum, maximum])