"""Test the pipeline module."""
from distutils.version import LooseVersion
from tempfile import mkdtemp
import shutil
import time
import re
import itertools

import pytest
import numpy as np
from scipy import sparse
import joblib

from sklearn.utils._testing import assert_raises
from sklearn.utils._testing import assert_raises_regex
from sklearn.utils._testing import assert_raise_message
from sklearn.utils._testing import assert_allclose
from sklearn.utils._testing import assert_array_equal
from sklearn.utils._testing import assert_array_almost_equal
from sklearn.utils._testing import assert_no_warnings

from sklearn.base import clone, BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, FeatureUnion, make_pipeline, make_union
from sklearn.svm import SVC
from sklearn.neighbors import LocalOutlierFactor
from sklearn.linear_model import LogisticRegression, Lasso
from sklearn.linear_model import LinearRegression
from sklearn.cluster import KMeans
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.dummy import DummyRegressor
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.experimental import enable_hist_gradient_boosting  # noqa
from sklearn.ensemble import HistGradientBoostingClassifier

# Iris dataset loaded once at import time; tests read iris.data / iris.target.
iris = load_iris()

# Small corpus of short text documents.
# NOTE(review): the name JUNK_FOOD_DOCS follows the upstream scikit-learn
# test module -- confirm it matches the references further down this file.
JUNK_FOOD_DOCS = (
    "the pizza pizza beer copyright",
    "the pizza burger beer copyright",
    "the the pizza beer beer copyright",
    "the burger beer beer copyright",
    "the coke burger coke copyright",
    "the coke burger burger",
)

class NoFit:
    """Small class to test parameter dispatching.

    Stores the constructor arguments ``a`` and ``b`` as attributes and
    defines no ``fit`` method.
    """

    def __init__(self, a=None, b=None):
        self.a = a
        self.b = b

class NoTrans(NoFit):
    """Estimator exposing ``fit`` and parameter accessors but no ``transform``."""

    def fit(self, X, y):
        # Nothing is learned; the estimator itself is returned.
        return self

    def get_params(self, deep=False):
        # ``deep`` is accepted for API compatibility but is not used here.
        return dict(a=self.a, b=self.b)

    def set_params(self, **params):
        # Only the 'a' entry is read, and it must be present in ``params``.
        new_a = params['a']
        self.a = new_a
        return self

class NoInvTransf(NoTrans):
    """Transformer whose ``transform`` returns its input unchanged.

    This class adds only ``transform``; it does not define
    ``inverse_transform`` itself.
    """
    def transform(self, X):
        # Identity transform: hand the input straight back.
        return X

class Transf(NoInvTransf):
    """Identity transformer that also provides ``inverse_transform``."""
    def transform(self, X):
        # Identity transform: hand the input straight back.
        return X

    def inverse_transform(self, X):
        # The inverse of the identity is the identity as well.
        return X

class TransfFitParams(Transf):
    """Identity transformer that records the keyword arguments given to ``fit``."""

    def fit(self, X, y, **fit_params):
        # Store the extra keyword arguments on the instance as ``fit_params``.
        self.fit_params = fit_params
        return self

class Mult(BaseEstimator):
    """Mock estimator that scales its input by the factor ``mult``."""

    def __init__(self, mult=1):
        self.mult = mult

    def fit(self, X, y):
        # Stateless: there is nothing to learn.
        return self

    def transform(self, X):
        # Multiply every entry by ``mult``.
        data = np.asarray(X)
        return data * self.mult

    def inverse_transform(self, X):
        # Undo ``transform`` by dividing by ``mult``.
        data = np.asarray(X)
        return data / self.mult

    def predict(self, X):
        # Scale the input, then sum along axis 1.
        scaled = np.asarray(X) * self.mult
        return scaled.sum(axis=1)

    # The remaining prediction-style methods all share ``predict``.
    predict_proba = predict
    predict_log_proba = predict
    decision_function = predict

    def score(self, X, y=None):
        # The score is the sum of all entries of X; ``y`` is ignored.
        return np.sum(X)

