Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

stream / scikit-learn   python

Repository URL to install this package:

/ cluster / tests / test_k_means.py

"""Testing for K-means"""
import sys

import numpy as np
from scipy import sparse as sp

from sklearn.utils.testing import assert_equal
from sklearn.utils.testing import assert_array_equal
from sklearn.utils.testing import assert_array_almost_equal
from sklearn.utils.testing import SkipTest
from sklearn.utils.testing import assert_almost_equal
from sklearn.utils.testing import assert_raises
from sklearn.utils.testing import assert_raises_regex
from sklearn.utils.testing import assert_true
from sklearn.utils.testing import assert_greater
from sklearn.utils.testing import assert_less
from sklearn.utils.testing import assert_warns
from sklearn.utils.testing import if_safe_multiprocessing_with_blas
from sklearn.utils.testing import if_not_mac_os
from sklearn.utils.testing import assert_raise_message


from sklearn.utils.validation import DataConversionWarning
from sklearn.utils.extmath import row_norms
from sklearn.metrics.cluster import v_measure_score
from sklearn.cluster import KMeans, k_means
from sklearn.cluster import MiniBatchKMeans
from sklearn.cluster.k_means_ import _labels_inertia
from sklearn.cluster.k_means_ import _mini_batch_step
from sklearn.datasets.samples_generator import make_blobs
from sklearn.externals.six.moves import cStringIO as StringIO


# Non-centered, sparse centers used as the ground truth of the blob data
# that the tests in this module fit on.
centers = np.array([
    [0.0, 5.0, 0.0, 0.0, 0.0],
    [1.0, 1.0, 4.0, 0.0, 0.0],
    [1.0, 0.0, 0.0, 5.0, 1.0],
])
n_samples = 100
# one cluster per row of ``centers``, one feature per column
n_clusters, n_features = centers.shape
X, true_labels = make_blobs(n_samples=n_samples, centers=centers,
                            cluster_std=1., random_state=42)
# CSR version of the same samples, used by the sparse-input tests
X_csr = sp.csr_matrix(X)


def test_kmeans_dtype():
    # Predicting on the uint8 training data must emit a
    # DataConversionWarning and reproduce the labels found during fit.
    random_state = np.random.RandomState(0)
    float_data = random_state.normal(size=(40, 2))
    uint8_data = (float_data * 10).astype(np.uint8)

    model = KMeans(n_init=1)
    model.fit(uint8_data)

    predicted = assert_warns(DataConversionWarning, model.predict,
                             uint8_data)
    assert_array_equal(model.labels_, predicted)


def test_labels_assignment_and_inertia():
    # Check _labels_inertia on dense and CSR input against a pure numpy
    # implementation that is easy to audit and serves as the gold reference.
    rng = np.random.RandomState(42)
    noisy_centers = centers + rng.normal(size=centers.shape)

    # Use the builtin ``int`` and ``np.inf``: the ``np.int`` and ``np.infty``
    # aliases are no longer available in recent NumPy releases.
    labels_gold = - np.ones(n_samples, dtype=int)
    mindist = np.empty(n_samples)
    mindist.fill(np.inf)
    for center_id in range(n_clusters):
        # squared euclidean distance of every sample to this center
        dist = np.sum((X - noisy_centers[center_id]) ** 2, axis=1)
        # samples strictly closer to this center than to any previous one
        labels_gold[dist < mindist] = center_id
        mindist = np.minimum(dist, mindist)
    inertia_gold = mindist.sum()
    assert_true((mindist >= 0.0).all())
    # every sample must have been assigned to some center
    assert_true((labels_gold != -1).all())

    # perform label assignment using the dense array input
    x_squared_norms = (X ** 2).sum(axis=1)
    labels_array, inertia_array = _labels_inertia(
        X, x_squared_norms, noisy_centers)
    assert_array_almost_equal(inertia_array, inertia_gold)
    assert_array_equal(labels_array, labels_gold)

    # perform label assignment using the sparse CSR input
    x_squared_norms_from_csr = row_norms(X_csr, squared=True)
    labels_csr, inertia_csr = _labels_inertia(
        X_csr, x_squared_norms_from_csr, noisy_centers)
    assert_array_almost_equal(inertia_csr, inertia_gold)
    assert_array_equal(labels_csr, labels_gold)


