"""Custom expansions of :mod:`sklearn` functionalities.
Note
----
This module provides custom expansions of some :mod:`sklearn` classes and
functions which are necessary to fit the purposes for the desired
functionalities of the :ref:`MLR module <api.esmvaltool.diag_scripts.mlr>`. As
long-term goal we would like to include these functionalities to the
:mod:`sklearn` package since we believe these additions might be helpful for
everyone. This module serves as interim solution. To ensure that all features
are properly working this module is also covered by tests, which will also be
expanded in the future.
"""
# pylint: disable=arguments-differ
# pylint: disable=attribute-defined-outside-init
# pylint: disable=protected-access
# pylint: disable=super-init-not-called
# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-locals
import itertools
import logging
import numbers
import os
import time
import warnings
from contextlib import suppress
from copy import deepcopy
from inspect import getfullargspec
from traceback import format_exception_only
import numpy as np
from joblib import Parallel, delayed, effective_n_jobs
from sklearn.base import BaseEstimator, clone, is_classifier
from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
from sklearn.exceptions import FitFailedWarning, NotFittedError
from sklearn.feature_selection import RFE
from sklearn.feature_selection._base import SelectorMixin
from sklearn.linear_model import LinearRegression
from sklearn.metrics import check_scoring
from sklearn.metrics._scorer import (
_check_multimetric_scoring,
_MultimetricScorer,
)
from sklearn.model_selection import check_cv
from sklearn.model_selection._validation import _aggregate_score_dicts, _score
from sklearn.pipeline import Pipeline
from sklearn.utils import (
_message_with_time,
check_array,
check_X_y,
indexable,
safe_sqr,
)
from sklearn.utils.metaestimators import _safe_split, if_delegate_has_method
from sklearn.utils.validation import _check_fit_params, check_is_fitted
from esmvaltool.diag_scripts import mlr
# Module-level logger named after this file (not ``__name__``), matching the
# logging convention used across ESMValTool diagnostic scripts.
logger = logging.getLogger(os.path.basename(__file__))
def _fit_and_score_weighted(estimator, x_data, y_data, scorer, train, test,
                            verbose, parameters, fit_params,
                            error_score=np.nan, sample_weights=None):
    """Expand :func:`sklearn.model_selection._validation._fit_and_score`.

    Fit ``estimator`` on the ``train`` split and score it on the ``test``
    split, additionally supporting per-sample weights for scoring (the
    upstream sklearn helper does not).  ``scorer`` may be a single scorer or
    a dict of named scorers; the return value is a one-element list
    containing either a scalar score or a dict of scores, mirroring the
    upstream return convention.
    """
    # Build the progress message only at verbose > 1; the verbose > 2 branch
    # below reuses ``msg``, which is safe because verbose > 2 implies
    # verbose > 1.
    if verbose > 1:
        if parameters is None:
            msg = ''
        else:
            msg = '%s' % (', '.join('%s=%s' % (k, v)
                          for k, v in parameters.items()))
        print("[CV] %s %s" % (msg, (64 - len(msg)) * '.'))

    # Adjust length of sample weights
    # (_check_fit_params subsets any indexable fit parameter to the train
    # indices).
    fit_params = fit_params if fit_params is not None else {}
    fit_params = _check_fit_params(x_data, fit_params, train)

    if parameters is not None:
        # clone after setting parameters in case any parameters
        # are estimators (like pipeline steps)
        # because pipeline doesn't clone steps in fit
        cloned_parameters = {}
        for (key, val) in parameters.items():
            cloned_parameters[key] = clone(val, safe=False)
        estimator = estimator.set_params(**cloned_parameters)

    start_time = time.time()

    # _safe_split handles estimators with precomputed kernels (the extra
    # ``train`` argument for the test split selects kernel columns).
    x_train, y_train = _safe_split(estimator, x_data, y_data, train)
    x_test, y_test = _safe_split(estimator, x_data, y_data, test, train)

    # Only the test-split weights are needed here: they are used for scoring,
    # while train-split weights (if any) travel through ``fit_params``.
    if sample_weights is not None:
        sample_weights_test = sample_weights[test]
    else:
        sample_weights_test = None

    try:
        if y_train is None:
            estimator.fit(x_train, **fit_params)
        else:
            estimator.fit(x_train, y_train, **fit_params)
    except Exception as exc:
        # Note fit time as time until error
        fit_time = time.time() - start_time
        score_time = 0.0
        if error_score == 'raise':
            raise
        if isinstance(error_score, numbers.Number):
            # Substitute the failure score for every requested metric so the
            # aggregated CV results keep a consistent shape.
            if isinstance(scorer, dict):
                test_scores = {name: error_score for name in scorer}
            else:
                test_scores = error_score
            warnings.warn("Estimator fit failed. The score on this train-test"
                          " partition for these parameters will be set to %f. "
                          "Details: \n%s" %
                          (error_score, format_exception_only(type(exc),
                                                              exc)[0]),
                          FitFailedWarning)
        else:
            raise ValueError("error_score must be the string 'raise' or a"
                             " numeric value. (Hint: if using 'raise', please"
                             " make sure that it has been spelled correctly.)")
    else:
        fit_time = time.time() - start_time
        # Weighted scoring is the actual extension over the sklearn original.
        test_scores = _score_weighted(estimator, x_test, y_test, scorer,
                                      sample_weights=sample_weights_test)
        score_time = time.time() - start_time - fit_time
    if verbose > 2:
        if isinstance(test_scores, dict):
            for scorer_name in sorted(test_scores):
                msg += ", %s=" % scorer_name
                msg += "%.3f" % test_scores[scorer_name]
        else:
            msg += ", score="
            msg += "%.3f" % test_scores
    if verbose > 1:
        total_time = score_time + fit_time
        print(_message_with_time('CV', msg, total_time))
    return [test_scores]