class FitParamT(BaseEstimator):
    """Mock classifier.

    ``fit`` stores its ``should_succeed`` argument in ``successful`` and
    ``predict`` returns that stored value.
    """

    def __init__(self):
        self.successful = False

    def fit(self, X, y, should_succeed=False):
        # Remember the fit parameter so predict() can report it.
        self.successful = should_succeed

    def predict(self, X):
        # The input is ignored; the stored flag is returned.
        return self.successful

    def fit_predict(self, X, y, should_succeed=False):
        self.fit(X, y, should_succeed=should_succeed)
        return self.predict(X)

    def score(self, X, y=None, sample_weight=None):
        # When weights are given, X is multiplied by them before summing.
        if sample_weight is not None:
            X = X * sample_weight
        return np.sum(X)

class DummyTransf(Transf):
    """Transformer which stores the column means and the time of fitting."""

    def fit(self, X, y):
        # Mean over axis 0 of X.
        self.means_ = np.mean(X, axis=0)
        # store timestamp to figure out whether the result of 'fit' has been
        # cached or not
        self.timestamp_ = time.time()
        return self

class DummyEstimatorParams(BaseEstimator):
    """Mock classifier that takes params on predict"""

    def fit(self, X, y):
        # Nothing is learned; the estimator itself is returned.
        return self

    def predict(self, X, got_attribute=False):
        # Record the keyword argument on the instance, then return the
        # estimator itself (not a prediction array).
        self.got_attribute = got_attribute
        return self

def test_pipeline_init():
    """Test the various init parameters of the pipeline.

    Covers construction errors, parameter getting/setting through the
    ``step__param`` syntax, the repr, and cloning.
    """
    assert_raises(TypeError, Pipeline)
    # Check that we can't instantiate pipelines with objects without fit
    # method
    assert_raises_regex(TypeError,
                        'Last step of Pipeline should implement fit '
                        'or be the string \'passthrough\''
                        '.*NoFit.*',
                        Pipeline, [('clf', NoFit())])
    # Smoke test with only an estimator
    clf = NoTrans()
    pipe = Pipeline([('svc', clf)])
    expected_params = dict(svc__a=None, svc__b=None, svc=clf,
                           **pipe.get_params(deep=False))
    assert pipe.get_params(deep=True) == expected_params

    # Check that params are set
    pipe.set_params(svc__a=0.1)
    assert clf.a == 0.1
    assert clf.b is None
    # Smoke test the repr:
    repr(pipe)

    # Test with two objects
    clf = SVC()
    filter1 = SelectKBest(f_classif)
    pipe = Pipeline([('anova', filter1), ('svc', clf)])

    # Check that estimators are not cloned on pipeline construction
    assert pipe.named_steps['anova'] is filter1
    assert pipe.named_steps['svc'] is clf

    # Check that we can't instantiate with non-transformers on the way
    # Note that NoTrans implements fit, but not transform
    assert_raises_regex(TypeError,
                        'All intermediate steps should be transformers'
                        '.*\\bNoTrans\\b.*',
                        Pipeline, [('t', NoTrans()), ('svc', clf)])

    # Check that params are set
    pipe.set_params(svc__C=0.1)
    assert clf.C == 0.1
    # Smoke test the repr:
    repr(pipe)

    # Check that params are not set when naming them wrong
    assert_raises(ValueError, pipe.set_params, anova__C=0.1)

    # Test clone
    pipe2 = assert_no_warnings(clone, pipe)
    assert pipe.named_steps['svc'] is not pipe2.named_steps['svc']

    # Check that apart from estimators, the parameters are the same
    params = pipe.get_params(deep=True)
    params2 = pipe2.get_params(deep=True)

    # Drop the pipeline-level parameters (they include the step list, which
    # holds the estimator objects themselves).
    for x in pipe.get_params(deep=False):
        params.pop(x)

    for x in pipe2.get_params(deep=False):
        params2.pop(x)

    # Remove estimators that were copied
    for step_name in ('svc', 'anova'):
        params.pop(step_name)
        params2.pop(step_name)
    assert params == params2

def test_pipeline_init_tuple():
    """Pipeline accepts steps given as a tuple instead of a list."""
    X = np.array([[1, 2]])
    pipe = Pipeline((('transf', Transf()), ('clf', FitParamT())))
    pipe.fit(X, y=None)
    pipe.score(X)

    # Replacing a step must also work when the steps were given as a tuple;
    # refit and score again with the transformer disabled.
    pipe.set_params(transf='passthrough')
    pipe.fit(X, y=None)
    pipe.score(X)

