"""Testing for K-means"""
import sys
import numpy as np
from scipy import sparse as sp
from sklearn.utils.testing import assert_equal
from sklearn.utils.testing import assert_array_equal
from sklearn.utils.testing import assert_array_almost_equal
from sklearn.utils.testing import SkipTest
from sklearn.utils.testing import assert_almost_equal
from sklearn.utils.testing import assert_raises
from sklearn.utils.testing import assert_raises_regex
from sklearn.utils.testing import assert_true
from sklearn.utils.testing import assert_greater
from sklearn.utils.testing import assert_less
from sklearn.utils.testing import assert_warns
from sklearn.utils.testing import if_safe_multiprocessing_with_blas
from sklearn.utils.testing import if_not_mac_os
from sklearn.utils.testing import assert_raise_message
from sklearn.utils.validation import DataConversionWarning
from sklearn.utils.extmath import row_norms
from sklearn.metrics.cluster import v_measure_score
from sklearn.cluster import KMeans, k_means
from sklearn.cluster import MiniBatchKMeans
from sklearn.cluster.k_means_ import _labels_inertia
from sklearn.cluster.k_means_ import _mini_batch_step
from sklearn.datasets.samples_generator import make_blobs
from sklearn.externals.six.moves import cStringIO as StringIO
# Non-centered, sparse cluster centers shared by the tests in this module.
# The blobs drawn around them are exposed both as a dense array (X) and as a
# CSR matrix (X_csr).
centers = np.array([
    [0.0, 5.0, 0.0, 0.0, 0.0],
    [1.0, 1.0, 4.0, 0.0, 0.0],
    [1.0, 0.0, 0.0, 5.0, 1.0],
])
n_samples = 100
n_clusters = centers.shape[0]
n_features = centers.shape[1]
X, true_labels = make_blobs(
    n_samples=n_samples, centers=centers, cluster_std=1.0, random_state=42)
X_csr = sp.csr_matrix(X)
def test_kmeans_dtype():
    """Predicting on uint8 data warns and still reproduces the fit labels."""
    rng = np.random.RandomState(0)
    samples = rng.normal(size=(40, 2))
    samples_uint8 = (samples * 10).astype(np.uint8)

    km = KMeans(n_init=1)
    km.fit(samples_uint8)

    # predict() is expected to emit a DataConversionWarning on this input
    predicted = assert_warns(DataConversionWarning, km.predict, samples_uint8)
    assert_array_equal(km.labels_, predicted)
def test_labels_assignment_and_inertia():
    """Check _labels_inertia against a pure NumPy reference implementation.

    The reference labels and inertia are computed with an easily auditable
    loop over the centers; both the dense and the sparse CSR inputs must
    reproduce them.
    """
    # pure numpy implementation as easily auditable reference gold
    # implementation
    rng = np.random.RandomState(42)
    noisy_centers = centers + rng.normal(size=centers.shape)
    # The builtin ``int`` replaces the ``np.int`` alias, which was removed
    # from NumPy; -1 marks samples that have not been assigned yet.
    labels_gold = -np.ones(n_samples, dtype=int)
    mindist = np.empty(n_samples)
    # ``np.inf`` replaces the ``np.infty`` alias, which was removed from NumPy.
    mindist.fill(np.inf)
    for center_id in range(n_clusters):
        dist = np.sum((X - noisy_centers[center_id]) ** 2, axis=1)
        # samples strictly closer to this center than to any previous one
        labels_gold[dist < mindist] = center_id
        mindist = np.minimum(dist, mindist)
    inertia_gold = mindist.sum()
    assert_true((mindist >= 0.0).all())
    assert_true((labels_gold != -1).all())

    # perform label assignment using the dense array input
    x_squared_norms = (X ** 2).sum(axis=1)
    labels_array, inertia_array = _labels_inertia(
        X, x_squared_norms, noisy_centers)
    assert_array_almost_equal(inertia_array, inertia_gold)
    assert_array_equal(labels_array, labels_gold)

    # perform label assignment using the sparse CSR input
    x_squared_norms_from_csr = row_norms(X_csr, squared=True)
    labels_csr, inertia_csr = _labels_inertia(
        X_csr, x_squared_norms_from_csr, noisy_centers)
    assert_array_almost_equal(inertia_csr, inertia_gold)
    assert_array_equal(labels_csr, labels_gold)