def _get_fit_parameters(fit_kwargs, steps, cls):
"""Retrieve fit parameters from ``fit_kwargs``."""
params = {name: {} for (name, step) in steps if step is not None}
step_names = list(params.keys())
for (param_name, param_val) in fit_kwargs.items():
param_split = param_name.split('__', 1)
if len(param_split) != 2:
raise ValueError(
f"Fit parameters for {cls} have to be given in the form "
f"'s__p', where 's' is the name of the step and 'p' the name "
f"of the parameter, got '{param_name}'")
try:
params[param_split[0]][param_split[1]] = param_val
except KeyError:
raise ValueError(
f"Expected one of {step_names} for step of fit parameter, got "
f"'{param_split[0]}' for parameter '{param_name}'")
return params
def _map_features(features, support):
"""Map old features indices to new ones using boolean mask."""
feature_mapping = {}
new_idx = 0
for (old_idx, supported) in enumerate(support):
if supported:
val = new_idx
new_idx += 1
else:
val = None
feature_mapping[old_idx] = val
new_features = []
for feature in features:
new_feature = feature_mapping[feature]
if new_feature is not None:
new_features.append(new_feature)
return new_features
def _rfe_single_fit(rfe, estimator, x_data, y_data, train, test, scorer,
                    **fit_kwargs):
    """Return the score for a fit across one fold."""
    x_train, y_train = _safe_split(estimator, x_data, y_data, train)
    x_test, y_test = _safe_split(estimator, x_data, y_data, test, train)
    # Only the train-split portion of the fit kwargs is used for fitting.
    train_kwargs, _ = _split_fit_kwargs(fit_kwargs, train, test)

    def _step_score(est, features):
        """Score one elimination step on the held-out fold."""
        return _score(est, x_test[:, features], y_test, scorer)

    fitted_rfe = rfe._fit(x_train, y_train, step_score=_step_score,
                          **train_kwargs)
    return fitted_rfe.scores_