def _run_checked_minibatch_step(X_mb, x_mb_squared_norms, old_centers):
    # Run one minibatch update from ``old_centers`` on the given batch
    # (dense or sparse), check its internal consistency and return
    # (labels, new_centers, old_inertia, new_inertia, incremental_diff).
    new_centers = old_centers.copy()
    counts = np.zeros(new_centers.shape[0], dtype=np.int32)
    old_center_buffer = np.zeros(new_centers.shape[1], dtype=np.double)

    # The returned values do not include the centers: the checks below rely
    # on ``new_centers`` having been updated in place by the step.
    old_inertia, incremental_diff = _mini_batch_step(
        X_mb, x_mb_squared_norms, new_centers, counts,
        old_center_buffer, 1, None, random_reassign=False)
    assert_greater(old_inertia, 0.0)

    # compute the new inertia on the same batch to check that it decreased
    labels, new_inertia = _labels_inertia(
        X_mb, x_mb_squared_norms, new_centers)
    assert_greater(new_inertia, 0.0)
    assert_less(new_inertia, old_inertia)

    # check that the incremental difference computation is matching the
    # final observed value
    effective_diff = np.sum((new_centers - old_centers) ** 2)
    assert_almost_equal(incremental_diff, effective_diff)

    return labels, new_centers, old_inertia, new_inertia, incremental_diff


def test_minibatch_update_consistency():
    # Check that dense and sparse minibatch update give the same results
    rng = np.random.RandomState(42)
    old_centers = centers + rng.normal(size=centers.shape)

    x_squared_norms = (X ** 2).sum(axis=1)
    x_squared_norms_csr = row_norms(X_csr, squared=True)

    # extract a small minibatch
    X_mb = X[:10]
    X_mb_csr = X_csr[:10]
    x_mb_squared_norms = x_squared_norms[:10]
    x_mb_squared_norms_csr = x_squared_norms_csr[:10]

    # step 1: compute and check the dense minibatch update
    (labels, new_centers, old_inertia, new_inertia,
     incremental_diff) = _run_checked_minibatch_step(
        X_mb, x_mb_squared_norms, old_centers)

    # step 2: compute and check the sparse minibatch update
    (labels_csr, new_centers_csr, old_inertia_csr, new_inertia_csr,
     incremental_diff_csr) = _run_checked_minibatch_step(
        X_mb_csr, x_mb_squared_norms_csr, old_centers)

    # step 3: check that sparse and dense updates lead to the same results
    assert_array_equal(labels, labels_csr)
    assert_array_almost_equal(new_centers, new_centers_csr)
    assert_almost_equal(incremental_diff, incremental_diff_csr)
    assert_almost_equal(old_inertia, old_inertia_csr)
    assert_almost_equal(new_inertia, new_inertia_csr)


def _check_fitted_model(km):
    # Shared sanity checks for an estimator fitted on the module-level
    # blob data (dense or sparse).

    # one center per expected cluster, each with the full feature count
    assert_equal(km.cluster_centers_.shape, (n_clusters, n_features))

    # every expected cluster must appear among the assigned labels
    fitted_labels = km.labels_
    distinct_labels = np.unique(fitted_labels)
    assert_equal(distinct_labels.shape[0], n_clusters)

    # the clustering must recover the true labels, up to a permutation
    score = v_measure_score(true_labels, fitted_labels)
    assert_equal(score, 1.0)
    assert_greater(km.inertia_, 0.0)

    # refitting on a dataset that is too small must raise
    assert_raises(ValueError, km.fit, [[0., 1.]])


