"""Testing for K-means"""
import sys

import numpy as np
from scipy import sparse as sp
from threadpoolctl import threadpool_limits

import pytest

from sklearn.utils._testing import assert_array_equal
from sklearn.utils._testing import assert_array_almost_equal
from sklearn.utils._testing import assert_allclose
from sklearn.utils._testing import assert_almost_equal
from sklearn.utils._testing import assert_warns
from sklearn.utils._testing import assert_warns_message
from sklearn.utils._testing import assert_raise_message
from sklearn.utils.validation import _num_samples
from sklearn.base import clone
from sklearn.exceptions import ConvergenceWarning

from sklearn.utils.extmath import row_norms
from sklearn.metrics import pairwise_distances_argmin
from sklearn.metrics.cluster import v_measure_score
from sklearn.cluster import KMeans, k_means
from sklearn.cluster import MiniBatchKMeans
from sklearn.cluster._kmeans import _labels_inertia
from sklearn.cluster._kmeans import _mini_batch_step
from sklearn.cluster._k_means_fast import _relocate_empty_clusters_dense
from sklearn.cluster._k_means_fast import _relocate_empty_clusters_sparse
from sklearn.cluster._k_means_fast import _euclidean_dense_dense_wrapper
from sklearn.cluster._k_means_fast import _euclidean_sparse_dense_wrapper
from sklearn.cluster._k_means_fast import _inertia_dense
from sklearn.cluster._k_means_fast import _inertia_sparse
from sklearn.datasets import make_blobs
from io import StringIO
from sklearn.metrics.cluster import homogeneity_score


# non centered, sparse centers used to generate the blobs test data below
centers = np.array([
    [0.0, 5.0, 0.0, 0.0, 0.0],
    [1.0, 1.0, 4.0, 0.0, 0.0],
    [1.0, 0.0, 0.0, 5.0, 1.0],
])
n_samples = 100
n_clusters, n_features = centers.shape
X, true_labels = make_blobs(n_samples=n_samples, centers=centers,
                            cluster_std=1., random_state=42)
X_csr = sp.csr_matrix(X)


@pytest.mark.parametrize("representation", ["dense", "sparse"])
@pytest.mark.parametrize("algo", ["full", "elkan"])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_kmeans_results(representation, algo, dtype):
    # check that KMeans works as intended
    array_constr = {'dense': np.array, 'sparse': sp.csr_matrix}[representation]
    X = array_constr([[0, 0], [0.5, 0], [0.5, 1], [1, 1]], dtype=dtype)
    sample_weight = [3, 1, 1, 3]  # will be rescaled to [1.5, 0.5, 0.5, 1.5]
    init_centers = np.array([[0, 0], [1, 1]], dtype=dtype)

    expected_labels = [0, 0, 1, 1]
    expected_inertia = 0.1875
    expected_centers = np.array([[0.125, 0], [0.875, 1]], dtype=dtype)
    expected_n_iter = 2
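    # For clarity (not part of the original test), the expected values can be
    # derived by hand: with the rescaled weights, cluster 0 holds (0, 0) with
    # weight 1.5 and (0.5, 0) with weight 0.5, so its center is at
    # x = (1.5 * 0 + 0.5 * 0.5) / 2 = 0.125; by symmetry, cluster 1 is at
    # x = 0.875. The weighted inertia is then
    # 2 * (1.5 * 0.125**2 + 0.5 * 0.375**2) = 0.1875.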

    kmeans = KMeans(n_clusters=2, n_init=1, init=init_centers, algorithm=algo)
    kmeans.fit(X, sample_weight=sample_weight)

    assert_array_equal(kmeans.labels_, expected_labels)
    assert_almost_equal(kmeans.inertia_, expected_inertia)
    assert_array_almost_equal(kmeans.cluster_centers_, expected_centers)
    assert kmeans.n_iter_ == expected_n_iter


@pytest.mark.parametrize("array_constr",
                         [np.array, sp.csr_matrix],
                         ids=['dense', 'sparse'])
@pytest.mark.parametrize("algo", ['full', 'elkan'])
def test_relocated_clusters(array_constr, algo):
    # check that empty clusters are relocated as expected
    X = array_constr([[0, 0], [0.5, 0], [0.5, 1], [1, 1]])

    # the second center is too far from the other points, so it will be empty
    # at the first iteration
    init_centers = np.array([[0.5, 0.5], [3, 3]])

    expected_labels = [0, 0, 1, 1]
    expected_inertia = 0.25
    expected_centers = [[0.25, 0], [0.75, 1]]
    expected_n_iter = 3
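    # For clarity (not part of the original test): after the empty second
    # cluster is relocated, the final partition is {(0, 0), (0.5, 0)} and
    # {(0.5, 1), (1, 1)}, whose means are the expected centers; each point
    # lies 0.25 from its center along one axis, so inertia = 4 * 0.25**2.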

    kmeans = KMeans(n_clusters=2, n_init=1, init=init_centers, algorithm=algo)
    kmeans.fit(X)

    assert_array_equal(kmeans.labels_, expected_labels)
    assert_almost_equal(kmeans.inertia_, expected_inertia)
    assert_array_almost_equal(kmeans.cluster_centers_, expected_centers)
    assert kmeans.n_iter_ == expected_n_iter


@pytest.mark.parametrize("representation", ["dense", "sparse"])
def test_relocate_empty_clusters(representation):
    # test for the _relocate_empty_clusters_(dense/sparse) helpers

    # Synthetic dataset with 3 obvious clusters of different sizes
    X = np.array(
        [-10., -9.5, -9, -8.5, -8, -1, 1, 9, 9.5, 10]).reshape(-1, 1)
    if representation == "sparse":
        X = sp.csr_matrix(X)
    sample_weight = np.full(shape=10, fill_value=1.)

    # centers all initialized to the first point of X
    centers_old = np.array([-10., -10, -10]).reshape(-1, 1)

    # With this initialization, all points will be assigned to the first
    # center. At this point, each row of centers_new holds the weighted sum of
    # the points assigned to that cluster if it's not empty, and is unchanged
    # otherwise.
    centers_new = np.array([-16.5, -10, -10]).reshape(-1, 1)
    weight_in_clusters = np.array([10., 0, 0])
    labels = np.zeros(10, dtype=np.int32)
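    # Note (added for clarity): -16.5 is the weighted sum of all 10 points,
    # consistent with every point being assigned to the first cluster, while
    # the two empty clusters keep their old value of -10.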

    if representation == "dense":
        _relocate_empty_clusters_dense(X, sample_weight, centers_old,
                                       centers_new, weight_in_clusters, labels)
    else:
        _relocate_empty_clusters_sparse(X.data, X.indices, X.indptr,
                                        sample_weight, centers_old,
                                        centers_new, weight_in_clusters,
                                        labels)

    # The relocation scheme will take the 2 points farthest from their center
    # and assign them to the 2 empty clusters, i.e. the points at 10 and at
    # 9.5. The first center is left with the other 8 points, whose sum is
    # -16.5 - 10 - 9.5 = -36.
    assert_array_equal(weight_in_clusters, [8, 1, 1])
    assert_allclose(centers_new, [[-36], [10], [9.5]])


@pytest.mark.parametrize('distribution', ['normal', 'blobs'])
@pytest.mark.parametrize('tol', [1e-2, 1e-4, 1e-8])
def test_elkan_results(distribution, tol):
    # check that results are identical between lloyd and elkan algorithms
    rnd = np.random.RandomState(0)
    if distribution == 'normal':
        X = rnd.normal(size=(5000, 10))
    else:
        X, _ = make_blobs(random_state=rnd)

    km_full = KMeans(algorithm='full', n_clusters=5,
                     random_state=0, n_init=1, tol=tol)
    km_elkan = KMeans(algorithm='elkan', n_clusters=5,
                      random_state=0, n_init=1, tol=tol)

    km_full.fit(X)
    km_elkan.fit(X)
    assert_allclose(km_elkan.cluster_centers_, km_full.cluster_centers_)
    assert_array_equal(km_elkan.labels_, km_full.labels_)

    assert km_elkan.n_iter_ == km_full.n_iter_
    assert km_elkan.inertia_ == pytest.approx(km_full.inertia_, rel=1e-6)
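    # (Added note: the two algorithms accumulate floating point operations in
    # different orders, hence the relative tolerance on the inertia check.)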


@pytest.mark.parametrize('algorithm', ['full', 'elkan'])
def test_kmeans_convergence(algorithm):
    # Check that KMeans stops when convergence is reached, even with tol=0
    # (#16075).
    rnd = np.random.RandomState(0)
    X = rnd.normal(size=(5000, 10))

    km = KMeans(algorithm=algorithm, n_clusters=5, random_state=0, n_init=1,
                tol=0, max_iter=300).fit(X)

    assert km.n_iter_ < 300


@pytest.mark.parametrize('distribution', ['normal', 'blobs'])
def test_elkan_results_sparse(distribution):
    # check that results are identical between lloyd and elkan algorithms
    # with sparse input
    rnd = np.random.RandomState(0)
    if distribution == 'normal':
        X = sp.random(100, 100, density=0.1, format='csr', random_state=rnd)
        X.data = rnd.randn(len(X.data))
    else:
        X, _ = make_blobs(n_samples=100, n_features=100, random_state=rnd)
        X = sp.csr_matrix(X)

    km_full = KMeans(algorithm='full', n_clusters=5, random_state=0, n_init=1)
    km_elkan = KMeans(algorithm='elkan', n_clusters=5,
                      random_state=0, n_init=1)

    km_full.fit(X)
    km_elkan.fit(X)
    assert_allclose(km_elkan.cluster_centers_, km_full.cluster_centers_)
    assert_allclose(km_elkan.labels_, km_full.labels_)


def test_labels_assignment_and_inertia():
    # pure numpy implementation used as an easily auditable "gold" reference
    # implementation
    rng = np.random.RandomState(42)
    noisy_centers = centers + rng.normal(size=centers.shape)
    labels_gold = np.full(n_samples, -1, dtype=int)
    mindist = np.empty(n_samples)
    mindist.fill(np.inf)
    for center_id in range(n_clusters):
        dist = np.sum((X - noisy_centers[center_id]) ** 2, axis=1)
        labels_gold[dist < mindist] = center_id
        mindist = np.minimum(dist, mindist)
    inertia_gold = mindist.sum()
    assert (mindist >= 0.0).all()
    assert (labels_gold != -1).all()
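
    # Extra sanity check (not in the original test): the brute force loop
    # above should agree with sklearn's vectorized nearest-center helper.
    assert_array_equal(labels_gold,
                       pairwise_distances_argmin(X, noisy_centers))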

    sample_weight = None

    # perform label assignment using the dense array input
    x_squared_norms = (X ** 2).sum(axis=1)
    labels_array, inertia_array = _labels_inertia(
        X, sample_weight, x_squared_norms, noisy_centers)
    assert_array_almost_equal(inertia_array, inertia_gold)
    assert_array_equal(labels_array, labels_gold)

    # perform label assignment using the sparse CSR input
    x_squared_norms_from_csr = row_norms(X_csr, squared=True)
    labels_csr, inertia_csr = _labels_inertia(
        X_csr, sample_weight, x_squared_norms_from_csr, noisy_centers)
    assert_array_almost_equal(inertia_csr, inertia_gold)
    assert_array_equal(labels_csr, labels_gold)


def test_minibatch_update_consistency():
    # Check that dense and sparse minibatch update give the same results
    rng = np.random.RandomState(42)
    old_centers = centers + rng.normal(size=centers.shape)

    new_centers = old_centers.copy()
    new_centers_csr = old_centers.copy()

    weight_sums = np.zeros(new_centers.shape[0], dtype=np.double)
    weight_sums_csr = np.zeros(new_centers.shape[0], dtype=np.double)

    x_squared_norms = (X ** 2).sum(axis=1)
    x_squared_norms_csr = row_norms(X_csr, squared=True)

    buffer = np.zeros(centers.shape[1], dtype=np.double)
    buffer_csr = np.zeros(centers.shape[1], dtype=np.double)

    # extract a small minibatch
    X_mb = X[:10]
    X_mb_csr = X_csr[:10]
    x_mb_squared_norms = x_squared_norms[:10]
    x_mb_squared_norms_csr = x_squared_norms_csr[:10]

    sample_weight_mb = np.ones(X_mb.shape[0], dtype=np.double)

    # step 1: compute the dense minibatch update
    old_inertia, incremental_diff = _mini_batch_step(
        X_mb, sample_weight_mb, x_mb_squared_norms, new_centers, weight_sums,
        buffer, 1, None, random_reassign=False)
    assert old_inertia > 0.0

    # compute the new inertia on the same batch to check that it decreased
    labels, new_inertia = _labels_inertia(
        X_mb, sample_weight_mb, x_mb_squared_norms, new_centers)
    assert new_inertia > 0.0
    assert new_inertia < old_inertia

    # check that the incremental difference computation is matching the
    # final observed value
    effective_diff = np.sum((new_centers - old_centers) ** 2)
    assert_almost_equal(incremental_diff, effective_diff)

    # step 2: compute the sparse minibatch update
    old_inertia_csr, incremental_diff_csr = _mini_batch_step(
        X_mb_csr, sample_weight_mb, x_mb_squared_norms_csr, new_centers_csr,
        weight_sums_csr, buffer_csr, 1, None, random_reassign=False)
    assert old_inertia_csr > 0.0

    # compute the new inertia on the same batch to check that it decreased
    labels_csr, new_inertia_csr = _labels_inertia(
        X_mb_csr, sample_weight_mb, x_mb_squared_norms_csr, new_centers_csr)
    assert new_inertia_csr > 0.0
    assert new_inertia_csr < old_inertia_csr

    # check that the incremental difference computation is matching the
    # final observed value
    effective_diff = np.sum((new_centers_csr - old_centers) ** 2)
    assert_almost_equal(incremental_diff_csr, effective_diff)

    # step 3: check that sparse and dense updates lead to the same results
    assert_array_equal(labels, labels_csr)
    assert_array_almost_equal(new_centers, new_centers_csr)
    assert_almost_equal(incremental_diff, incremental_diff_csr)
    assert_almost_equal(old_inertia, old_inertia_csr)
    assert_almost_equal(new_inertia, new_inertia_csr)
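
    # Note (added for clarity): the mini-batch update checked above is, at
    # its core, a per-sample running mean in the spirit of Sculley's
    # mini-batch k-means; for an unweighted sample x assigned to cluster k:
    #     counts[k] += 1
    #     eta = 1.0 / counts[k]
    #     centers[k] = (1 - eta) * centers[k] + eta * x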


def _check_fitted_model(km):
    # check that the number of cluster centers and distinct labels match
    # the expectation
    centers = km.cluster_centers_
    assert centers.shape == (n_clusters, n_features)

    labels = km.labels_
    assert np.unique(labels).shape[0] == n_clusters

    # check that the label assignment is perfect (up to a permutation)
    assert v_measure_score(true_labels, labels) == 1.0
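    # (v_measure_score is invariant to label permutations: predicting
    # [1, 1, 0] for true labels [0, 0, 1] still scores 1.0.)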
    assert km.inertia_ > 0.0

    # check error on dataset being too small
    assert_raise_message(ValueError, "n_samples=1 should be >= n_clusters=%d"
                         % km.n_clusters, km.fit, [[0., 1.]])


def test_k_means_new_centers():
    # Explore the part of the code where a new center is reassigned
    X = np.array([[0, 0, 1, 1],
                  [0, 0, 0, 0],
                  [0, 1, 0, 0],
                  [0, 0, 0, 0],
                  [0, 0, 0, 0],
                  [0, 1, 0, 0]])
    labels = [0, 1, 2, 1, 1, 2]
    bad_centers = np.array([[+0, 1, 0, 0],
                            [.2, 0, .2, .2],
                            [+0, 0, 0, 0]])

    km = KMeans(n_clusters=3, init=bad_centers, n_init=1, max_iter=10,
                random_state=1)
    for this_X in (X, sp.coo_matrix(X)):
        km.fit(this_X)
        this_labels = km.labels_
        # Relabel clusters in order of first appearance, so that the first
        # sample is in cluster 0, the first sample from a new cluster is in
        # cluster 1, etc.
        this_labels = np.unique(this_labels, return_index=True)[1][this_labels]
        np.testing.assert_array_equal(this_labels, labels)
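
        # (Added note: np.unique(..., return_index=True)[1] maps each label
        # to the position of its first occurrence, e.g. [2, 0, 1, 0, 0, 1]
        # becomes [0, 1, 2, 1, 1, 2], making the comparison robust to label
        # permutations.)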


@pytest.mark.parametrize('data', [X, X_csr], ids=['dense', 'sparse'])
@pytest.mark.parametrize('init', ['random', 'k-means++', centers.copy()])
def test_k_means_init(data, init):
    km = KMeans(init=init, n_clusters=n_clusters, random_state=42, n_init=1)
    km.fit(data)
    _check_fitted_model(km)
