"""
The :mod:`sklearn.pipeline` module implements utilities to build a composite
estimator, as a chain of transforms and estimators.
"""
# Author: Edouard Duchesnay
# Gael Varoquaux
# Virgile Fritsch
# Alexandre Gramfort
# Lars Buitinck
# License: BSD
from collections import defaultdict
from itertools import islice
import warnings
import numpy as np
from scipy import sparse
from joblib import Parallel, delayed
from .base import clone, TransformerMixin
from .utils._estimator_html_repr import _VisualBlock
from .utils.metaestimators import if_delegate_has_method
from .utils import Bunch, _print_elapsed_time
from .utils.validation import check_memory
from .utils.validation import _deprecate_positional_args
from .utils.metaestimators import _BaseComposition
__all__ = ['Pipeline', 'FeatureUnion', 'make_pipeline', 'make_union']
class Pipeline(_BaseComposition):
"""
Pipeline of transforms with a final estimator.
Sequentially apply a list of transforms and a final estimator.
Intermediate steps of the pipeline must be 'transforms', that is, they
must implement fit and transform methods.
The final estimator only needs to implement fit.
The transformers in the pipeline can be cached using ``memory`` argument.
The purpose of the pipeline is to assemble several steps that can be
cross-validated together while setting different parameters.
For this, it enables setting parameters of the various steps using their
names and the parameter name separated by a '__', as in the example below.
A step's estimator may be replaced entirely by setting the parameter
with its name to another estimator, or a transformer removed by setting
it to 'passthrough' or ``None``.
Read more in the :ref:`User Guide <pipeline>`.
.. versionadded:: 0.5
Parameters
----------
steps : list
List of (name, transform) tuples (implementing fit/transform) that are
chained in sequential order, with the last object an estimator.
memory : str or object with the joblib.Memory interface, default=None
Used to cache the fitted transformers of the pipeline. By default,
no caching is performed. If a string is given, it is the path to
the caching directory. Enabling caching triggers a clone of
the transformers before fitting. Therefore, the transformer
instance given to the pipeline cannot be inspected
directly. Use the attribute ``named_steps`` or ``steps`` to
inspect estimators within the pipeline. Caching the
transformers is advantageous when fitting is time consuming.
verbose : bool, default=False
If True, the time elapsed while fitting each step will be printed as it
is completed.
Attributes
----------
named_steps : :class:`~sklearn.utils.Bunch`
Dictionary-like object with read-only attribute access to any step
by its user-given name. Keys are step names and values are the
steps themselves.
See Also
--------
sklearn.pipeline.make_pipeline : Convenience function for simplified
pipeline construction.
Examples
--------
>>> from sklearn.svm import SVC
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.datasets import make_classification
>>> from sklearn.model_selection import train_test_split
>>> from sklearn.pipeline import Pipeline
>>> X, y = make_classification(random_state=0)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
... random_state=0)
>>> pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
>>> # The pipeline can be used as any other estimator
>>> # and avoids leaking the test set into the train set
>>> pipe.fit(X_train, y_train)
Pipeline(steps=[('scaler', StandardScaler()), ('svc', SVC())])
>>> pipe.score(X_test, y_test)
0.88
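>>> # Parameters of the steps can be set after construction with the
>>> # 'step__parameter' syntax, and a step can be disabled by setting
>>> # it to 'passthrough'; a brief sketch (reprs may differ between
>>> # versions, hence the skip markers):
>>> pipe.set_params(svc__C=10)  # doctest: +SKIP
Pipeline(steps=[('scaler', StandardScaler()), ('svc', SVC(C=10))])
>>> pipe.set_params(scaler='passthrough')  # doctest: +SKIP
Pipeline(steps=[('scaler', 'passthrough'), ('svc', SVC(C=10))])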
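>>> # Fitted transformers can be cached by passing ``memory``; a
>>> # minimal sketch using a temporary directory as the cache:
>>> from tempfile import mkdtemp
>>> from shutil import rmtree
>>> cachedir = mkdtemp()
>>> cached_pipe = Pipeline([('scaler', StandardScaler()),
...                         ('svc', SVC())], memory=cachedir)
>>> _ = cached_pipe.fit(X_train, y_train)
>>> rmtree(cachedir)  # remove the cache directory when done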
"""
# BaseEstimator interface
_required_parameters = ['steps']
@_deprecate_positional_args
def __init__(self, steps, *, memory=None, verbose=False):
self.steps = steps
self.memory = memory
self.verbose = verbose
self._validate_steps()
def get_params(self, deep=True):
"""Get parameters for this estimator.
Parameters
----------
deep : bool, default=True
If True, will return the parameters for this estimator and
contained subobjects that are estimators.
Returns
-------
params : mapping of string to any
Parameter names mapped to their values.
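With ``deep=True`` (the default), the nested ``step__parameter`` keys
of each step are included as well. A brief sketch, assuming the
``pipe`` object from the class docstring:
>>> pipe.get_params()['svc__C']  # doctest: +SKIP
1.0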
"""
return self._get_params('steps', deep=deep)
def set_params(self, **kwargs):
"""Set the parameters of this estimator.
Valid parameter keys can be listed with ``get_params()``.
Returns
-------
self
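A whole step can also be replaced by using its name as the key; a
brief sketch, reusing the ``pipe`` object from the class docstring:
>>> pipe.set_params(svc=SVC(gamma='auto'))  # doctest: +SKIP
Pipeline(steps=[('scaler', StandardScaler()), ('svc', SVC(gamma='auto'))])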
"""
self._set_params('steps', **kwargs)
return self
def _validate_steps(self):
names, estimators = zip(*self.steps)
# validate names
self._validate_names(names)
# validate estimators
transformers = estimators[:-1]
estimator = estimators[-1]
for t in transformers:
if t is None or t == 'passthrough':
continue
if (not (hasattr(t, "fit") or hasattr(t, "fit_transform")) or not
hasattr(t, "transform")):
raise TypeError("All intermediate steps should be "
"transformers and implement fit and transform "
"or be the string 'passthrough' "
"'%s' (type %s) doesn't" % (t, type(t)))
# We allow the last estimator to be None or 'passthrough' as an
# identity transformation
if (estimator is not None and estimator != 'passthrough'
and not hasattr(estimator, "fit")):
raise TypeError(
"Last step of Pipeline should implement fit "
"or be the string 'passthrough'. "
"'%s' (type %s) doesn't" % (estimator, type(estimator)))
def _iter(self, with_final=True, filter_passthrough=True):
"""
Generate (idx, name, trans) tuples from self.steps.
When filter_passthrough is True, 'passthrough' and None transformers
are filtered out.
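For the two-step pipeline from the class docstring, this yields
(0, 'scaler', StandardScaler(...)) followed by (1, 'svc', SVC(...)).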
"""
stop = len(self.steps)
if not with_final:
stop -= 1
for idx, (name, trans) in enumerate(islice(self.steps, 0, stop)):
if not filter_passthrough:
yield idx, name, trans
elif trans is not None and trans != 'passthrough':
yield idx, name, trans
def __len__(self):
"""
Returns the length of the Pipeline
"""
return len(self.steps)
def __getitem__(self, ind):
"""Returns a sub-pipeline or a single esimtator in the pipeline
Indexing with an integer will return an estimator; using a slice
returns another Pipeline instance which copies a slice of this
Pipeline. This copy is shallow: modifying (or fitting) estimators in
the sub-pipeline will affect the larger pipeline and vice-versa.
However, replacing a value in `steps` will not affect a copy.
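A brief sketch of the supported forms, assuming the two-step ``pipe``
object from the class docstring:
>>> pipe[0]  # doctest: +SKIP
StandardScaler()
>>> pipe['svc']  # doctest: +SKIP
SVC()
>>> pipe[:1]  # doctest: +SKIP
Pipeline(steps=[('scaler', StandardScaler())])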
"""
if isinstance(ind, slice):
if ind.step not in (1, None):
raise ValueError('Pipeline slicing only supports a step of 1')
return self.__class__(self.steps[ind])
try:
name, est = self.steps[ind]
except TypeError:
# Not an int, try get step by name
return self.named_steps[ind]
return est
@property
def _estimator_type(self):
return self.steps[-1][1]._estimator_type
@property
def named_steps(self):
# Use Bunch object to improve autocomplete
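# e.g. pipe.named_steps.svc and pipe.named_steps['svc'] both return
# the step registered under the name 'svc'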
return Bunch(**dict(self.steps))
@property
def _final_estimator(self):
estimator = self.steps[-1][1]
return 'passthrough' if estimator is None else estimator
def _log_message(self, step_idx):
if not self.verbose:
return None
name, step = self.steps[step_idx]
return '(step %d of %d) Processing %s' % (step_idx + 1,
len(self.steps),
name)
def _check_fit_params(self, **fit_params):
fit_params_steps = {name: {} for name, step in self.steps
if step is not None}
for pname, pval in fit_params.items():
if '__' not in pname:
raise ValueError(
"Pipeline.fit does not accept the {} parameter. "
"You can pass parameters to specific steps of your "
"pipeline using the stepname__parameter format, e.g. "
"`Pipeline.fit(X, y, logisticregression__sample_weight"
"=sample_weight)`.".format(pname))
step, param = pname.split('__', 1)
fit_params_steps[step][param] = pval
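# e.g. {'svc__sample_weight': w} is routed to
# {'scaler': {}, 'svc': {'sample_weight': w}} for the pipeline used
# in the class docstring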
return fit_params_steps
# Estimator interface
def _fit(self, X, y=None, **fit_params_steps):
# shallow copy of steps - this should really be steps_
self.steps = list(self.steps)
self._validate_steps()
# Setup the memory
memory = check_memory(self.memory)
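# memory.cache memoizes _fit_transform_one: a repeated fit with an
# identical transformer, data and parameters is loaded back from disk
# instead of being recomputed (a no-op when caching is disabled)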
fit_transform_one_cached = memory.cache(_fit_transform_one)
for (step_idx,
name,
transformer) in self._iter(with_final=False,
filter_passthrough=False):
if (transformer is None or transformer == 'passthrough'):
with _print_elapsed_time('Pipeline',
self._log_message(step_idx)):
continue
if hasattr(memory, 'location'):
# joblib >= 0.12
if memory.location is None:
# we do not clone when caching is disabled to
# preserve backward compatibility
cloned_transformer = transformer
else:
cloned_transformer = clone(transformer)
elif hasattr(memory, 'cachedir'):
# joblib < 0.11
if memory.cachedir is None:
# we do not clone when caching is disabled to
# preserve backward compatibility
cloned_transformer = transformer
else:
cloned_transformer = clone(transformer)
else:
cloned_transformer = clone(transformer)
# Fit or load from cache the current transformer
X, fitted_transformer = fit_transform_one_cached(
cloned_transformer, X, y, None,
message_clsname='Pipeline',
message=self._log_message(step_idx),
**fit_params_steps[name])
# Replace the transformer of the step with the fitted
# transformer. This is necessary when loading the transformer
# from the cache.
self.steps[step_idx] = (name, fitted_transformer)
return X
def fit(self, X, y=None, **fit_params):
"""Fit the model
Fit all the transforms one after the other and transform the
data, then fit the transformed data using the final estimator.
Parameters
----------
X : iterable
Training data. Must fulfill input requirements of first step of the
pipeline.
y : iterable, default=None
Training targets. Must fulfill label requirements for all steps of
the pipeline.
**fit_params : dict of string -> object
Parameters passed to the ``fit`` method of each step, where
each parameter name is prefixed such that parameter ``p`` for step
``s`` has key ``s__p``.
Returns
-------
self : Pipeline
This estimator
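A brief sketch of routing a fit parameter to a named step, assuming
the final step is named 'svc' and ``w`` is an array-like of
per-sample weights:
>>> pipe.fit(X_train, y_train, svc__sample_weight=w)  # doctest: +SKIP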
"""
fit_params_steps = self._check_fit_params(**fit_params)
Xt = self._fit(X, y, **fit_params_steps)
with _print_elapsed_time('Pipeline',
self._log_message(len(self.steps) - 1)):
if self._final_estimator != 'passthrough':
fit_params_last_step = fit_params_steps[self.steps[-1][0]]
self._final_estimator.fit(Xt, y, **fit_params_last_step)
return self
def fit_transform(self, X, y=None, **fit_params):
"""Fit the model and transform with the final estimator
Fits all the transforms one after the other and transforms the
data, then uses fit_transform on transformed data with the final
estimator.