def _score_weighted(estimator, x_test, y_test, scorer, sample_weights=None):
"""Expand :func:`sklearn.model_selection._validation._score`."""
if isinstance(scorer, dict):
# will cache method calls if needed. scorer() returns a dict
scorer = _MultimetricScorer(**scorer)
if y_test is None:
scores = scorer(estimator, x_test, sample_weight=sample_weights)
else:
scores = scorer(estimator, x_test, y_test,
sample_weight=sample_weights)
error_msg = ("scoring must return a number, got %s (%s) "
"instead. (scorer=%s)")
if isinstance(scores, dict):
for name, score in scores.items():
if hasattr(score, 'item'):
with suppress(ValueError):
# e.g. unwrap memmapped scalars
score = score.item()
if not isinstance(score, numbers.Number):
raise ValueError(error_msg % (score, type(score), name))
scores[name] = score
else: # scalar
if hasattr(scores, 'item'):
with suppress(ValueError):
# e.g. unwrap memmapped scalars
scores = scores.item()
if not isinstance(scores, numbers.Number):
raise ValueError(error_msg % (scores, type(scores), scorer))
return scores
def _split_fit_kwargs(fit_kwargs, train_idx, test_idx):
"""Get split fit kwargs for single CV step."""
fit_kwargs_train = {}
fit_kwargs_test = {}
for (key, val) in fit_kwargs.items():
if 'sample_weight' in key and 'sample_weight_eval_set' not in key:
fit_kwargs_train[key] = deepcopy(val)[train_idx]
fit_kwargs_test[key] = deepcopy(val)[test_idx]
else:
fit_kwargs_train[key] = deepcopy(val)
fit_kwargs_test[key] = deepcopy(val)
return (fit_kwargs_train, fit_kwargs_test)
def _update_transformers_param(estimator, support):
    """Update ``transformers`` argument of ``ColumnTransformer`` steps.

    After feature elimination the column indices stored in every
    ``ColumnTransformer`` step have to be remapped onto the reduced feature
    set (via :func:`_map_features`), otherwise the transformers would
    address the wrong columns.

    Raises
    ------
    TypeError
        If a ``transformers`` parameter is found but ``estimator`` is not a
        (Advanced)Pipeline, or the corresponding step is not a
        ``ColumnTransformer``.
    """
    all_params = estimator.get_params()
    params = []
    for key in all_params:
        if key.endswith('transformers'):
            params.append(key)
            if isinstance(estimator, (Pipeline, AdvancedPipeline)):
                step = estimator.named_steps[key.split('__')[0]]
                if not isinstance(step, ColumnTransformer):
                    raise TypeError(
                        f"Found 'transformers' parameter ('{key}'), but the "
                        f"corresponding pipeline step is not a "
                        f"ColumnTransformer (got '{type(step)}')")
            else:
                # Plain string here (no placeholders except {key}); the
                # original used f-strings without any interpolation.
                raise TypeError(
                    f"Found 'transformers' parameter ('{key}'), but the "
                    "corresponding estimator is not a Pipeline or "
                    "AdvancedPipeline")
    new_params = {}
    for param in params:
        new_transformers = []
        # Each transformer is a (name, transformer, columns) triple; only
        # the columns need remapping.
        for transformer in all_params[param]:
            new_columns = _map_features(transformer[2], support)
            new_transformers.append(
                (transformer[0], transformer[1], new_columns))
        new_params[param] = new_transformers
    estimator.set_params(**new_params)
def cross_val_score_weighted(estimator, x_data, y_data=None, groups=None,
                             scoring=None, cv=None, n_jobs=None, verbose=0,
                             fit_params=None, pre_dispatch='2*n_jobs',
                             error_score=np.nan, sample_weights=None):
    """Expand :func:`sklearn.model_selection.cross_val_score`.

    Identical to the sklearn original except for the additional
    ``sample_weights`` argument, which is forwarded to the scorer of every
    CV fold (see :func:`_fit_and_score_weighted`).

    Returns
    -------
    numpy.ndarray
        Test score of every CV fold.

    """
    # Wrap the single scorer in a dict so the multimetric machinery of
    # _fit_and_score_weighted can be reused.
    scorer = check_scoring(estimator, scoring=scoring)
    scorer_name = 'score'
    scoring = {scorer_name: scorer}
    x_data, y_data, groups = indexable(x_data, y_data, groups)
    cv = check_cv(cv, y_data, classifier=is_classifier(estimator))
    scorers, _ = _check_multimetric_scoring(estimator, scoring=scoring)

    # We clone the estimator to make sure that all the folds are
    # independent, and that it is pickle-able.
    parallel = Parallel(n_jobs=n_jobs, verbose=verbose,
                        pre_dispatch=pre_dispatch)
    scores = parallel(
        delayed(_fit_and_score_weighted)(
            clone(estimator), x_data, y_data, scorers, train, test, verbose,
            None, fit_params, error_score=error_score,
            sample_weights=sample_weights)
        for train, test in cv.split(x_data, y_data, groups))

    # Each fold returns a one-element list of score dicts; aggregate them
    # into one array per scorer and select the single scorer used here.
    test_scores = list(zip(*scores))[0]
    test_scores = _aggregate_score_dicts(test_scores)
    return np.array(test_scores[scorer_name])