def test_k_means_plus_plus_init():
    # KMeans with k-means++ init on the dense data
    model = KMeans(init="k-means++", n_clusters=n_clusters, random_state=42)
    model.fit(X)
    _check_fitted_model(model)


def test_k_means_new_centers():
    # Exercise the part of the code where a new center is reassigned, on
    # both dense and sparse (COO) input.
    data = np.array([[0, 0, 1, 1],
                     [0, 0, 0, 0],
                     [0, 1, 0, 0],
                     [0, 0, 0, 0],
                     [0, 0, 0, 0],
                     [0, 1, 0, 0]])
    expected_labels = [0, 1, 2, 1, 1, 2]
    poor_init = np.array([[0, 1, 0, 0],
                          [.2, 0, .2, .2],
                          [0, 0, 0, 0]])

    model = KMeans(n_clusters=3, init=poor_init, n_init=1, max_iter=10,
                   random_state=1)
    for dataset in (data, sp.coo_matrix(data)):
        model.fit(dataset)
        fitted = model.labels_
        # Replace each label by the index of its first occurrence so that
        # the comparison does not depend on how the clusters are numbered.
        _, first_seen = np.unique(fitted, return_index=True)
        relabeled = first_seen[fitted]
        np.testing.assert_array_equal(relabeled, expected_labels)


@if_safe_multiprocessing_with_blas
def test_k_means_plus_plus_init_2_jobs():
    # KMeans with k-means++ init fitted with two jobs; skipped on
    # interpreters older than Python 3.4.
    major_minor = sys.version_info[:2]
    if major_minor < (3, 4):
        raise SkipTest(
            "Possible multi-process bug with some BLAS under Python < 3.4")

    model = KMeans(init="k-means++", n_clusters=n_clusters, n_jobs=2,
                   random_state=42)
    model.fit(X)
    _check_fitted_model(model)


def test_k_means_precompute_distances_flag():
    # fitting with an unsupported precompute_distances value must raise a
    # ValueError
    invalid_model = KMeans(precompute_distances="wrong")
    assert_raises(ValueError, invalid_model.fit, X)


def test_k_means_plus_plus_init_sparse():
    # KMeans with k-means++ init on the CSR data
    model = KMeans(init="k-means++", n_clusters=n_clusters,
                   random_state=42).fit(X_csr)
    _check_fitted_model(model)


def test_k_means_random_init():
    # KMeans with random init on the dense data
    model = KMeans(init="random", n_clusters=n_clusters,
                   random_state=42).fit(X)
    _check_fitted_model(model)


def test_k_means_random_init_sparse():
    # KMeans with random init on the CSR data
    model = KMeans(init="random", n_clusters=n_clusters,
                   random_state=42).fit(X_csr)
    _check_fitted_model(model)


def test_k_means_plus_plus_init_not_precomputed():
    # KMeans with k-means++ init and precompute_distances disabled
    model = KMeans(init="k-means++", n_clusters=n_clusters, random_state=42,
                   precompute_distances=False)
    model.fit(X)
    _check_fitted_model(model)


def test_k_means_random_init_not_precomputed():
    # KMeans with random init and precompute_distances disabled
    model = KMeans(init="random", n_clusters=n_clusters, random_state=42,
                   precompute_distances=False)
    model.fit(X)
    _check_fitted_model(model)


def test_k_means_perfect_init():
    # KMeans initialised with a copy of the true blob centers
    true_centers = centers.copy()
    model = KMeans(init=true_centers, n_clusters=n_clusters, random_state=42,
                   n_init=1).fit(X)
    _check_fitted_model(model)


def test_k_means_n_init():
    # Regression tests on bad n_init argument.
    # previous bug: n_init <= 0 threw non-informative TypeError (#3858)
    random_state = np.random.RandomState(0)
    data = random_state.normal(size=(40, 2))

    for bad_n_init in (0, -1):
        model = KMeans(n_init=bad_n_init)
        assert_raises_regex(ValueError, "n_init", model.fit, data)


