"""Custom expansions of :mod:`sklearn` functionalities.
Note
----
This module provides custom expansions of some :mod:`sklearn` classes and
functions which are necessary to fit the purposes for the desired
functionalities of the :ref:`MLR module <api.esmvaltool.diag_scripts.mlr>`. As
long-term goal we would like to include these functionalities to the
:mod:`sklearn` package since we believe these additions might be helpful for
everyone. This module serves as interim solution. To ensure that all features
are properly working this module is also covered by extensive tests.
Parts of this code have been copied from :mod:`sklearn`.
License: BSD 3-Clause License
Copyright (c) 2007-2020 The scikit-learn developers.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
# pylint: disable=arguments-differ
# pylint: disable=attribute-defined-outside-init
# pylint: disable=protected-access
# pylint: disable=super-init-not-called
# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-locals
# pylint: disable=too-many-return-statements
import itertools
import logging
import numbers
import os
import warnings
from contextlib import suppress
from copy import deepcopy
from inspect import getfullargspec
from traceback import format_exc
import numpy as np
import scipy.sparse as sp
from joblib import Parallel, delayed, effective_n_jobs
from sklearn.base import BaseEstimator, clone, is_classifier
from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
from sklearn.exceptions import FitFailedWarning, NotFittedError
from sklearn.feature_selection import RFE, SelectorMixin
from sklearn.linear_model import LinearRegression
from sklearn.metrics import check_scoring
from sklearn.model_selection import check_cv
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.utils import check_array, check_X_y, indexable, safe_sqr
from sklearn.utils.fixes import np_version, parse_version
from sklearn.utils.metaestimators import available_if
from sklearn.utils.validation import check_is_fitted
from esmvaltool.diag_scripts import mlr
logger = logging.getLogger(os.path.basename(__file__))
_DEFAULT_TAGS = {
"array_api_support": False,
"non_deterministic": False,
"requires_positive_X": False,
"requires_positive_y": False,
"X_types": ["2darray"],
"poor_score": False,
"no_validation": False,
"multioutput": False,
"allow_nan": False,
"stateless": False,
"multilabel": False,
"_skip_test": False,
"_xfail_checks": False,
"multioutput_only": False,
"binary_only": False,
"requires_fit": True,
"preserves_dtype": [np.float64],
"requires_y": False,
"pairwise": False,
}
def _estimator_has(attr):
"""Check if we can delegate a method to the underlying estimator.
    First, we check the fitted estimator if available, otherwise we
check the unfitted estimator.
"""
return lambda self: (
hasattr(self.estimator_, attr)
if hasattr(self, "estimator_")
else hasattr(self.estimator, attr)
)
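# Illustrative sketch (not part of the copied sklearn code): ``_estimator_has``
# is intended to be used together with :func:`available_if` so that a delegated
# method is only exposed when the wrapped estimator provides it. The class name
# ``_ExampleDelegator`` is hypothetical and only serves as an illustration.
class _ExampleDelegator(BaseEstimator):
    """Expose ``predict`` only if the wrapped estimator provides it."""
    def __init__(self, estimator):
        self.estimator = estimator
    @available_if(_estimator_has("predict"))
    def predict(self, x_data):
        """Delegate to the fitted estimator if available."""
        estimator = getattr(self, "estimator_", self.estimator)
        return estimator.predict(x_data)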
def _determine_key_type(key, accept_slice=True):
"""Determine the data type of key."""
err_msg = (
"No valid specification of the columns. Only a scalar, list or "
"slice of all integers or all strings, or boolean mask is "
"allowed"
)
dtype_to_str = {int: "int", str: "str", bool: "bool", np.bool_: "bool"}
array_dtype_to_str = {
"i": "int",
"u": "int",
"b": "bool",
"O": "str",
"U": "str",
"S": "str",
}
if key is None:
return None
if isinstance(key, tuple(dtype_to_str.keys())):
try:
return dtype_to_str[type(key)]
except KeyError as exc:
raise ValueError(err_msg) from exc
if isinstance(key, slice):
if not accept_slice:
raise TypeError(
"Only array-like or scalar are supported. "
"A Python slice was given."
)
if key.start is None and key.stop is None:
return None
key_start_type = _determine_key_type(key.start)
key_stop_type = _determine_key_type(key.stop)
if key_start_type is not None and key_stop_type is not None:
if key_start_type != key_stop_type:
raise ValueError(err_msg)
if key_start_type is not None:
return key_start_type
return key_stop_type
if isinstance(key, (list, tuple)):
unique_key = set(key)
key_type = {_determine_key_type(elt) for elt in unique_key}
if not key_type:
return None
if len(key_type) != 1:
raise ValueError(err_msg)
return key_type.pop()
if hasattr(key, "dtype"):
try:
return array_dtype_to_str[key.dtype.kind]
except KeyError as exc:
raise ValueError(err_msg) from exc
raise ValueError(err_msg)
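# Illustrative sketch (not part of the copied sklearn code): the hypothetical
# helper below merely demonstrates which key types ``_determine_key_type``
# recognizes for typical row/column specifications.
def _example_determine_key_type():
    """Return the detected types for a few typical keys."""
    return (
        _determine_key_type(0),                    # 'int'
        _determine_key_type("feature_a"),          # 'str'
        _determine_key_type([True, False, True]),  # 'bool'
        _determine_key_type(slice(0, 3)),          # 'int'
        _determine_key_type(np.array([0, 2])),     # 'int'
    )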
def _array_indexing(array, key, key_dtype, axis):
"""Index an array or scipy.sparse consistently across numpy version."""
if np_version < parse_version("1.12") or sp.issparse(array):
if key_dtype == "bool":
key = np.asarray(key)
if isinstance(key, tuple):
key = list(key)
return array[key] if axis == 0 else array[:, key]
def _list_indexing(x_data, key, key_dtype):
"""Index a python list."""
if np.isscalar(key) or isinstance(key, slice):
# key is a slice or a scalar
return x_data[key]
if key_dtype == "bool":
# key is a boolean array-like
return list(itertools.compress(x_data, key))
    # key is an integer array-like of indices
return [x_data[idx] for idx in key]
def _pandas_indexing(x_data, key, key_dtype, axis):
"""Index a pandas dataframe or a series."""
if hasattr(key, "shape"):
key = np.asarray(key)
key = key if key.flags.writeable else key.copy()
elif isinstance(key, tuple):
key = list(key)
# check whether we should index with loc or iloc
indexer = x_data.iloc if key_dtype == "int" else x_data.loc
return indexer[:, key] if axis else indexer[key]
def _safe_indexing(x_data, indices, *_, axis=0):
"""Return rows, items or columns of x_data using indices."""
if indices is None:
return x_data
if axis not in (0, 1):
raise ValueError(
f"'axis' should be either 0 (to index rows) or 1 (to index "
f"column). Got {axis} instead."
)
indices_dtype = _determine_key_type(indices)
if axis == 0 and indices_dtype == "str":
raise ValueError("String indexing is not supported with 'axis=0'")
if axis == 1 and x_data.ndim != 2:
raise ValueError(
f"'x_data' should be a 2D NumPy array, 2D sparse matrix or pandas "
f"dataframe when indexing the columns (i.e. 'axis=1'). "
f"Got {type(x_data)} instead with {x_data.ndim} dimension(s)."
)
if axis == 1 and indices_dtype == "str" and not hasattr(x_data, "loc"):
raise ValueError(
"Specifying the columns using strings is only supported for "
"pandas DataFrames"
)
if hasattr(x_data, "iloc"):
return _pandas_indexing(x_data, indices, indices_dtype, axis=axis)
if hasattr(x_data, "shape"):
return _array_indexing(x_data, indices, indices_dtype, axis=axis)
return _list_indexing(x_data, indices, indices_dtype)
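# Illustrative sketch (not part of the copied sklearn code): ``_safe_indexing``
# dispatches to list, array/sparse or pandas indexing depending on the input
# type. The helper name ``_example_safe_indexing`` is hypothetical.
def _example_safe_indexing():
    """Select rows and columns of a small array in a type-agnostic way."""
    x_data = np.arange(12).reshape(4, 3)
    rows = _safe_indexing(x_data, [0, 2], axis=0)  # rows 0 and 2
    cols = _safe_indexing(x_data, [1], axis=1)  # second column only
    return (rows, cols)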
def _is_arraylike(input_array):
"""Check whether the input is array-like."""
return (
hasattr(input_array, "__len__")
or hasattr(input_array, "shape")
or hasattr(input_array, "__array__")
)
def _make_indexable(iterable):
"""Ensure iterable supports indexing or convert to an indexable variant."""
if sp.issparse(iterable):
return iterable.tocsr()
if hasattr(iterable, "__getitem__") or hasattr(iterable, "iloc"):
return iterable
if iterable is None:
return iterable
return np.array(iterable)
def _num_samples(x_data):
"""Return number of samples in array-like x_data."""
message = f"Expected sequence or array-like, got {type(x_data)}"
if hasattr(x_data, "fit") and callable(x_data.fit):
        # Don't get num_samples from an ensemble's length!
raise TypeError(message)
if not hasattr(x_data, "__len__") and not hasattr(x_data, "shape"):
if hasattr(x_data, "__array__"):
x_data = np.asarray(x_data)
else:
raise TypeError(message)
if hasattr(x_data, "shape") and x_data.shape is not None:
if len(x_data.shape) == 0:
raise TypeError(
f"Singleton array {x_data!r} cannot be considered a valid "
f"collection."
)
# Check that shape is returning an integer or default to len
# Dask dataframes may not return numeric shape[0] value
if isinstance(x_data.shape[0], numbers.Integral):
return x_data.shape[0]
try:
return len(x_data)
except TypeError as type_error:
raise TypeError(message) from type_error
def _check_fit_params(x_data, fit_params, indices=None):
"""Check and validate the parameters passed during ``fit``."""
fit_params_validated = {}
for param_key, param_value in fit_params.items():
if not _is_arraylike(param_value) or _num_samples(
param_value
) != _num_samples(x_data):
# Non-indexable pass-through (for now for backward-compatibility).
# https://github.com/scikit-learn/scikit-learn/issues/15805
fit_params_validated[param_key] = param_value
else:
# Any other fit_params should support indexing
# (e.g. for cross-validation).
fit_params_validated[param_key] = _make_indexable(param_value)
fit_params_validated[param_key] = _safe_indexing(
fit_params_validated[param_key], indices
)
return fit_params_validated
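# Illustrative sketch (not part of the copied sklearn code): fit parameters
# whose length matches the number of samples (e.g. ``sample_weight``) are
# subset along ``indices``, while all other parameters are passed through
# unchanged. The helper name ``_example_check_fit_params`` is hypothetical.
def _example_check_fit_params():
    """Subset sample-aligned fit parameters for a train split."""
    x_data = np.zeros((4, 2))
    fit_params = {
        "sample_weight": np.array([1.0, 2.0, 3.0, 4.0]),  # per sample: subset
        "verbose": True,  # scalar: passed through unchanged
    }
    return _check_fit_params(x_data, fit_params, indices=[0, 2])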
def _safe_tags(estimator, key=None):
"""Safely get estimator tags."""
if hasattr(estimator, "_get_tags"):
tags_provider = "_get_tags()"
tags = estimator._get_tags()
elif hasattr(estimator, "_more_tags"):
tags_provider = "_more_tags()"
tags = {**_DEFAULT_TAGS, **estimator._more_tags()}
else:
tags_provider = "_DEFAULT_TAGS"
tags = _DEFAULT_TAGS
if key is not None:
if key not in tags:
raise ValueError(
f"The key {key} is not defined in {tags_provider} for the "
f"class {estimator.__class__.__name__}."
)
return tags[key]
return tags
def _is_pairwise(estimator):
"""Return ``True`` if estimator is pairwise."""
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
has_pairwise_attribute = hasattr(estimator, "_pairwise")
pairwise_attribute = getattr(estimator, "_pairwise", False)
pairwise_tag = _safe_tags(estimator, key="pairwise")
if has_pairwise_attribute:
if pairwise_attribute != pairwise_tag:
warnings.warn(
"_pairwise attribute is inconsistent with tags. Set the "
"estimator tags of your estimator instead",
FutureWarning,
stacklevel=2,
)
return pairwise_attribute
# Use pairwise tag when the attribute is not present
return pairwise_tag
def _safe_split(estimator, x_data, y_data, indices, train_indices=None):
"""Create subset of dataset and properly handle kernels."""
if _is_pairwise(estimator):
if not hasattr(x_data, "shape"):
raise ValueError(
"Precomputed kernels or affinity matrices have "
"to be passed as arrays or sparse matrices."
)
# x_data is a precomputed square kernel matrix
if x_data.shape[0] != x_data.shape[1]:
raise ValueError("x_data should be a square kernel matrix")
if train_indices is None:
x_subset = x_data[np.ix_(indices, indices)]
else:
x_subset = x_data[np.ix_(indices, train_indices)]
else:
x_subset = _safe_indexing(x_data, indices)
if y_data is not None:
y_subset = _safe_indexing(y_data, indices)
else:
y_subset = None
return (x_subset, y_subset)
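# Illustrative sketch (not part of the copied sklearn code): for regular
# estimators ``_safe_split`` simply subsets the rows of ``x_data`` and
# ``y_data``; for pairwise estimators (precomputed kernels) it would slice the
# square kernel matrix along both axes instead. The helper name
# ``_example_safe_split`` and the toy data are made up for the illustration.
def _example_safe_split():
    """Extract a two-sample subset of a toy regression problem."""
    x_data = np.arange(10, dtype=float).reshape(5, 2)
    y_data = np.arange(5, dtype=float)
    estimator = LinearRegression()  # not pairwise, so plain row indexing
    return _safe_split(estimator, x_data, y_data, [0, 3])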
def _fit_and_score_weighted(
estimator,
x_data,
y_data,
scorer,
train,
test,
parameters,
fit_params,
error_score=np.nan,
sample_weights=None,
):
"""Expand :func:`sklearn.model_selection._validation._fit_and_score`."""
# Adjust length of sample weights
fit_params = fit_params if fit_params is not None else {}
fit_params = _check_fit_params(x_data, fit_params, train)
if parameters is not None:
# clone after setting parameters in case any parameters
# are estimators (like pipeline steps)
# because pipeline doesn't clone steps in fit
cloned_parameters = {}
for key, val in parameters.items():
cloned_parameters[key] = clone(val, safe=False)
estimator = estimator.set_params(**cloned_parameters)
(x_train, y_train) = _safe_split(estimator, x_data, y_data, train)
(x_test, y_test) = _safe_split(estimator, x_data, y_data, test, train)
if sample_weights is not None:
sample_weights_test = sample_weights[test]
else:
sample_weights_test = None
try:
if y_train is None:
estimator.fit(x_train, **fit_params)
else:
estimator.fit(x_train, y_train, **fit_params)
except Exception as exc:
if error_score == "raise":
raise
if isinstance(error_score, numbers.Number):
test_score = error_score
warnings.warn(
f"Estimator fit failed. The score on this train-test "
f"partition for these parameters will be set to "
f"{error_score:f}. Details: \n{format_exc()}",
FitFailedWarning,
stacklevel=2,
)
else:
raise ValueError(
"error_score must be the string 'raise' or a "
"numeric value. (Hint: if using 'raise', please "
"make sure that it has been spelled correctly.)"
) from exc
else:
test_score = _score_weighted(
estimator,
x_test,
y_test,
scorer,
sample_weights=sample_weights_test,
)
return test_score
def _get_fit_parameters(fit_kwargs, steps, cls):
"""Retrieve fit parameters from ``fit_kwargs``."""
params = {name: {} for (name, step) in steps if step is not None}
step_names = list(params.keys())
for param_name, param_val in fit_kwargs.items():
param_split = param_name.split("__", 1)
if len(param_split) != 2:
raise ValueError(
f"Fit parameters for {cls} have to be given in the form "
f"'s__p', where 's' is the name of the step and 'p' the name "
f"of the parameter, got '{param_name}'"
)
try:
params[param_split[0]][param_split[1]] = param_val
except KeyError as exc:
raise ValueError(
f"Expected one of {step_names} for step of fit parameter, got "
f"'{param_split[0]}' for parameter '{param_name}'"
) from exc
return params
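# Illustrative sketch (not part of the copied sklearn code): fit kwargs given
# as '<step>__<parameter>' are grouped by pipeline step. The step names
# 'scaler' and 'regressor' below are hypothetical.
def _example_get_fit_parameters():
    """Group fit kwargs of the form 's__p' by their pipeline step 's'."""
    steps = [
        ("scaler", FunctionTransformer()),
        ("regressor", LinearRegression()),
    ]
    fit_kwargs = {"regressor__sample_weight": np.ones(3)}
    # Expected result:
    # {'scaler': {}, 'regressor': {'sample_weight': array([1., 1., 1.])}}
    return _get_fit_parameters(fit_kwargs, steps, "ExamplePipeline")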
def _score_weighted(estimator, x_test, y_test, scorer, sample_weights=None):
"""Expand :func:`sklearn.model_selection._validation._score`."""
if y_test is None:
score = scorer(estimator, x_test, sample_weight=sample_weights)
else:
score = scorer(estimator, x_test, y_test, sample_weight=sample_weights)
error_msg = (
"Scoring must return a number, got %s (%s) instead. (scorer=%s)"
)
if hasattr(score, "item"):
with suppress(ValueError):
# e.g. unwrap memmapped scalars
score = score.item()
if not isinstance(score, numbers.Number):
raise ValueError(error_msg % (score, type(score), scorer))
return score
def _split_fit_kwargs(fit_kwargs, train_idx, test_idx):
"""Get split fit kwargs for single CV step."""
fit_kwargs_train = {}
fit_kwargs_test = {}
for key, val in fit_kwargs.items():
if "sample_weight" in key and "sample_weight_eval_set" not in key:
fit_kwargs_train[key] = deepcopy(val)[train_idx]
fit_kwargs_test[key] = deepcopy(val)[test_idx]
else:
fit_kwargs_train[key] = deepcopy(val)
fit_kwargs_test[key] = deepcopy(val)
return (fit_kwargs_train, fit_kwargs_test)
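# Illustrative sketch (not part of the copied sklearn code): kwargs whose name
# contains 'sample_weight' (but not 'sample_weight_eval_set') are subset for
# the train and test indices of a CV fold; everything else is deep-copied
# unchanged. The helper name ``_example_split_fit_kwargs`` is hypothetical.
def _example_split_fit_kwargs():
    """Split sample weights into the train and test parts of one CV fold."""
    fit_kwargs = {"final__sample_weight": np.array([1.0, 2.0, 3.0, 4.0])}
    train_idx = np.array([0, 1, 2])
    test_idx = np.array([3])
    # Expected result: weights [1., 2., 3.] for training, [4.] for testing
    return _split_fit_kwargs(fit_kwargs, train_idx, test_idx)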
def _rfe_single_fit(
rfe, estimator, x_data, y_data, train, test, scorer, **fit_kwargs
):
"""Return the score for a fit across one fold."""
(x_train, y_train) = _safe_split(estimator, x_data, y_data, train)
(x_test, y_test) = _safe_split(estimator, x_data, y_data, test, train)
(fit_kwargs_train, fit_kwargs_test) = _split_fit_kwargs(
fit_kwargs, train, test
)
if "sample_weight" in fit_kwargs_test:
fit_kwargs_test["sample_weights"] = fit_kwargs_test.pop(
"sample_weight"
)
def step_score(estimator, features):
"""Score for a single step in the recursive feature elimination."""
return _score_weighted(
estimator, x_test[:, features], y_test, scorer, **fit_kwargs_test
)
return rfe._fit(
x_train, y_train, step_score=step_score, **fit_kwargs_train
).scores_
def _map_features(features, support):
"""Map old features indices to new ones using boolean mask."""
feature_mapping = {}
new_idx = 0
for old_idx, supported in enumerate(support):
if supported:
val = new_idx
new_idx += 1
else:
val = None
feature_mapping[old_idx] = val
new_features = []
for feature in features:
new_feature = feature_mapping[feature]
if new_feature is not None:
new_features.append(new_feature)
return new_features
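# Illustrative sketch (not part of the copied sklearn code): a boolean support
# mask drops eliminated features and re-numbers the surviving ones. The helper
# name ``_example_map_features`` is hypothetical.
def _example_map_features():
    """Re-number column indices after dropping the second feature."""
    support = [True, False, True, True]
    # Old columns 0, 2 and 3 survive and become new columns 0, 1 and 2; old
    # column 1 is dropped entirely, so the result is [0, 1, 2].
    return _map_features([0, 1, 2, 3], support)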
def _update_transformers_param(estimator, support):
"""Update ``transformers`` argument of ``ColumnTransformer`` steps."""
all_params = estimator.get_params()
params = []
for key in all_params:
if key.endswith("transformers"):
params.append(key)
if isinstance(estimator, (Pipeline, AdvancedPipeline)):
step = estimator.named_steps[key.split("__")[0]]
if not isinstance(step, ColumnTransformer):
raise TypeError(
f"Found 'transformers' parameter ('{key}'), but the "
f"corresponding pipeline step is not a "
f"ColumnTransformer (got '{type(step)}')"
)
else:
raise TypeError(
f"Found 'transformers' parameter ('{key}'), but the "
f"corresponding estimator is not a Pipeline or "
f"AdvancedPipeline"
)
new_params = {}
for param in params:
new_transformers = []
for transformer in all_params[param]:
new_columns = _map_features(transformer[2], support)
new_transformers.append(
(transformer[0], transformer[1], new_columns)
)
new_params[param] = new_transformers
estimator.set_params(**new_params)
def cross_val_score_weighted(
estimator,
x_data,
y_data=None,
groups=None,
scoring=None,
cv=None,
n_jobs=None,
verbose=0,
fit_params=None,
pre_dispatch="2*n_jobs",
error_score=np.nan,
sample_weights=None,
):
"""Expand :func:`sklearn.model_selection.cross_val_score`."""
scorer = check_scoring(estimator, scoring=scoring)
(x_data, y_data, groups) = indexable(x_data, y_data, groups)
cv = check_cv(cv, y_data, classifier=is_classifier(estimator))
# We clone the estimator to make sure that all the folds are
# independent, and that it is pickle-able.
parallel = Parallel(
n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch
)
scores = parallel(
delayed(_fit_and_score_weighted)(
clone(estimator),
x_data,
y_data,
scorer,
train,
test,
None,
fit_params,
error_score=error_score,
sample_weights=sample_weights,
)
for train, test in cv.split(x_data, y_data, groups)
)
return np.array(scores)
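# Illustrative sketch (not part of the copied sklearn code): sample-weighted
# cross-validation for a plain linear regression. The weights enter both the
# fits (via ``fit_params``) and the scoring (via ``sample_weights``); data and
# weights are made up for the illustration.
def _example_cross_val_score_weighted():
    """Compute sample-weighted CV scores for a toy regression problem."""
    x_data = np.arange(20, dtype=float).reshape(10, 2)
    y_data = np.arange(10, dtype=float)
    weights = np.linspace(0.5, 1.5, 10)
    return cross_val_score_weighted(
        LinearRegression(),
        x_data,
        y_data,
        scoring="neg_mean_absolute_error",
        cv=5,
        fit_params={"sample_weight": weights},
        sample_weights=weights,
    )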
class AdvancedPipeline(Pipeline):
"""Expand :class:`sklearn.pipeline.Pipeline`."""
@property
def coef_(self):
"""numpy.ndarray: Model coefficients."""
return self.steps[-1][1].coef_
@property
def feature_importances_(self):
"""numpy.ndarray: Feature importances."""
return self.steps[-1][1].feature_importances_
def _check_final_step(self):
"""Check type of final step of pipeline."""
final_step = self.steps[-1][1]
if not isinstance(final_step, AdvancedTransformedTargetRegressor):
raise TypeError(
f"Expected estimator of type "
f"{AdvancedTransformedTargetRegressor} for final step of "
f"pipeline, got {final_step.__class__}"
)
class AdvancedRFE(RFE):
"""Expand :class:`sklearn.feature_selection.RFE`."""
def fit(self, x_data, y_data, **fit_kwargs):
"""Expand :meth:`fit` to accept kwargs."""
return self._fit(x_data, y_data, **fit_kwargs)
def _fit(self, x_data, y_data, step_score=None, **fit_kwargs):
"""Expand :meth:`_fit` to accept kwargs."""
# Parameter step_score controls the calculation of self.scores_
# step_score is not exposed to users
# and is used when implementing AdvancedRFECV
# self.scores_ will not be calculated when calling _fit through fit
x_data, y_data = check_X_y(
x_data,
y_data,
"csc",
ensure_min_features=2,
force_all_finite=False,
)
# Initialization
n_features = x_data.shape[1]
if self.n_features_to_select is None:
n_features_to_select = n_features // 2
else:
n_features_to_select = self.n_features_to_select
if 0.0 < self.step < 1.0:
step = int(max(1, self.step * n_features))
else:
step = int(self.step)
if step <= 0:
raise ValueError("Step must be >0")
support_ = np.ones(n_features, dtype=bool)
ranking_ = np.ones(n_features, dtype=np.int64)
if step_score:
self.scores_ = []
# Elimination
while np.sum(support_) > n_features_to_select:
# Remaining features
features = np.arange(n_features)[support_]
# Rank the remaining features
estimator = clone(self.estimator)
if self.verbose > 0:
print(f"Fitting estimator with {np.sum(support_):d} features.")
_update_transformers_param(estimator, support_)
estimator.fit(x_data[:, features], y_data, **fit_kwargs)
# Get coefs (hasattr(estimator, 'coef_') raises a KeyError for
            # XGBRegressor models)
try:
coefs = estimator.coef_
except (AttributeError, KeyError):
coefs = getattr(estimator, "feature_importances_", None)
if coefs is None:
raise RuntimeError(
"The classifier does not expose "
"'coef_' or 'feature_importances_' "
"attributes"
)
# Get ranks
if coefs.ndim > 1:
ranks = np.argsort(safe_sqr(coefs).sum(axis=0))
else:
ranks = np.argsort(safe_sqr(coefs))
            # Transformer steps that reduce the number of features are not
            # supported
if len(ranks) != len(features):
raise NotImplementedError(
f"Estimators that contain transforming steps that reduce "
f"the number of features are not supported in "
                    f"{self.__class__}, got {len(features):d} features for "
f"fit(), but only {len(ranks):d} elements for 'coefs_' / "
f"'feature_importances_' are provided. Estimator:\n"
f"{estimator}",
)
# for sparse case ranks is matrix
ranks = np.ravel(ranks)
            # Eliminate the worst features
threshold = min(step, np.sum(support_) - n_features_to_select)
# Compute step score on the previous selection iteration
# because 'estimator' must use features
# that have not been eliminated yet
if step_score:
self.scores_.append(step_score(estimator, features))
support_[features[ranks][:threshold]] = False
ranking_[np.logical_not(support_)] += 1
# Set final attributes
features = np.arange(n_features)[support_]
self.estimator_ = clone(self.estimator)
_update_transformers_param(self.estimator_, support_)
self.estimator_.fit(x_data[:, features], y_data, **fit_kwargs)
# Compute step score when only n_features_to_select features left
if step_score:
self.scores_.append(step_score(self.estimator_, features))
self.n_features_ = support_.sum()
self.support_ = support_
self.ranking_ = ranking_
return self
@available_if(_estimator_has("predict"))
def predict(self, x_data, **predict_kwargs):
"""Expand :meth:`predict()` to accept kwargs."""
check_is_fitted(self)
return self.estimator_.predict(
self.transform(x_data), **predict_kwargs
)
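# Illustrative sketch (not part of the copied sklearn code): recursive feature
# elimination with an estimator that exposes ``coef_``. The toy data and the
# helper name ``_example_advanced_rfe`` are made up for the illustration.
def _example_advanced_rfe():
    """Select two out of four features with ``AdvancedRFE``."""
    rng = np.random.RandomState(0)
    x_data = rng.rand(50, 4)
    # Only the first two features carry any signal.
    y_data = 3.0 * x_data[:, 0] - 2.0 * x_data[:, 1]
    rfe = AdvancedRFE(LinearRegression(), n_features_to_select=2, step=1)
    rfe.fit(x_data, y_data)
    return rfe.support_  # boolean mask of the selected features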
class AdvancedRFECV(AdvancedRFE):
"""Expand :class:`sklearn.feature_selection.RFECV`."""
def __init__(
self,
estimator,
step=1,
min_features_to_select=1,
cv=None,
scoring=None,
verbose=0,
n_jobs=None,
):
"""Original constructor of :class:`sklearn.feature_selection.RFECV`."""
self.estimator = estimator
self.step = step
self.min_features_to_select = min_features_to_select
self.cv = cv
self.scoring = scoring
self.verbose = verbose
self.n_jobs = n_jobs
def fit(self, x_data, y_data, groups=None, **fit_kwargs):
"""Expand :meth:`fit` to accept kwargs."""
x_data, y_data = check_X_y(
x_data,
y_data,
"csr",
ensure_min_features=2,
force_all_finite=False,
)
# Initialization
cv = check_cv(
self.cv, y_data, classifier=is_classifier(self.estimator)
)
scorer = check_scoring(self.estimator, scoring=self.scoring)
n_features = x_data.shape[1]
if 0.0 < self.step < 1.0:
step = int(max(1, self.step * n_features))
else:
step = int(self.step)
if step <= 0:
raise ValueError("Step must be >0")
# Build an AdvancedRFE object, which will evaluate and score each
# possible feature count, down to self.min_features_to_select
rfe = AdvancedRFE(
estimator=self.estimator,
n_features_to_select=self.min_features_to_select,
step=self.step,
verbose=self.verbose,
)
# Determine the number of subsets of features by fitting across
# the train folds and choosing the "features_to_select" parameter
# that gives the least averaged error across all folds.
# Note that joblib raises a non-picklable error for bound methods
# even if n_jobs is set to 1 with the default multiprocessing
# backend.
        # This branching is done to make sure that user code that sets
        # n_jobs to 1 and provides bound methods as scorers is not broken
        # by the addition of the n_jobs parameter.
if effective_n_jobs(self.n_jobs) == 1:
(parallel, func) = (list, _rfe_single_fit)
else:
parallel = Parallel(n_jobs=self.n_jobs)
func = delayed(_rfe_single_fit)
scores = parallel(
func(
rfe,
self.estimator,
x_data,
y_data,
train,
test,
scorer,
**fit_kwargs,
)
for train, test in cv.split(x_data, y_data, groups)
)
scores = np.sum(scores, axis=0)
scores_rev = scores[::-1]
argmax_idx = len(scores) - np.argmax(scores_rev) - 1
n_features_to_select = max(
n_features - (argmax_idx * step), self.min_features_to_select
)
# Re-execute an elimination with best_k over the whole set
rfe = AdvancedRFE(
estimator=self.estimator,
n_features_to_select=n_features_to_select,
step=self.step,
verbose=self.verbose,
)
rfe.fit(x_data, y_data, **fit_kwargs)
# Set final attributes
self.support_ = rfe.support_
self.n_features_ = rfe.n_features_
self.ranking_ = rfe.ranking_
self.estimator_ = clone(self.estimator)
_update_transformers_param(self.estimator_, self.support_)
self.estimator_.fit(self.transform(x_data), y_data, **fit_kwargs)
# Fixing a normalization error, n is equal to
# get_n_splits(x_data, y_data) - 1 here, the scores are normalized by
# get_n_splits(x_data, y_data)
self.grid_scores_ = scores[::-1] / cv.get_n_splits(
x_data, y_data, groups
)
return self