class AdvancedPipeline(Pipeline):
    """Expand :class:`sklearn.pipeline.Pipeline`.

    Exposes the coefficients / feature importances of the final step and
    adds a type check for the final regressor.
    """

    @property
    def coef_(self):
        """numpy.ndarray: Model coefficients."""
        # self.steps[-1] is the (name, estimator) pair of the final step.
        return self.steps[-1][1].coef_

    @property
    def feature_importances_(self):
        """numpy.ndarray: Feature importances."""
        return self.steps[-1][1].feature_importances_

    def _check_final_step(self):
        """Check type of final step of pipeline.

        Raises
        ------
        TypeError
            If the final step is not an
            ``AdvancedTransformedTargetRegressor``.
        """
        final_step = self.steps[-1][1]
        if not isinstance(final_step, AdvancedTransformedTargetRegressor):
            raise TypeError(
                f"Expected estimator of type "
                f"{AdvancedTransformedTargetRegressor} for final step of "
                f"pipeline, got {final_step.__class__}")
class AdvancedRFE(RFE):
    """Expand :class:`sklearn.feature_selection.RFE`.

    Expands :meth:`fit` and :meth:`predict` to accept additional keyword
    arguments which are forwarded to the wrapped estimator, and keeps any
    ``ColumnTransformer`` steps of the estimator consistent with the
    shrinking feature set during elimination.
    """

    def fit(self, x_data, y_data, **fit_kwargs):
        """Expand :meth:`fit` to accept kwargs."""
        return self._fit(x_data, y_data, **fit_kwargs)

    def _fit(self, x_data, y_data, step_score=None, **fit_kwargs):
        """Expand :meth:`_fit` to accept kwargs."""
        # Parameter step_score controls the calculation of self.scores_
        # step_score is not exposed to users
        # and is used when implementing AdvancedRFECV
        # self.scores_ will not be calculated when calling _fit through fit
        tags = self._get_tags()
        x_data, y_data = check_X_y(
            x_data, y_data, "csc", ensure_min_features=2,
            force_all_finite=not tags.get('allow_nan', True))

        # Initialization
        n_features = x_data.shape[1]
        if self.n_features_to_select is None:
            n_features_to_select = n_features // 2
        else:
            n_features_to_select = self.n_features_to_select

        # A fractional step means "eliminate that fraction of the remaining
        # features per iteration".
        if 0.0 < self.step < 1.0:
            step = int(max(1, self.step * n_features))
        else:
            step = int(self.step)
        if step <= 0:
            raise ValueError("Step must be >0")

        # Use builtin bool/int: the np.bool/np.int aliases were deprecated
        # in NumPy 1.20 and removed in 1.24.
        support_ = np.ones(n_features, dtype=bool)
        ranking_ = np.ones(n_features, dtype=int)

        if step_score:
            self.scores_ = []

        # Elimination
        while np.sum(support_) > n_features_to_select:
            # Remaining features
            features = np.arange(n_features)[support_]

            # Rank the remaining features
            estimator = clone(self.estimator)
            if self.verbose > 0:
                print("Fitting estimator with %d features." % np.sum(support_))

            # Remap ColumnTransformer column indices to the reduced set.
            _update_transformers_param(estimator, support_)
            estimator.fit(x_data[:, features], y_data, **fit_kwargs)

            # Get coefs (hasattr(estimator, 'coef_') raises a KeyError for
            # XGBRegressor models
            try:
                coefs = estimator.coef_
            except (AttributeError, KeyError):
                coefs = getattr(estimator, 'feature_importances_', None)
            if coefs is None:
                raise RuntimeError('The classifier does not expose '
                                   '"coef_" or "feature_importances_" '
                                   'attributes')

            # Get ranks (multi-output coefficients are aggregated over
            # outputs via the sum of squares).
            if coefs.ndim > 1:
                ranks = np.argsort(safe_sqr(coefs).sum(axis=0))
            else:
                ranks = np.argsort(safe_sqr(coefs))

            # for sparse case ranks is matrix
            ranks = np.ravel(ranks)

            # Eliminate the worse features
            threshold = min(step, np.sum(support_) - n_features_to_select)

            # Compute step score on the previous selection iteration
            # because 'estimator' must use features
            # that have not been eliminated yet
            if step_score:
                self.scores_.append(step_score(estimator, features))
            support_[features[ranks][:threshold]] = False
            ranking_[np.logical_not(support_)] += 1

        # Set final attributes
        features = np.arange(n_features)[support_]
        self.estimator_ = clone(self.estimator)
        _update_transformers_param(self.estimator_, support_)
        self.estimator_.fit(x_data[:, features], y_data, **fit_kwargs)

        # Compute step score when only n_features_to_select features left
        if step_score:
            self.scores_.append(step_score(self.estimator_, features))
        self.n_features_ = support_.sum()
        self.support_ = support_
        self.ranking_ = ranking_
        return self

    @if_delegate_has_method(delegate='estimator')
    def predict(self, x_data, **predict_kwargs):
        """Expand :meth:`predict()` to accept kwargs.

        ``transform`` restricts ``x_data`` to the selected features before
        delegating to the fitted estimator.
        """
        check_is_fitted(self)
        return self.estimator_.predict(self.transform(x_data),
                                       **predict_kwargs)