def test_pipeline_methods_anova():
    """Test the various methods of the pipeline (anova).

    Smoke test: each method only needs to run without raising.
    """
    X = iris.data
    y = iris.target
    # Test with Anova + LogisticRegression
    clf = LogisticRegression()
    filter1 = SelectKBest(f_classif, k=2)
    pipe = Pipeline([('anova', filter1), ('logistic', clf)])
    pipe.fit(X, y)
    pipe.predict(X)
    pipe.predict_proba(X)
    pipe.predict_log_proba(X)
    pipe.score(X, y)

def test_pipeline_fit_params():
    """Test that the pipeline can take fit parameters.

    Parameters prefixed with ``clf__`` must reach the ``clf`` step only,
    and an unknown fit parameter must raise a ``TypeError``.
    """
    pipe = Pipeline([('transf', Transf()), ('clf', FitParamT())])
    pipe.fit(X=None, y=None, clf__should_succeed=True)
    # classifier should return True
    assert pipe.predict(None)
    # and transformer params should not be changed
    assert pipe.named_steps['transf'].a is None
    assert pipe.named_steps['transf'].b is None
    # invalid parameters should raise an error message
    assert_raise_message(
        TypeError,
        "fit() got an unexpected keyword argument 'bad'",
        pipe.fit, None, None, clf__bad=True
    )

def test_pipeline_sample_weight_supported():
    """Pipeline.score should pass ``sample_weight`` on to the final step."""
    data = np.array([[1, 2]])
    steps = [('transf', Transf()), ('clf', FitParamT())]
    pipe = Pipeline(steps)
    pipe.fit(data, y=None)

    # Without weights (or with sample_weight=None) the score is the plain
    # sum of the entries: 1 + 2 == 3.
    unweighted_calls = ({}, {'y': None}, {'y': None, 'sample_weight': None})
    for score_kwargs in unweighted_calls:
        assert pipe.score(data, **score_kwargs) == 3

    # Weights [2, 3] scale the row to [2, 6], which sums to 8.
    weights = np.array([2, 3])
    assert pipe.score(data, sample_weight=weights) == 8

def test_pipeline_sample_weight_unsupported():
    """When sample_weight is None it shouldn't be passed to the final step.

    ``Mult.score`` has no ``sample_weight`` parameter, so scoring must work
    with ``sample_weight=None`` and raise ``TypeError`` for real weights.
    """
    X = np.array([[1, 2]])
    pipe = Pipeline([('transf', Transf()), ('clf', Mult())])
    pipe.fit(X, y=None)
    assert pipe.score(X) == 3
    assert pipe.score(X, sample_weight=None) == 3
    assert_raise_message(
        TypeError,
        "score() got an unexpected keyword argument 'sample_weight'",
        pipe.score, X, sample_weight=np.array([2, 3])
    )

def test_pipeline_raise_set_params_error():
    """Test pipeline raises set params error message for nested models."""
    pipe = Pipeline([('cls', LinearRegression())])

    # expected error message
    error_msg = ('Invalid parameter %s for estimator %s. '
                 'Check the list of available parameters '
                 'with `estimator.get_params().keys()`.')

    # unknown top-level parameter
    assert_raise_message(ValueError,
                         error_msg % ('fake', pipe),
                         pipe.set_params,
                         fake='nope')

    # nested model check: the unknown prefix 'fake' is what gets reported
    assert_raise_message(ValueError,
                         error_msg % ("fake", pipe),
                         pipe.set_params,
                         fake__estimator='nope')

def test_pipeline_methods_pca_svm():
    """Test the various methods of the pipeline (pca + svm).

    Smoke test: each method only needs to run without raising.
    """
    X = iris.data
    y = iris.target
    # Test with PCA + SVC
    # probability=True is required for the predict_proba /
    # predict_log_proba calls below.
    clf = SVC(probability=True, random_state=0)
    pca = PCA(svd_solver='full', n_components='mle', whiten=True)
    pipe = Pipeline([('pca', pca), ('svc', clf)])
    pipe.fit(X, y)
    pipe.predict(X)
    pipe.predict_proba(X)
    pipe.predict_log_proba(X)
    pipe.score(X, y)

def test_pipeline_score_samples_pca_lof():
    X = iris.data
    # Test that the score_samples method is implemented on a pipeline.
    # Test that the score_samples method on pipeline yields same results as
    # applying transform and score_samples steps separately.
    pca = PCA(svd_solver='full', n_components='mle', whiten=True)
    lof = LocalOutlierFactor(novelty=True)
    pipe = Pipeline([('pca', pca), ('lof', lof)])