def test_minibatch_update_consistency():
    """Dense and sparse mini-batch updates must give the same results."""
    rng = np.random.RandomState(42)
    start_centers = centers + rng.normal(size=centers.shape)

    # Each code path gets its own copies; they are handed to
    # _mini_batch_step and compared against start_centers afterwards.
    dense_centers = start_centers.copy()
    sparse_centers = start_centers.copy()

    dense_counts = np.zeros(dense_centers.shape[0], dtype=np.int32)
    sparse_counts = np.zeros(dense_centers.shape[0], dtype=np.int32)

    sq_norms_dense = (X ** 2).sum(axis=1)
    sq_norms_sparse = row_norms(X_csr, squared=True)

    dense_buffer = np.zeros(centers.shape[1], dtype=np.double)
    sparse_buffer = np.zeros(centers.shape[1], dtype=np.double)

    # extract a small minibatch (first 10 samples)
    batch_dense = X[:10]
    batch_sparse = X_csr[:10]
    batch_sq_norms_dense = sq_norms_dense[:10]
    batch_sq_norms_sparse = sq_norms_sparse[:10]

    # step 1: compute the dense minibatch update
    inertia_before, diff_dense = _mini_batch_step(
        batch_dense, batch_sq_norms_dense, dense_centers, dense_counts,
        dense_buffer, 1, None, random_reassign=False)
    assert_greater(inertia_before, 0.0)

    # the inertia recomputed on the same batch must have decreased
    labels_dense, inertia_after = _labels_inertia(
        batch_dense, batch_sq_norms_dense, dense_centers)
    assert_greater(inertia_after, 0.0)
    assert_less(inertia_after, inertia_before)

    # the incrementally computed difference must match the observed one
    observed_diff = np.sum((dense_centers - start_centers) ** 2)
    assert_almost_equal(diff_dense, observed_diff)

    # step 2: compute the sparse minibatch update
    inertia_before_csr, diff_sparse = _mini_batch_step(
        batch_sparse, batch_sq_norms_sparse, sparse_centers, sparse_counts,
        sparse_buffer, 1, None, random_reassign=False)
    assert_greater(inertia_before_csr, 0.0)

    # the inertia recomputed on the same batch must have decreased
    labels_sparse, inertia_after_csr = _labels_inertia(
        batch_sparse, batch_sq_norms_sparse, sparse_centers)
    assert_greater(inertia_after_csr, 0.0)
    assert_less(inertia_after_csr, inertia_before_csr)

    # the incrementally computed difference must match the observed one
    observed_diff = np.sum((sparse_centers - start_centers) ** 2)
    assert_almost_equal(diff_sparse, observed_diff)

    # step 3: check that sparse and dense updates lead to the same results
    assert_array_equal(labels_dense, labels_sparse)
    assert_array_almost_equal(dense_centers, sparse_centers)
    assert_almost_equal(diff_dense, diff_sparse)
    assert_almost_equal(inertia_before, inertia_before_csr)
    assert_almost_equal(inertia_after, inertia_after_csr)
def _check_fitted_model(km):
    """Validate an estimator fitted on the module-level blobs.

    Checks the shape of the cluster centers, the number of distinct labels,
    that the labelling matches ``true_labels`` up to a permutation, that the
    inertia is positive, and that refitting on a one-sample dataset raises
    ``ValueError``.
    """
    fitted_centers = km.cluster_centers_
    assert_equal(fitted_centers.shape, (n_clusters, n_features))

    fitted_labels = km.labels_
    n_distinct_labels = np.unique(fitted_labels).shape[0]
    assert_equal(n_distinct_labels, n_clusters)

    # a v-measure of exactly 1.0 means a perfect match up to a permutation
    score = v_measure_score(true_labels, fitted_labels)
    assert_equal(score, 1.0)
    assert_greater(km.inertia_, 0.0)

    # a dataset with a single sample is too small and must be rejected
    too_small = [[0., 1.]]
    assert_raises(ValueError, km.fit, too_small)
def test_k_means_plus_plus_init():
    """KMeans with k-means++ init fits the dense blobs correctly."""
    estimator = KMeans(init="k-means++", n_clusters=n_clusters,
                       random_state=42)
    estimator = estimator.fit(X)
    _check_fitted_model(estimator)
