"""Testing for K-means"""
import sys
import numpy as np
from scipy import sparse as sp
from threadpoolctl import threadpool_limits
import pytest
from sklearn.utils._testing import assert_array_equal
from sklearn.utils._testing import assert_array_almost_equal
from sklearn.utils._testing import assert_allclose
from sklearn.utils._testing import assert_almost_equal
from sklearn.utils._testing import assert_warns
from sklearn.utils._testing import assert_warns_message
from sklearn.utils._testing import assert_raise_message
from sklearn.utils.validation import _num_samples
from sklearn.base import clone
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils.extmath import row_norms
from sklearn.metrics import pairwise_distances_argmin
from sklearn.metrics.cluster import v_measure_score
from sklearn.cluster import KMeans, k_means
from sklearn.cluster import MiniBatchKMeans
from sklearn.cluster._kmeans import _labels_inertia
from sklearn.cluster._kmeans import _mini_batch_step
from sklearn.cluster._k_means_fast import _relocate_empty_clusters_dense
from sklearn.cluster._k_means_fast import _relocate_empty_clusters_sparse
from sklearn.cluster._k_means_fast import _euclidean_dense_dense_wrapper
from sklearn.cluster._k_means_fast import _euclidean_sparse_dense_wrapper
from sklearn.cluster._k_means_fast import _inertia_dense
from sklearn.cluster._k_means_fast import _inertia_sparse
from sklearn.datasets import make_blobs
from io import StringIO
from sklearn.metrics.cluster import homogeneity_score
# non centered, sparse centers to check the
centers = np.array([
[0.0, 5.0, 0.0, 0.0, 0.0],
[1.0, 1.0, 4.0, 0.0, 0.0],
[1.0, 0.0, 0.0, 5.0, 1.0],
])
n_samples = 100
n_clusters, n_features = centers.shape
X, true_labels = make_blobs(n_samples=n_samples, centers=centers,
cluster_std=1., random_state=42)
X_csr = sp.csr_matrix(X)
@pytest.mark.parametrize("representation", ["dense", "sparse"])
@pytest.mark.parametrize("algo", ["full", "elkan"])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_kmeans_results(representation, algo, dtype):
# cheks that kmeans works as intended
array_constr = {'dense': np.array, 'sparse': sp.csr_matrix}[representation]
X = array_constr([[0, 0], [0.5, 0], [0.5, 1], [1, 1]], dtype=dtype)
sample_weight = [3, 1, 1, 3] # will be rescaled to [1.5, 0.5, 0.5, 1.5]
init_centers = np.array([[0, 0], [1, 1]], dtype=dtype)
expected_labels = [0, 0, 1, 1]
expected_inertia = 0.1875
expected_centers = np.array([[0.125, 0], [0.875, 1]], dtype=dtype)
expected_n_iter = 2
kmeans = KMeans(n_clusters=2, n_init=1, init=init_centers, algorithm=algo)
kmeans.fit(X, sample_weight=sample_weight)
assert_array_equal(kmeans.labels_, expected_labels)
assert_almost_equal(kmeans.inertia_, expected_inertia)
assert_array_almost_equal(kmeans.cluster_centers_, expected_centers)
assert kmeans.n_iter_ == expected_n_iter
@pytest.mark.parametrize("array_constr",
[np.array, sp.csr_matrix],
ids=['dense', 'sparse'])
@pytest.mark.parametrize("algo", ['full', 'elkan'])
def test_relocated_clusters(array_constr, algo):
# check that empty clusters are relocated as expected
X = array_constr([[0, 0], [0.5, 0], [0.5, 1], [1, 1]])
# second center too far from others points will be empty at first iter
init_centers = np.array([[0.5, 0.5], [3, 3]])
expected_labels = [0, 0, 1, 1]
expected_inertia = 0.25
expected_centers = [[0.25, 0], [0.75, 1]]
expected_n_iter = 3
kmeans = KMeans(n_clusters=2, n_init=1, init=init_centers, algorithm=algo)
kmeans.fit(X)
assert_array_equal(kmeans.labels_, expected_labels)
assert_almost_equal(kmeans.inertia_, expected_inertia)
assert_array_almost_equal(kmeans.cluster_centers_, expected_centers)
assert kmeans.n_iter_ == expected_n_iter
@pytest.mark.parametrize("representation", ["dense", "sparse"])
def test_relocate_empty_clusters(representation):
# test for the _relocate_empty_clusters_(dense/sparse) helpers
# Synthetic dataset with 3 obvious clusters of different sizes
X = np.array(
[-10., -9.5, -9, -8.5, -8, -1, 1, 9, 9.5, 10]).reshape(-1, 1)
if representation == "sparse":
X = sp.csr_matrix(X)
sample_weight = np.full(shape=10, fill_value=1.)
# centers all initialized to the first point of X
centers_old = np.array([-10., -10, -10]).reshape(-1, 1)
# With this initialization, all points will be assigned to the first center
# At this point a center in centers_new is the weighted sum of the points
# it contains if it's not empty, otherwise it is the same as before.
centers_new = np.array([-16.5, -10, -10]).reshape(-1, 1)
weight_in_clusters = np.array([10., 0, 0])
labels = np.zeros(10, dtype=np.int32)
if representation == "dense":
_relocate_empty_clusters_dense(X, sample_weight, centers_old,
centers_new, weight_in_clusters, labels)
else:
_relocate_empty_clusters_sparse(X.data, X.indices, X.indptr,
sample_weight, centers_old,
centers_new, weight_in_clusters,
labels)
# The relocation scheme will take the 2 points farthest from the center and
# assign them to the 2 empty clusters, i.e. points at 10 and at 9.9. The
# first center will be updated to contain the other 8 points.
assert_array_equal(weight_in_clusters, [8, 1, 1])
assert_allclose(centers_new, [[-36], [10], [9.5]])
@pytest.mark.parametrize('distribution', ['normal', 'blobs'])
@pytest.mark.parametrize('tol', [1e-2, 1e-4, 1e-8])
def test_elkan_results(distribution, tol):
# check that results are identical between lloyd and elkan algorithms
rnd = np.random.RandomState(0)
if distribution == 'normal':
X = rnd.normal(size=(5000, 10))
else:
X, _ = make_blobs(random_state=rnd)
km_full = KMeans(algorithm='full', n_clusters=5,
random_state=0, n_init=1, tol=tol)
km_elkan = KMeans(algorithm='elkan', n_clusters=5,
random_state=0, n_init=1, tol=tol)
km_full.fit(X)
km_elkan.fit(X)
assert_allclose(km_elkan.cluster_centers_, km_full.cluster_centers_)
assert_array_equal(km_elkan.labels_, km_full.labels_)
assert km_elkan.n_iter_ == km_full.n_iter_
assert km_elkan.inertia_ == pytest.approx(km_full.inertia_, rel=1e-6)
@pytest.mark.parametrize('algorithm', ['full', 'elkan'])
def test_kmeans_convergence(algorithm):
# Check that KMeans stops when convergence is reached when tol=0. (#16075)
rnd = np.random.RandomState(0)
X = rnd.normal(size=(5000, 10))
km = KMeans(algorithm=algorithm, n_clusters=5, random_state=0, n_init=1,
tol=0, max_iter=300).fit(X)
assert km.n_iter_ < 300
@pytest.mark.parametrize('distribution', ['normal', 'blobs'])
def test_elkan_results_sparse(distribution):
# check that results are identical between lloyd and elkan algorithms
# with sparse input
rnd = np.random.RandomState(0)
if distribution == 'normal':
X = sp.random(100, 100, density=0.1, format='csr', random_state=rnd)
X.data = rnd.randn(len(X.data))
else:
X, _ = make_blobs(n_samples=100, n_features=100, random_state=rnd)
X = sp.csr_matrix(X)
km_full = KMeans(algorithm='full', n_clusters=5, random_state=0, n_init=1)
km_elkan = KMeans(algorithm='elkan', n_clusters=5,
random_state=0, n_init=1)
km_full.fit(X)
km_elkan.fit(X)
assert_allclose(km_elkan.cluster_centers_, km_full.cluster_centers_)
assert_allclose(km_elkan.labels_, km_full.labels_)
def test_labels_assignment_and_inertia():
# pure numpy implementation as easily auditable reference gold
# implementation
rng = np.random.RandomState(42)
noisy_centers = centers + rng.normal(size=centers.shape)
labels_gold = np.full(n_samples, -1, dtype=np.int)
mindist = np.empty(n_samples)
mindist.fill(np.infty)
for center_id in range(n_clusters):
dist = np.sum((X - noisy_centers[center_id]) ** 2, axis=1)
labels_gold[dist < mindist] = center_id
mindist = np.minimum(dist, mindist)
inertia_gold = mindist.sum()
assert (mindist >= 0.0).all()
assert (labels_gold != -1).all()
sample_weight = None
# perform label assignment using the dense array input
x_squared_norms = (X ** 2).sum(axis=1)
labels_array, inertia_array = _labels_inertia(
X, sample_weight, x_squared_norms, noisy_centers)
assert_array_almost_equal(inertia_array, inertia_gold)
assert_array_equal(labels_array, labels_gold)
# perform label assignment using the sparse CSR input
x_squared_norms_from_csr = row_norms(X_csr, squared=True)
labels_csr, inertia_csr = _labels_inertia(
X_csr, sample_weight, x_squared_norms_from_csr, noisy_centers)
assert_array_almost_equal(inertia_csr, inertia_gold)
assert_array_equal(labels_csr, labels_gold)
def test_minibatch_update_consistency():
# Check that dense and sparse minibatch update give the same results
rng = np.random.RandomState(42)
old_centers = centers + rng.normal(size=centers.shape)
new_centers = old_centers.copy()
new_centers_csr = old_centers.copy()
weight_sums = np.zeros(new_centers.shape[0], dtype=np.double)
weight_sums_csr = np.zeros(new_centers.shape[0], dtype=np.double)
x_squared_norms = (X ** 2).sum(axis=1)
x_squared_norms_csr = row_norms(X_csr, squared=True)
buffer = np.zeros(centers.shape[1], dtype=np.double)
buffer_csr = np.zeros(centers.shape[1], dtype=np.double)
# extract a small minibatch
X_mb = X[:10]
X_mb_csr = X_csr[:10]
x_mb_squared_norms = x_squared_norms[:10]
x_mb_squared_norms_csr = x_squared_norms_csr[:10]
sample_weight_mb = np.ones(X_mb.shape[0], dtype=np.double)
# step 1: compute the dense minibatch update
old_inertia, incremental_diff = _mini_batch_step(
X_mb, sample_weight_mb, x_mb_squared_norms, new_centers, weight_sums,
buffer, 1, None, random_reassign=False)
assert old_inertia > 0.0
# compute the new inertia on the same batch to check that it decreased
labels, new_inertia = _labels_inertia(
X_mb, sample_weight_mb, x_mb_squared_norms, new_centers)
assert new_inertia > 0.0
assert new_inertia < old_inertia
# check that the incremental difference computation is matching the
# final observed value
effective_diff = np.sum((new_centers - old_centers) ** 2)
assert_almost_equal(incremental_diff, effective_diff)
# step 2: compute the sparse minibatch update
old_inertia_csr, incremental_diff_csr = _mini_batch_step(
X_mb_csr, sample_weight_mb, x_mb_squared_norms_csr, new_centers_csr,
weight_sums_csr, buffer_csr, 1, None, random_reassign=False)
assert old_inertia_csr > 0.0
# compute the new inertia on the same batch to check that it decreased
labels_csr, new_inertia_csr = _labels_inertia(
X_mb_csr, sample_weight_mb, x_mb_squared_norms_csr, new_centers_csr)
assert new_inertia_csr > 0.0
assert new_inertia_csr < old_inertia_csr
# check that the incremental difference computation is matching the
# final observed value
effective_diff = np.sum((new_centers_csr - old_centers) ** 2)
assert_almost_equal(incremental_diff_csr, effective_diff)
# step 3: check that sparse and dense updates lead to the same results
assert_array_equal(labels, labels_csr)
assert_array_almost_equal(new_centers, new_centers_csr)
assert_almost_equal(incremental_diff, incremental_diff_csr)
assert_almost_equal(old_inertia, old_inertia_csr)
assert_almost_equal(new_inertia, new_inertia_csr)
def _check_fitted_model(km):
# check that the number of clusters centers and distinct labels match
# the expectation
centers = km.cluster_centers_
assert centers.shape == (n_clusters, n_features)
labels = km.labels_
assert np.unique(labels).shape[0] == n_clusters
# check that the labels assignment are perfect (up to a permutation)
assert v_measure_score(true_labels, labels) == 1.0
assert km.inertia_ > 0.0
# check error on dataset being too small
assert_raise_message(ValueError, "n_samples=1 should be >= n_clusters=%d"
% km.n_clusters, km.fit, [[0., 1.]])
def test_k_means_new_centers():
# Explore the part of the code where a new center is reassigned
X = np.array([[0, 0, 1, 1],
[0, 0, 0, 0],
[0, 1, 0, 0],
[0, 0, 0, 0],
[0, 0, 0, 0],
[0, 1, 0, 0]])
labels = [0, 1, 2, 1, 1, 2]
bad_centers = np.array([[+0, 1, 0, 0],
[.2, 0, .2, .2],
[+0, 0, 0, 0]])
km = KMeans(n_clusters=3, init=bad_centers, n_init=1, max_iter=10,
random_state=1)
for this_X in (X, sp.coo_matrix(X)):
km.fit(this_X)
this_labels = km.labels_
# Reorder the labels so that the first instance is in cluster 0,
# the second in cluster 1, ...
this_labels = np.unique(this_labels, return_index=True)[1][this_labels]
np.testing.assert_array_equal(this_labels, labels)
@pytest.mark.parametrize('data', [X, X_csr], ids=['dense', 'sparse'])
@pytest.mark.parametrize('init', ['random', 'k-means++', centers.copy()])
def test_k_means_init(data, init):
km = KMeans(init=init, n_clusters=n_clusters, random_state=42, n_init=1)
km.fit(data)
_check_fitted_model(km)
Loading ...