class AdvancedRFECV(AdvancedRFE):
    """Expand :class:`sklearn.feature_selection.RFECV`."""

    def __init__(self, estimator, step=1, min_features_to_select=1, cv=None,
                 scoring=None, verbose=0, n_jobs=None):
        """Original constructor of :class:`sklearn.feature_selection.RFECV`.

        Only stores the parameters (sklearn estimator convention); all
        work happens in :meth:`fit`.
        """
        self.estimator = estimator
        self.step = step
        self.cv = cv
        self.scoring = scoring
        self.verbose = verbose
        self.n_jobs = n_jobs
        self.min_features_to_select = min_features_to_select
[docs] def fit(self, x_data, y_data, groups=None, **fit_kwargs):
"""Expand :meth:`fit` to accept kwargs."""
x_data, y_data = check_X_y(
x_data, y_data, "csr", ensure_min_features=2,
force_all_finite=False)
# Initialization
cv = check_cv(self.cv, y_data, is_classifier(self.estimator))
scorer = check_scoring(self.estimator, scoring=self.scoring)
n_features = x_data.shape[1]
if 0.0 < self.step < 1.0:
step = int(max(1, self.step * n_features))
else:
step = int(self.step)
if step <= 0:
raise ValueError("Step must be >0")
# Build an AdvancedRFE object, which will evaluate and score each
# possible feature count, down to self.min_features_to_select
rfe = AdvancedRFE(estimator=self.estimator,
n_features_to_select=self.min_features_to_select,
step=self.step, verbose=self.verbose)
# Determine the number of subsets of features by fitting across
# the train folds and choosing the "features_to_select" parameter
# that gives the least averaged error across all folds.
# Note that joblib raises a non-picklable error for bound methods
# even if n_jobs is set to 1 with the default multiprocessing
# backend.
# This branching is done so that to
# make sure that user code that sets n_jobs to 1
# and provides bound methods as scorers is not broken with the
# addition of n_jobs parameter in version 0.18.
if effective_n_jobs(self.n_jobs) == 1:
parallel, func = list, _rfe_single_fit
else:
parallel = Parallel(n_jobs=self.n_jobs)
func = delayed(_rfe_single_fit)
scores = parallel(
func(rfe, self.estimator, x_data, y_data, train, test, scorer,
**fit_kwargs)
for train, test in cv.split(x_data, y_data, groups))
scores = np.sum(scores, axis=0)
scores_rev = scores[::-1]
argmax_idx = len(scores) - np.argmax(scores_rev) - 1
n_features_to_select = max(
n_features - (argmax_idx * step),
self.min_features_to_select)
# Re-execute an elimination with best_k over the whole set
rfe = AdvancedRFE(estimator=self.estimator,
n_features_to_select=n_features_to_select,
step=self.step, verbose=self.verbose)
rfe.fit(x_data, y_data, **fit_kwargs)
# Set final attributes
self.support_ = rfe.support_
self.n_features_ = rfe.n_features_
self.ranking_ = rfe.ranking_
self.estimator_ = clone(self.estimator)
_update_transformers_param(self.estimator_, self.support_)
self.estimator_.fit(self.transform(x_data), y_data, **fit_kwargs)
# Fixing a normalization error, n is equal to
# get_n_splits(x_data, y_data) - 1 here, the scores are normalized by
# get_n_splits(x_data, y_data)
self.grid_scores_ = scores[::-1] / cv.get_n_splits(x_data, y_data,
groups)
return self