def test_k_means_new_centers():
    """Exercise the code path where a new center is reassigned."""
    data = np.array([[0, 0, 1, 1],
                     [0, 0, 0, 0],
                     [0, 1, 0, 0],
                     [0, 0, 0, 0],
                     [0, 0, 0, 0],
                     [0, 1, 0, 0]])
    expected_labels = [0, 1, 2, 1, 1, 2]
    poor_init = np.array([[+0, 1, 0, 0],
                          [.2, 0, .2, .2],
                          [+0, 0, 0, 0]])

    km = KMeans(n_clusters=3, init=poor_init, n_init=1, max_iter=10,
                random_state=1)
    inputs = (data, sp.coo_matrix(data))
    for current in inputs:
        km.fit(current)
        found = km.labels_
        # Reorder the labels so that the first instance is in cluster 0,
        # the second in cluster 1, ...
        _, first_positions = np.unique(found, return_index=True)
        relabelled = first_positions[found]
        np.testing.assert_array_equal(relabelled, expected_labels)
@if_safe_multiprocessing_with_blas
def test_k_means_plus_plus_init_2_jobs():
    """KMeans with k-means++ init and n_jobs=2 fits the dense blobs."""
    python_version = sys.version_info[:2]
    if python_version < (3, 4):
        raise SkipTest(
            "Possible multi-process bug with some BLAS under Python < 3.4")

    estimator = KMeans(init="k-means++", n_clusters=n_clusters, n_jobs=2,
                       random_state=42)
    estimator = estimator.fit(X)
    _check_fitted_model(estimator)
def test_k_means_precompute_distances_flag():
    """An unsupported precompute_distances value makes fit raise ValueError."""
    estimator = KMeans(precompute_distances="wrong")
    # the invalid flag is only checked here at fit time
    assert_raises(ValueError, estimator.fit, X)
def test_k_means_plus_plus_init_sparse():
    """KMeans with k-means++ init fits the CSR blobs correctly."""
    estimator = KMeans(init="k-means++", n_clusters=n_clusters,
                       random_state=42)
    estimator.fit(X_csr)
    _check_fitted_model(estimator)
def test_k_means_random_init():
    """KMeans with random init fits the dense blobs correctly."""
    estimator = KMeans(init="random", n_clusters=n_clusters,
                       random_state=42)
    estimator.fit(X)
    _check_fitted_model(estimator)
def test_k_means_random_init_sparse():
    """KMeans with random init fits the CSR blobs correctly."""
    estimator = KMeans(init="random", n_clusters=n_clusters,
                       random_state=42)
    estimator.fit(X_csr)
    _check_fitted_model(estimator)
def test_k_means_plus_plus_init_not_precomputed():
    """k-means++ init works with precompute_distances disabled."""
    estimator = KMeans(init="k-means++", n_clusters=n_clusters,
                       random_state=42, precompute_distances=False)
    estimator = estimator.fit(X)
    _check_fitted_model(estimator)
def test_k_means_random_init_not_precomputed():
    """Random init works with precompute_distances disabled."""
    estimator = KMeans(init="random", n_clusters=n_clusters,
                       random_state=42, precompute_distances=False)
    estimator = estimator.fit(X)
    _check_fitted_model(estimator)
def test_k_means_perfect_init():
    """KMeans initialised with the generating centers fits the blobs."""
    # pass a copy so the module-level centers array is not handed over
    generating_centers = centers.copy()
    estimator = KMeans(init=generating_centers, n_clusters=n_clusters,
                       random_state=42, n_init=1)
    estimator.fit(X)
    _check_fitted_model(estimator)
def test_k_means_n_init():
    """Non-positive n_init values raise a ValueError mentioning n_init."""
    rng = np.random.RandomState(0)
    samples = rng.normal(size=(40, 2))

    # two regression tests on bad n_init argument
    # previous bug: n_init <= 0 threw non-informative TypeError (#3858)
    for bad_n_init in (0, -1):
        estimator = KMeans(n_init=bad_n_init)
        assert_raises_regex(ValueError, "n_init", estimator.fit, samples)