def test_k_means_explicit_init_shape():
    # An explicit init (array or callable) with the wrong number of
    # features or clusters must fail with a sensible error message.
    random_state = np.random.RandomState(0)
    data = random_state.normal(size=(40, 3))
    n_rows = len(data)

    def init_missing_feature(X_, k, random_state):
        # one row per sample, but only two of the three features
        return X_[:, :2]

    def init_two_centers(X_, k, random_state):
        # all features, but only two rows
        return X_[:2, :]

    feature_msg = "does not match the number of features of the data"
    cluster_msg = "does not match the number of clusters"

    # (init, n_clusters, expected message), checked in this order
    cases = (
        (data[:, :2], n_rows, feature_msg),
        (init_missing_feature, n_rows, feature_msg),
        (data[:2, :], 3, cluster_msg),
        (init_two_centers, 3, cluster_msg),
    )

    for Estimator in [KMeans, MiniBatchKMeans]:
        for init, k, expected_msg in cases:
            model = Estimator(n_init=1, init=init, n_clusters=k)
            assert_raises_regex(ValueError, expected_msg, model.fit, data)


def test_k_means_fortran_aligned_data():
    # KMeans must give the expected centers and labels when the input
    # array is stored in Fortran order.
    fortran_data = np.asfortranarray([[0, 0], [0, 1], [0, 1]])
    init_centers = np.array([[0, 0], [0, 1]])
    expected_labels = np.array([0, 1, 1])

    model = KMeans(n_init=1, init=init_centers, precompute_distances=False,
                   random_state=42, n_clusters=2)
    model.fit(fortran_data)

    assert_array_equal(model.cluster_centers_, init_centers)
    assert_array_equal(model.labels_, expected_labels)


def test_mb_k_means_plus_plus_init_dense_array():
    # MiniBatchKMeans with k-means++ init on the dense data
    model = MiniBatchKMeans(init="k-means++", n_clusters=n_clusters,
                            random_state=42).fit(X)
    _check_fitted_model(model)


def test_mb_kmeans_verbose():
    # Smoke test: fitting with verbose=1 must run without error. stdout is
    # swapped for an in-memory buffer during the fit and always restored.
    model = MiniBatchKMeans(init="k-means++", n_clusters=n_clusters,
                            random_state=42, verbose=1)
    saved_stdout, sys.stdout = sys.stdout, StringIO()
    try:
        model.fit(X)
    finally:
        sys.stdout = saved_stdout


def test_mb_k_means_plus_plus_init_sparse_matrix():
    # MiniBatchKMeans with k-means++ init on the CSR data
    model = MiniBatchKMeans(init="k-means++", n_clusters=n_clusters,
                            random_state=42).fit(X_csr)
    _check_fitted_model(model)


def test_minibatch_init_with_large_k():
    # With more clusters (20) than init_size (10), fitting must raise a
    # RuntimeWarning.
    model = MiniBatchKMeans(init='k-means++', init_size=10, n_clusters=20)
    fit = model.fit
    assert_warns(RuntimeWarning, fit, X)


def test_minibatch_k_means_random_init_dense_array():
    # MiniBatchKMeans with random init on the dense data; n_init is raised
    # to 10 to make the random init stable enough
    model = MiniBatchKMeans(init="random", n_clusters=n_clusters,
                            random_state=42, n_init=10)
    model.fit(X)
    _check_fitted_model(model)


def test_minibatch_k_means_random_init_sparse_csr():
    # MiniBatchKMeans with random init on the CSR data; n_init is raised
    # to 10 to make the random init stable enough
    model = MiniBatchKMeans(init="random", n_clusters=n_clusters,
                            random_state=42, n_init=10)
    model.fit(X_csr)
    _check_fitted_model(model)


def test_minibatch_k_means_perfect_init_dense_array():
    mb_k_means = MiniBatchKMeans(init=centers.copy(), n_clusters=n_clusters,
Loading ...