def test_k_means_explicit_init_shape():
    """Explicit inits with a wrong shape raise informative ValueErrors.

    Both array and callable inits are checked, for a wrong number of
    features and for a wrong number of clusters, on KMeans and
    MiniBatchKMeans.
    """
    rng = np.random.RandomState(0)
    data = rng.normal(size=(40, 3))
    features_msg = "does not match the number of features of the data"
    clusters_msg = "does not match the number of clusters"

    for Class in [KMeans, MiniBatchKMeans]:
        # mismatch of number of features: array init
        km = Class(n_init=1, init=data[:, :2], n_clusters=len(data))
        assert_raises_regex(ValueError, features_msg, km.fit, data)
        # mismatch of number of features: callable init
        km = Class(n_init=1,
                   init=lambda X_, k, random_state: X_[:, :2],
                   n_clusters=len(data))
        assert_raises_regex(ValueError, features_msg, km.fit, data)

        # mismatch of number of clusters: array init
        km = Class(n_init=1, init=data[:2, :], n_clusters=3)
        assert_raises_regex(ValueError, clusters_msg, km.fit, data)
        # mismatch of number of clusters: callable init
        km = Class(n_init=1,
                   init=lambda X_, k, random_state: X_[:2, :],
                   n_clusters=3)
        assert_raises_regex(ValueError, clusters_msg, km.fit, data)
def test_k_means_fortran_aligned_data():
    """KMeans gives the expected result on Fortran-ordered input."""
    fortran_data = np.asfortranarray([[0, 0], [0, 1], [0, 1]])
    init_centers = np.array([[0, 0], [0, 1]])
    expected_labels = np.array([0, 1, 1])

    estimator = KMeans(n_init=1, init=init_centers,
                       precompute_distances=False, random_state=42,
                       n_clusters=2)
    estimator.fit(fortran_data)

    # the initial centers are expected to be returned unchanged
    assert_array_equal(estimator.cluster_centers_, init_centers)
    assert_array_equal(estimator.labels_, expected_labels)
def test_mb_k_means_plus_plus_init_dense_array():
    """MiniBatchKMeans with k-means++ init fits the dense blobs."""
    estimator = MiniBatchKMeans(init="k-means++", n_clusters=n_clusters,
                                random_state=42)
    estimator.fit(X)
    _check_fitted_model(estimator)
def test_mb_kmeans_verbose():
    """Fitting MiniBatchKMeans with verbose=1 completes without error."""
    estimator = MiniBatchKMeans(init="k-means++", n_clusters=n_clusters,
                                random_state=42, verbose=1)
    # Swap stdout for an in-memory buffer during the fit, and always
    # restore the real stream afterwards.
    saved_stdout = sys.stdout
    sys.stdout = StringIO()
    try:
        estimator.fit(X)
    finally:
        sys.stdout = saved_stdout
def test_mb_k_means_plus_plus_init_sparse_matrix():
    """MiniBatchKMeans with k-means++ init fits the CSR blobs."""
    estimator = MiniBatchKMeans(init="k-means++", n_clusters=n_clusters,
                                random_state=42)
    estimator.fit(X_csr)
    _check_fitted_model(estimator)
def test_minibatch_init_with_large_k():
    """Fitting with more clusters than init_size emits a RuntimeWarning."""
    # 20 clusters requested while only 10 samples are used for the init
    estimator = MiniBatchKMeans(init='k-means++', init_size=10,
                                n_clusters=20)
    assert_warns(RuntimeWarning, estimator.fit, X)
def test_minibatch_k_means_random_init_dense_array():
    """MiniBatchKMeans with random init fits the dense blobs."""
    # n_init is raised to 10 to make the random init stable enough
    estimator = MiniBatchKMeans(init="random", n_clusters=n_clusters,
                                random_state=42, n_init=10)
    estimator = estimator.fit(X)
    _check_fitted_model(estimator)
def test_minibatch_k_means_random_init_sparse_csr():
    """MiniBatchKMeans with random init fits the CSR blobs."""
    # n_init is raised to 10 to make the random init stable enough
    estimator = MiniBatchKMeans(init="random", n_clusters=n_clusters,
                                random_state=42, n_init=10)
    estimator = estimator.fit(X_csr)
    _check_fitted_model(estimator)
def test_minibatch_k_means_perfect_init_dense_array():
mb_k_means = MiniBatchKMeans(init=centers.copy(), n_clusters=n_clusters,
Loading ...