from itertools import product
import pytest
import numpy as np
from scipy.sparse import (bsr_matrix, coo_matrix, csc_matrix, csr_matrix,
dok_matrix, lil_matrix, issparse)
from sklearn import metrics
from sklearn import neighbors, datasets
from sklearn.base import clone
from sklearn.exceptions import DataConversionWarning
from sklearn.exceptions import EfficiencyWarning
from sklearn.exceptions import NotFittedError
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import VALID_METRICS_SPARSE, VALID_METRICS
from sklearn.neighbors._base import _is_sorted_by_data, _check_precomputed
from sklearn.pipeline import make_pipeline
from sklearn.utils._testing import assert_array_almost_equal
from sklearn.utils._testing import assert_array_equal
from sklearn.utils._testing import assert_raises
from sklearn.utils._testing import assert_raises_regex
from sklearn.utils._testing import assert_warns
from sklearn.utils._testing import assert_warns_message
from sklearn.utils._testing import assert_raise_message
from sklearn.utils._testing import ignore_warnings
from sklearn.utils.validation import check_random_state
import joblib
rng = np.random.RandomState(0)
# load and shuffle iris dataset
iris = datasets.load_iris()
perm = rng.permutation(iris.target.size)
iris.data = iris.data[perm]
iris.target = iris.target[perm]
# load and shuffle digits
digits = datasets.load_digits()
perm = rng.permutation(digits.target.size)
digits.data = digits.data[perm]
digits.target = digits.target[perm]
SPARSE_TYPES = (bsr_matrix, coo_matrix, csc_matrix, csr_matrix, dok_matrix,
lil_matrix)
SPARSE_OR_DENSE = SPARSE_TYPES + (np.asarray,)
ALGORITHMS = ('ball_tree', 'brute', 'kd_tree', 'auto')
P = (1, 2, 3, 4, np.inf)
JOBLIB_BACKENDS = list(joblib.parallel.BACKENDS.keys())
# Filter deprecation warnings.
neighbors.kneighbors_graph = ignore_warnings(neighbors.kneighbors_graph)
neighbors.radius_neighbors_graph = ignore_warnings(
neighbors.radius_neighbors_graph)
def _weight_func(dist):
""" Weight function to replace lambda d: d ** -2.
The lambda function is not valid because:
if d==0 then 0^-2 is not valid. """
# Dist could be multidimensional, flatten it so all values
# can be looped
with np.errstate(divide='ignore'):
retval = 1. / dist
return retval ** 2
def test_unsupervised_kneighbors(n_samples=20, n_features=5,
n_query_pts=2, n_neighbors=5):
# Test unsupervised neighbors methods
X = rng.rand(n_samples, n_features)
test = rng.rand(n_query_pts, n_features)
for p in P:
results_nodist = []
results = []
for algorithm in ALGORITHMS:
neigh = neighbors.NearestNeighbors(n_neighbors=n_neighbors,
algorithm=algorithm,
p=p)
neigh.fit(X)
results_nodist.append(neigh.kneighbors(test,
return_distance=False))
results.append(neigh.kneighbors(test, return_distance=True))
for i in range(len(results) - 1):
assert_array_almost_equal(results_nodist[i], results[i][1])
assert_array_almost_equal(results[i][0], results[i + 1][0])
assert_array_almost_equal(results[i][1], results[i + 1][1])
def test_unsupervised_inputs():
# test the types of valid input into NearestNeighbors
X = rng.random_sample((10, 3))
nbrs_fid = neighbors.NearestNeighbors(n_neighbors=1)
nbrs_fid.fit(X)
dist1, ind1 = nbrs_fid.kneighbors(X)
nbrs = neighbors.NearestNeighbors(n_neighbors=1)
for input in (nbrs_fid, neighbors.BallTree(X), neighbors.KDTree(X)):
nbrs.fit(input)
dist2, ind2 = nbrs.kneighbors(X)
assert_array_almost_equal(dist1, dist2)
assert_array_almost_equal(ind1, ind2)
def test_n_neighbors_datatype():
# Test to check whether n_neighbors is integer
X = [[1, 1], [1, 1], [1, 1]]
expected_msg = "n_neighbors does not take .*float.* " \
"value, enter integer value"
msg = "Expected n_neighbors > 0. Got -3"
neighbors_ = neighbors.NearestNeighbors(n_neighbors=3.)
assert_raises_regex(TypeError, expected_msg, neighbors_.fit, X)
assert_raises_regex(ValueError, msg,
neighbors_.kneighbors, X=X, n_neighbors=-3)
assert_raises_regex(TypeError, expected_msg,
neighbors_.kneighbors, X=X, n_neighbors=3.)
def test_not_fitted_error_gets_raised():
X = [[1]]
neighbors_ = neighbors.NearestNeighbors()
assert_raises(NotFittedError, neighbors_.kneighbors_graph, X)
assert_raises(NotFittedError, neighbors_.radius_neighbors_graph, X)
@ignore_warnings(category=EfficiencyWarning)
def check_precomputed(make_train_test, estimators):
"""Tests unsupervised NearestNeighbors with a distance matrix."""
# Note: smaller samples may result in spurious test success
rng = np.random.RandomState(42)
X = rng.random_sample((10, 4))
Y = rng.random_sample((3, 4))
DXX, DYX = make_train_test(X, Y)
for method in ['kneighbors', ]:
# TODO: also test radius_neighbors, but requires different assertion
# As a feature matrix (n_samples by n_features)
nbrs_X = neighbors.NearestNeighbors(n_neighbors=3)
nbrs_X.fit(X)
dist_X, ind_X = getattr(nbrs_X, method)(Y)
# As a dense distance matrix (n_samples by n_samples)
nbrs_D = neighbors.NearestNeighbors(n_neighbors=3, algorithm='brute',
metric='precomputed')
nbrs_D.fit(DXX)
dist_D, ind_D = getattr(nbrs_D, method)(DYX)
assert_array_almost_equal(dist_X, dist_D)
assert_array_almost_equal(ind_X, ind_D)
# Check auto works too
nbrs_D = neighbors.NearestNeighbors(n_neighbors=3, algorithm='auto',
metric='precomputed')
nbrs_D.fit(DXX)
dist_D, ind_D = getattr(nbrs_D, method)(DYX)
assert_array_almost_equal(dist_X, dist_D)
assert_array_almost_equal(ind_X, ind_D)
# Check X=None in prediction
dist_X, ind_X = getattr(nbrs_X, method)(None)
dist_D, ind_D = getattr(nbrs_D, method)(None)
assert_array_almost_equal(dist_X, dist_D)
assert_array_almost_equal(ind_X, ind_D)
# Must raise a ValueError if the matrix is not of correct shape
assert_raises(ValueError, getattr(nbrs_D, method), X)
target = np.arange(X.shape[0])
for Est in estimators:
est = Est(metric='euclidean')
est.radius = est.n_neighbors = 1
pred_X = est.fit(X, target).predict(Y)
est.metric = 'precomputed'
pred_D = est.fit(DXX, target).predict(DYX)
assert_array_almost_equal(pred_X, pred_D)
def test_precomputed_dense():
def make_train_test(X_train, X_test):
return (metrics.pairwise_distances(X_train),
metrics.pairwise_distances(X_test, X_train))
estimators = [
neighbors.KNeighborsClassifier, neighbors.KNeighborsRegressor,
neighbors.RadiusNeighborsClassifier, neighbors.RadiusNeighborsRegressor
]
check_precomputed(make_train_test, estimators)
@pytest.mark.parametrize('fmt', ['csr', 'lil'])
def test_precomputed_sparse_knn(fmt):
def make_train_test(X_train, X_test):
nn = neighbors.NearestNeighbors(n_neighbors=3 + 1).fit(X_train)
return (nn.kneighbors_graph(X_train, mode='distance').asformat(fmt),
nn.kneighbors_graph(X_test, mode='distance').asformat(fmt))
# We do not test RadiusNeighborsClassifier and RadiusNeighborsRegressor
# since the precomputed neighbors graph is built with k neighbors only.
estimators = [
neighbors.KNeighborsClassifier,
neighbors.KNeighborsRegressor,
]
check_precomputed(make_train_test, estimators)
@pytest.mark.parametrize('fmt', ['csr', 'lil'])
def test_precomputed_sparse_radius(fmt):
def make_train_test(X_train, X_test):
nn = neighbors.NearestNeighbors(radius=1).fit(X_train)
return (nn.radius_neighbors_graph(X_train,
mode='distance').asformat(fmt),
nn.radius_neighbors_graph(X_test,
mode='distance').asformat(fmt))
# We do not test KNeighborsClassifier and KNeighborsRegressor
# since the precomputed neighbors graph is built with a radius.
estimators = [
neighbors.RadiusNeighborsClassifier,
neighbors.RadiusNeighborsRegressor,
]
check_precomputed(make_train_test, estimators)
def test_is_sorted_by_data():
# Test that _is_sorted_by_data works as expected. In CSR sparse matrix,
# entries in each row can be sorted by indices, by data, or unsorted.
# _is_sorted_by_data should return True when entries are sorted by data,
# and False in all other cases.
# Test with sorted 1D array
X = csr_matrix(np.arange(10))
assert _is_sorted_by_data(X)
# Test with unsorted 1D array
X[0, 2] = 5
assert not _is_sorted_by_data(X)
# Test when the data is sorted in each sample, but not necessarily
# between samples
X = csr_matrix([[0, 1, 2], [3, 0, 0], [3, 4, 0], [1, 0, 2]])
assert _is_sorted_by_data(X)
# Test with duplicates entries in X.indptr
data, indices, indptr = [0, 4, 2, 2], [0, 1, 1, 1], [0, 2, 2, 4]
X = csr_matrix((data, indices, indptr), shape=(3, 3))
assert _is_sorted_by_data(X)
@ignore_warnings(category=EfficiencyWarning)
def test_check_precomputed():
# Test that _check_precomputed returns a graph sorted by data
X = csr_matrix(np.abs(np.random.RandomState(42).randn(10, 10)))
assert not _is_sorted_by_data(X)
Xt = _check_precomputed(X)
assert _is_sorted_by_data(Xt)
# est with a different number of nonzero entries for each sample
mask = np.random.RandomState(42).randint(2, size=(10, 10))
X = X.toarray()
X[mask == 1] = 0
X = csr_matrix(X)
assert not _is_sorted_by_data(X)
Xt = _check_precomputed(X)
assert _is_sorted_by_data(Xt)
@ignore_warnings(category=EfficiencyWarning)
def test_precomputed_sparse_invalid():
dist = np.array([[0., 2., 1.], [2., 0., 3.], [1., 3., 0.]])
dist_csr = csr_matrix(dist)
neigh = neighbors.NearestNeighbors(n_neighbors=1, metric="precomputed")
neigh.fit(dist_csr)
neigh.kneighbors(None, n_neighbors=1)
neigh.kneighbors(np.array([[0., 0., 0.]]), n_neighbors=2)
# Ensures enough number of nearest neighbors
dist = np.array([[0., 2., 0.], [2., 0., 3.], [0., 3., 0.]])
dist_csr = csr_matrix(dist)
neigh.fit(dist_csr)
msg = "2 neighbors per samples are required, but some samples have only 1"
assert_raises_regex(ValueError, msg, neigh.kneighbors, None, n_neighbors=1)
# Checks error with inconsistent distance matrix
dist = np.array([[5., 2., 1.], [-2., 0., 3.], [1., 3., 0.]])
dist_csr = csr_matrix(dist)
msg = "Negative values in data passed to precomputed distance matrix."
assert_raises_regex(ValueError, msg, neigh.kneighbors, dist_csr,
n_neighbors=1)
def test_precomputed_cross_validation():
# Ensure array is split correctly
rng = np.random.RandomState(0)
X = rng.rand(20, 2)
D = pairwise_distances(X, metric='euclidean')
y = rng.randint(3, size=20)
for Est in (neighbors.KNeighborsClassifier,
neighbors.RadiusNeighborsClassifier,
neighbors.KNeighborsRegressor,
neighbors.RadiusNeighborsRegressor):
metric_score = cross_val_score(Est(), X, y)
precomp_score = cross_val_score(Est(metric='precomputed'), D, y)
assert_array_equal(metric_score, precomp_score)
def test_unsupervised_radius_neighbors(n_samples=20, n_features=5,
n_query_pts=2, radius=0.5,
random_state=0):
# Test unsupervised radius-based query
rng = np.random.RandomState(random_state)
X = rng.rand(n_samples, n_features)
test = rng.rand(n_query_pts, n_features)
for p in P:
results = []
for algorithm in ALGORITHMS:
neigh = neighbors.NearestNeighbors(radius=radius,
algorithm=algorithm,
p=p)
neigh.fit(X)
ind1 = neigh.radius_neighbors(test, return_distance=False)
# sort the results: this is not done automatically for
# radius searches
dist, ind = neigh.radius_neighbors(test, return_distance=True)
for (d, i, i1) in zip(dist, ind, ind1):
j = d.argsort()
d[:] = d[j]
i[:] = i[j]
i1[:] = i1[j]
results.append((dist, ind))
assert_array_almost_equal(np.concatenate(list(ind)),
np.concatenate(list(ind1)))
for i in range(len(results) - 1):
assert_array_almost_equal(np.concatenate(list(results[i][0])),
np.concatenate(list(results[i + 1][0]))),
assert_array_almost_equal(np.concatenate(list(results[i][1])),
np.concatenate(list(results[i + 1][1])))
def test_kneighbors_classifier(n_samples=40,
n_features=5,
n_test_pts=10,
n_neighbors=5,
random_state=0):
# Test k-neighbors classification
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
y = ((X ** 2).sum(axis=1) < .5).astype(np.int)
y_str = y.astype(str)
weight_func = _weight_func
for algorithm in ALGORITHMS:
for weights in ['uniform', 'distance', weight_func]:
knn = neighbors.KNeighborsClassifier(n_neighbors=n_neighbors,
weights=weights,
algorithm=algorithm)
knn.fit(X, y)
epsilon = 1e-5 * (2 * rng.rand(1, n_features) - 1)
y_pred = knn.predict(X[:n_test_pts] + epsilon)
assert_array_equal(y_pred, y[:n_test_pts])
# Test prediction with y_str
knn.fit(X, y_str)
y_pred = knn.predict(X[:n_test_pts] + epsilon)
assert_array_equal(y_pred, y_str[:n_test_pts])
def test_kneighbors_classifier_float_labels(n_samples=40, n_features=5,
n_test_pts=10, n_neighbors=5,
random_state=0):
# Test k-neighbors classification
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
y = ((X ** 2).sum(axis=1) < .5).astype(np.int)
knn = neighbors.KNeighborsClassifier(n_neighbors=n_neighbors)
knn.fit(X, y.astype(np.float))
epsilon = 1e-5 * (2 * rng.rand(1, n_features) - 1)
y_pred = knn.predict(X[:n_test_pts] + epsilon)
assert_array_equal(y_pred, y[:n_test_pts])
def test_kneighbors_classifier_predict_proba():
# Test KNeighborsClassifier.predict_proba() method
X = np.array([[0, 2, 0],
[0, 2, 1],
[2, 0, 0],
[2, 2, 0],
[0, 0, 2],
[0, 0, 1]])
y = np.array([4, 4, 5, 5, 1, 1])
cls = neighbors.KNeighborsClassifier(n_neighbors=3, p=1) # cityblock dist
cls.fit(X, y)
y_prob = cls.predict_proba(X)
real_prob = np.array([[0, 2. / 3, 1. / 3],
[1. / 3, 2. / 3, 0],
[1. / 3, 0, 2. / 3],
[0, 1. / 3, 2. / 3],
[2. / 3, 1. / 3, 0],
[2. / 3, 1. / 3, 0]])
assert_array_equal(real_prob, y_prob)
# Check that it also works with non integer labels
cls.fit(X, y.astype(str))
y_prob = cls.predict_proba(X)
assert_array_equal(real_prob, y_prob)
# Check that it works with weights='distance'
cls = neighbors.KNeighborsClassifier(
n_neighbors=2, p=1, weights='distance')
cls.fit(X, y)
y_prob = cls.predict_proba(np.array([[0, 2, 0], [2, 2, 2]]))
real_prob = np.array([[0, 1, 0], [0, 0.4, 0.6]])
assert_array_almost_equal(real_prob, y_prob)
def test_radius_neighbors_classifier(n_samples=40,
n_features=5,
n_test_pts=10,
radius=0.5,
random_state=0):
# Test radius-based classification
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
y = ((X ** 2).sum(axis=1) < .5).astype(np.int)
y_str = y.astype(str)
weight_func = _weight_func
for algorithm in ALGORITHMS:
for weights in ['uniform', 'distance', weight_func]:
neigh = neighbors.RadiusNeighborsClassifier(radius=radius,
weights=weights,
algorithm=algorithm)
neigh.fit(X, y)
epsilon = 1e-5 * (2 * rng.rand(1, n_features) - 1)
y_pred = neigh.predict(X[:n_test_pts] + epsilon)
assert_array_equal(y_pred, y[:n_test_pts])
neigh.fit(X, y_str)
y_pred = neigh.predict(X[:n_test_pts] + epsilon)
assert_array_equal(y_pred, y_str[:n_test_pts])
def test_radius_neighbors_classifier_when_no_neighbors():
# Test radius-based classifier when no neighbors found.
# In this case it should rise an informative exception
X = np.array([[1.0, 1.0], [2.0, 2.0]])
y = np.array([1, 2])
radius = 0.1
z1 = np.array([[1.01, 1.01], [2.01, 2.01]]) # no outliers
z2 = np.array([[1.01, 1.01], [1.4, 1.4]]) # one outlier
weight_func = _weight_func
for outlier_label in [0, -1, None]:
for algorithm in ALGORITHMS:
for weights in ['uniform', 'distance', weight_func]:
rnc = neighbors.RadiusNeighborsClassifier
clf = rnc(radius=radius, weights=weights, algorithm=algorithm,
outlier_label=outlier_label)
clf.fit(X, y)
assert_array_equal(np.array([1, 2]),
clf.predict(z1))
if outlier_label is None:
assert_raises(ValueError, clf.predict, z2)
def test_radius_neighbors_classifier_outlier_labeling():
# Test radius-based classifier when no neighbors found and outliers
# are labeled.
X = np.array([[1.0, 1.0], [2.0, 2.0], [0.99, 0.99],
[0.98, 0.98], [2.01, 2.01]])
y = np.array([1, 2, 1, 1, 2])
radius = 0.1
z1 = np.array([[1.01, 1.01], [2.01, 2.01]]) # no outliers
z2 = np.array([[1.4, 1.4], [1.01, 1.01], [2.01, 2.01]]) # one outlier
correct_labels1 = np.array([1, 2])
correct_labels2 = np.array([-1, 1, 2])
outlier_proba = np.array([0, 0])
weight_func = _weight_func
for algorithm in ALGORITHMS:
for weights in ['uniform', 'distance', weight_func]:
clf = neighbors.RadiusNeighborsClassifier(radius=radius,
weights=weights,
algorithm=algorithm,
outlier_label=-1)
clf.fit(X, y)
assert_array_equal(correct_labels1, clf.predict(z1))
assert_array_equal(correct_labels2, clf.predict(z2))
assert_array_equal(outlier_proba, clf.predict_proba(z2)[0])
# test outlier_labeling of using predict_proba()
RNC = neighbors.RadiusNeighborsClassifier
X = np.array([[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]])
y = np.array([0, 2, 2, 1, 1, 1, 3, 3, 3, 3])
# test outlier_label scalar verification
def check_array_exception():
clf = RNC(radius=1, outlier_label=[[5]])
clf.fit(X, y)
assert_raises(TypeError, check_array_exception)
# test invalid outlier_label dtype
def check_dtype_exception():
clf = RNC(radius=1, outlier_label='a')
clf.fit(X, y)
assert_raises(TypeError, check_dtype_exception)
# test most frequent
clf = RNC(radius=1, outlier_label='most_frequent')
clf.fit(X, y)
proba = clf.predict_proba([[1], [15]])
assert_array_equal(proba[1, :], [0, 0, 0, 1])
# test manual label in y
clf = RNC(radius=1, outlier_label=1)
clf.fit(X, y)
proba = clf.predict_proba([[1], [15]])
assert_array_equal(proba[1, :], [0, 1, 0, 0])
pred = clf.predict([[1], [15]])
assert_array_equal(pred, [2, 1])
# test manual label out of y warning
def check_warning():
clf = RNC(radius=1, outlier_label=4)
clf.fit(X, y)
clf.predict_proba([[1], [15]])
assert_warns(UserWarning, check_warning)
# test multi output same outlier label
y_multi = [[0, 1], [2, 1], [2, 2], [1, 2], [1, 2],
[1, 3], [3, 3], [3, 3], [3, 0], [3, 0]]
clf = RNC(radius=1, outlier_label=1)
clf.fit(X, y_multi)
proba = clf.predict_proba([[7], [15]])
assert_array_equal(proba[1][1, :], [0, 1, 0, 0])
pred = clf.predict([[7], [15]])
assert_array_equal(pred[1, :], [1, 1])
# test multi output different outlier label
y_multi = [[0, 0], [2, 2], [2, 2], [1, 1], [1, 1],
[1, 1], [3, 3], [3, 3], [3, 3], [3, 3]]
clf = RNC(radius=1, outlier_label=[0, 1])
clf.fit(X, y_multi)
proba = clf.predict_proba([[7], [15]])
assert_array_equal(proba[0][1, :], [1, 0, 0, 0])
assert_array_equal(proba[1][1, :], [0, 1, 0, 0])
pred = clf.predict([[7], [15]])
assert_array_equal(pred[1, :], [0, 1])
# test inconsistent outlier label list length
def check_exception():
clf = RNC(radius=1, outlier_label=[0, 1, 2])
clf.fit(X, y_multi)
assert_raises(ValueError, check_exception)
def test_radius_neighbors_classifier_zero_distance():
# Test radius-based classifier, when distance to a sample is zero.
X = np.array([[1.0, 1.0], [2.0, 2.0]])
y = np.array([1, 2])
radius = 0.1
z1 = np.array([[1.01, 1.01], [2.0, 2.0]])
correct_labels1 = np.array([1, 2])
weight_func = _weight_func
for algorithm in ALGORITHMS:
for weights in ['uniform', 'distance', weight_func]:
clf = neighbors.RadiusNeighborsClassifier(radius=radius,
weights=weights,
algorithm=algorithm)
clf.fit(X, y)
assert_array_equal(correct_labels1, clf.predict(z1))
def test_neighbors_regressors_zero_distance():
# Test radius-based regressor, when distance to a sample is zero.
X = np.array([[1.0, 1.0], [1.0, 1.0], [2.0, 2.0], [2.5, 2.5]])
y = np.array([1.0, 1.5, 2.0, 0.0])
radius = 0.2
z = np.array([[1.1, 1.1], [2.0, 2.0]])
rnn_correct_labels = np.array([1.25, 2.0])
knn_correct_unif = np.array([1.25, 1.0])
knn_correct_dist = np.array([1.25, 2.0])
for algorithm in ALGORITHMS:
# we don't test for weights=_weight_func since user will be expected
# to handle zero distances themselves in the function.
for weights in ['uniform', 'distance']:
rnn = neighbors.RadiusNeighborsRegressor(radius=radius,
weights=weights,
algorithm=algorithm)
rnn.fit(X, y)
assert_array_almost_equal(rnn_correct_labels, rnn.predict(z))
for weights, corr_labels in zip(['uniform', 'distance'],
[knn_correct_unif, knn_correct_dist]):
knn = neighbors.KNeighborsRegressor(n_neighbors=2,
weights=weights,
algorithm=algorithm)
knn.fit(X, y)
assert_array_almost_equal(corr_labels, knn.predict(z))
def test_radius_neighbors_boundary_handling():
"""Test whether points lying on boundary are handled consistently
Also ensures that even with only one query point, an object array
is returned rather than a 2d array.
"""
X = np.array([[1.5], [3.0], [3.01]])
radius = 3.0
for algorithm in ALGORITHMS:
nbrs = neighbors.NearestNeighbors(radius=radius,
algorithm=algorithm).fit(X)
results = nbrs.radius_neighbors([[0.0]], return_distance=False)
assert results.shape == (1,)
assert results.dtype == object
assert_array_equal(results[0], [0, 1])
def test_RadiusNeighborsClassifier_multioutput():
# Test k-NN classifier on multioutput data
rng = check_random_state(0)
n_features = 2
n_samples = 40
n_output = 3
X = rng.rand(n_samples, n_features)
y = rng.randint(0, 3, (n_samples, n_output))
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
weights = [None, 'uniform', 'distance', _weight_func]
for algorithm, weights in product(ALGORITHMS, weights):
# Stack single output prediction
y_pred_so = []
for o in range(n_output):
rnn = neighbors.RadiusNeighborsClassifier(weights=weights,
algorithm=algorithm)
rnn.fit(X_train, y_train[:, o])
y_pred_so.append(rnn.predict(X_test))
y_pred_so = np.vstack(y_pred_so).T
assert y_pred_so.shape == y_test.shape
# Multioutput prediction
rnn_mo = neighbors.RadiusNeighborsClassifier(weights=weights,
algorithm=algorithm)
rnn_mo.fit(X_train, y_train)
y_pred_mo = rnn_mo.predict(X_test)
assert y_pred_mo.shape == y_test.shape
assert_array_almost_equal(y_pred_mo, y_pred_so)
def test_kneighbors_classifier_sparse(n_samples=40,
n_features=5,
n_test_pts=10,
n_neighbors=5,
random_state=0):
# Test k-NN classifier on sparse matrices
# Like the above, but with various types of sparse matrices
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
X *= X > .2
y = ((X ** 2).sum(axis=1) < .5).astype(np.int)
for sparsemat in SPARSE_TYPES:
knn = neighbors.KNeighborsClassifier(n_neighbors=n_neighbors,
algorithm='auto')
knn.fit(sparsemat(X), y)
epsilon = 1e-5 * (2 * rng.rand(1, n_features) - 1)
for sparsev in SPARSE_TYPES + (np.asarray,):
X_eps = sparsev(X[:n_test_pts] + epsilon)
y_pred = knn.predict(X_eps)
assert_array_equal(y_pred, y[:n_test_pts])
def test_KNeighborsClassifier_multioutput():
# Test k-NN classifier on multioutput data
rng = check_random_state(0)
n_features = 5
n_samples = 50
n_output = 3
X = rng.rand(n_samples, n_features)
y = rng.randint(0, 3, (n_samples, n_output))
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
weights = [None, 'uniform', 'distance', _weight_func]
for algorithm, weights in product(ALGORITHMS, weights):
# Stack single output prediction
y_pred_so = []
y_pred_proba_so = []
for o in range(n_output):
knn = neighbors.KNeighborsClassifier(weights=weights,
algorithm=algorithm)
knn.fit(X_train, y_train[:, o])
y_pred_so.append(knn.predict(X_test))
y_pred_proba_so.append(knn.predict_proba(X_test))
y_pred_so = np.vstack(y_pred_so).T
assert y_pred_so.shape == y_test.shape
assert len(y_pred_proba_so) == n_output
# Multioutput prediction
knn_mo = neighbors.KNeighborsClassifier(weights=weights,
algorithm=algorithm)
knn_mo.fit(X_train, y_train)
y_pred_mo = knn_mo.predict(X_test)
assert y_pred_mo.shape == y_test.shape
assert_array_almost_equal(y_pred_mo, y_pred_so)
# Check proba
y_pred_proba_mo = knn_mo.predict_proba(X_test)
assert len(y_pred_proba_mo) == n_output
for proba_mo, proba_so in zip(y_pred_proba_mo, y_pred_proba_so):
assert_array_almost_equal(proba_mo, proba_so)
def test_kneighbors_regressor(n_samples=40,
n_features=5,
n_test_pts=10,
n_neighbors=3,
random_state=0):
# Test k-neighbors regression
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
y = np.sqrt((X ** 2).sum(1))
y /= y.max()
y_target = y[:n_test_pts]
weight_func = _weight_func
for algorithm in ALGORITHMS:
for weights in ['uniform', 'distance', weight_func]:
knn = neighbors.KNeighborsRegressor(n_neighbors=n_neighbors,
weights=weights,
algorithm=algorithm)
knn.fit(X, y)
epsilon = 1E-5 * (2 * rng.rand(1, n_features) - 1)
y_pred = knn.predict(X[:n_test_pts] + epsilon)
assert np.all(abs(y_pred - y_target) < 0.3)
def test_KNeighborsRegressor_multioutput_uniform_weight():
# Test k-neighbors in multi-output regression with uniform weight
rng = check_random_state(0)
n_features = 5
n_samples = 40
n_output = 4
X = rng.rand(n_samples, n_features)
y = rng.rand(n_samples, n_output)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
for algorithm, weights in product(ALGORITHMS, [None, 'uniform']):
knn = neighbors.KNeighborsRegressor(weights=weights,
algorithm=algorithm)
knn.fit(X_train, y_train)
neigh_idx = knn.kneighbors(X_test, return_distance=False)
y_pred_idx = np.array([np.mean(y_train[idx], axis=0)
for idx in neigh_idx])
y_pred = knn.predict(X_test)
assert y_pred.shape == y_test.shape
assert y_pred_idx.shape == y_test.shape
assert_array_almost_equal(y_pred, y_pred_idx)
def test_kneighbors_regressor_multioutput(n_samples=40,
n_features=5,
n_test_pts=10,
n_neighbors=3,
random_state=0):
# Test k-neighbors in multi-output regression
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
y = np.sqrt((X ** 2).sum(1))
y /= y.max()
y = np.vstack([y, y]).T
y_target = y[:n_test_pts]
weights = ['uniform', 'distance', _weight_func]
for algorithm, weights in product(ALGORITHMS, weights):
knn = neighbors.KNeighborsRegressor(n_neighbors=n_neighbors,
weights=weights,
algorithm=algorithm)
knn.fit(X, y)
epsilon = 1E-5 * (2 * rng.rand(1, n_features) - 1)
y_pred = knn.predict(X[:n_test_pts] + epsilon)
assert y_pred.shape == y_target.shape
assert np.all(np.abs(y_pred - y_target) < 0.3)
def test_radius_neighbors_regressor(n_samples=40,
n_features=3,
n_test_pts=10,
radius=0.5,
random_state=0):
# Test radius-based neighbors regression
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
y = np.sqrt((X ** 2).sum(1))
y /= y.max()
y_target = y[:n_test_pts]
weight_func = _weight_func
for algorithm in ALGORITHMS:
for weights in ['uniform', 'distance', weight_func]:
neigh = neighbors.RadiusNeighborsRegressor(radius=radius,
weights=weights,
algorithm=algorithm)
neigh.fit(X, y)
epsilon = 1E-5 * (2 * rng.rand(1, n_features) - 1)
y_pred = neigh.predict(X[:n_test_pts] + epsilon)
assert np.all(abs(y_pred - y_target) < radius / 2)
# test that nan is returned when no nearby observations
for weights in ['uniform', 'distance']:
neigh = neighbors.RadiusNeighborsRegressor(radius=radius,
weights=weights,
algorithm='auto')
neigh.fit(X, y)
X_test_nan = np.full((1, n_features), -1.)
empty_warning_msg = ("One or more samples have no neighbors "
"within specified radius; predicting NaN.")
pred = assert_warns_message(UserWarning,
empty_warning_msg,
neigh.predict,
X_test_nan)
assert np.all(np.isnan(pred))
def test_RadiusNeighborsRegressor_multioutput_with_uniform_weight():
# Test radius neighbors in multi-output regression (uniform weight)
rng = check_random_state(0)
n_features = 5
n_samples = 40
n_output = 4
X = rng.rand(n_samples, n_features)
y = rng.rand(n_samples, n_output)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
for algorithm, weights in product(ALGORITHMS, [None, 'uniform']):
rnn = neighbors. RadiusNeighborsRegressor(weights=weights,
algorithm=algorithm)
rnn.fit(X_train, y_train)
neigh_idx = rnn.radius_neighbors(X_test, return_distance=False)
y_pred_idx = np.array([np.mean(y_train[idx], axis=0)
for idx in neigh_idx])
y_pred_idx = np.array(y_pred_idx)
y_pred = rnn.predict(X_test)
assert y_pred_idx.shape == y_test.shape
assert y_pred.shape == y_test.shape
assert_array_almost_equal(y_pred, y_pred_idx)
def test_RadiusNeighborsRegressor_multioutput(n_samples=40,
n_features=5,
n_test_pts=10,
n_neighbors=3,
random_state=0):
# Test k-neighbors in multi-output regression with various weight
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
y = np.sqrt((X ** 2).sum(1))
y /= y.max()
y = np.vstack([y, y]).T
y_target = y[:n_test_pts]
weights = ['uniform', 'distance', _weight_func]
for algorithm, weights in product(ALGORITHMS, weights):
rnn = neighbors.RadiusNeighborsRegressor(n_neighbors=n_neighbors,
weights=weights,
algorithm=algorithm)
rnn.fit(X, y)
epsilon = 1E-5 * (2 * rng.rand(1, n_features) - 1)
y_pred = rnn.predict(X[:n_test_pts] + epsilon)
assert y_pred.shape == y_target.shape
assert np.all(np.abs(y_pred - y_target) < 0.3)
@ignore_warnings(category=EfficiencyWarning)
def test_kneighbors_regressor_sparse(n_samples=40,
n_features=5,
n_test_pts=10,
n_neighbors=5,
random_state=0):
# Test radius-based regression on sparse matrices
# Like the above, but with various types of sparse matrices
rng = np.random.RandomState(random_state)
X = 2 * rng.rand(n_samples, n_features) - 1
y = ((X ** 2).sum(axis=1) < .25).astype(np.int)
for sparsemat in SPARSE_TYPES:
knn = neighbors.KNeighborsRegressor(n_neighbors=n_neighbors,
algorithm='auto')
knn.fit(sparsemat(X), y)
knn_pre = neighbors.KNeighborsRegressor(n_neighbors=n_neighbors,
metric='precomputed')
knn_pre.fit(pairwise_distances(X, metric='euclidean'), y)
for sparsev in SPARSE_OR_DENSE:
X2 = sparsev(X)
assert np.mean(knn.predict(X2).round() == y) > 0.95
X2_pre = sparsev(pairwise_distances(X, metric='euclidean'))
assert np.mean(knn_pre.predict(X2_pre).round() == y) > 0.95
def test_neighbors_iris():
# Sanity checks on the iris dataset
# Puts three points of each label in the plane and performs a
# nearest neighbor query on points near the decision boundary.
for algorithm in ALGORITHMS:
clf = neighbors.KNeighborsClassifier(n_neighbors=1,
algorithm=algorithm)
clf.fit(iris.data, iris.target)
assert_array_equal(clf.predict(iris.data), iris.target)
clf.set_params(n_neighbors=9, algorithm=algorithm)
clf.fit(iris.data, iris.target)
assert np.mean(clf.predict(iris.data) == iris.target) > 0.95
rgs = neighbors.KNeighborsRegressor(n_neighbors=5, algorithm=algorithm)
rgs.fit(iris.data, iris.target)
assert (np.mean(rgs.predict(iris.data).round() == iris.target) >
0.95)
def test_neighbors_digits():
# Sanity check on the digits dataset
# the 'brute' algorithm has been observed to fail if the input
# dtype is uint8 due to overflow in distance calculations.
X = digits.data.astype('uint8')
Y = digits.target
(n_samples, n_features) = X.shape
train_test_boundary = int(n_samples * 0.8)
train = np.arange(0, train_test_boundary)
test = np.arange(train_test_boundary, n_samples)
(X_train, Y_train, X_test, Y_test) = X[train], Y[train], X[test], Y[test]
clf = neighbors.KNeighborsClassifier(n_neighbors=1, algorithm='brute')
score_uint8 = clf.fit(X_train, Y_train).score(X_test, Y_test)
score_float = clf.fit(X_train.astype(float, copy=False), Y_train).score(
X_test.astype(float, copy=False), Y_test)
assert score_uint8 == score_float
def test_kneighbors_graph():
# Test kneighbors_graph to build the k-Nearest Neighbor graph.
X = np.array([[0, 1], [1.01, 1.], [2, 0]])
# n_neighbors = 1
A = neighbors.kneighbors_graph(X, 1, mode='connectivity',
include_self=True)
assert_array_equal(A.toarray(), np.eye(A.shape[0]))
A = neighbors.kneighbors_graph(X, 1, mode='distance')
assert_array_almost_equal(
A.toarray(),
[[0.00, 1.01, 0.],
[1.01, 0., 0.],
[0.00, 1.40716026, 0.]])
# n_neighbors = 2
A = neighbors.kneighbors_graph(X, 2, mode='connectivity',
include_self=True)
assert_array_equal(
A.toarray(),
[[1., 1., 0.],
[1., 1., 0.],
[0., 1., 1.]])
A = neighbors.kneighbors_graph(X, 2, mode='distance')
assert_array_almost_equal(
A.toarray(),
[[0., 1.01, 2.23606798],
[1.01, 0., 1.40716026],
[2.23606798, 1.40716026, 0.]])
# n_neighbors = 3
A = neighbors.kneighbors_graph(X, 3, mode='connectivity',
include_self=True)
assert_array_almost_equal(
A.toarray(),
[[1, 1, 1], [1, 1, 1], [1, 1, 1]])
def test_kneighbors_graph_sparse(seed=36):
# Test kneighbors_graph to build the k-Nearest Neighbor graph
# for sparse input.
rng = np.random.RandomState(seed)
X = rng.randn(10, 10)
Xcsr = csr_matrix(X)
for n_neighbors in [1, 2, 3]:
for mode in ["connectivity", "distance"]:
assert_array_almost_equal(
neighbors.kneighbors_graph(X,
n_neighbors,
mode=mode).toarray(),
neighbors.kneighbors_graph(Xcsr,
n_neighbors,
mode=mode).toarray())
def test_radius_neighbors_graph():
# Test radius_neighbors_graph to build the Nearest Neighbor graph.
X = np.array([[0, 1], [1.01, 1.], [2, 0]])
A = neighbors.radius_neighbors_graph(X, 1.5, mode='connectivity',
include_self=True)
assert_array_equal(
A.toarray(),
[[1., 1., 0.],
[1., 1., 1.],
[0., 1., 1.]])
A = neighbors.radius_neighbors_graph(X, 1.5, mode='distance')
assert_array_almost_equal(
A.toarray(),
[[0., 1.01, 0.],
[1.01, 0., 1.40716026],
[0., 1.40716026, 0.]])
def test_radius_neighbors_graph_sparse(seed=36):
# Test radius_neighbors_graph to build the Nearest Neighbor graph
# for sparse input.
rng = np.random.RandomState(seed)
X = rng.randn(10, 10)
Xcsr = csr_matrix(X)
for n_neighbors in [1, 2, 3]:
for mode in ["connectivity", "distance"]:
assert_array_almost_equal(
neighbors.radius_neighbors_graph(X,
n_neighbors,
mode=mode).toarray(),
neighbors.radius_neighbors_graph(Xcsr,
n_neighbors,
mode=mode).toarray())
def test_neighbors_badargs():
# Test bad argument values: these should all raise ValueErrors
assert_raises(ValueError,
neighbors.NearestNeighbors,
algorithm='blah')
X = rng.random_sample((10, 2))
Xsparse = csr_matrix(X)
X3 = rng.random_sample((10, 3))
y = np.ones(10)
for cls in (neighbors.KNeighborsClassifier,
neighbors.RadiusNeighborsClassifier,
neighbors.KNeighborsRegressor,
neighbors.RadiusNeighborsRegressor):
assert_raises(ValueError,
cls,
weights='blah')
assert_raises(ValueError,
cls, p=-1)
assert_raises(ValueError,
cls, algorithm='blah')
nbrs = cls(algorithm='ball_tree', metric='haversine')
assert_raises(ValueError,
nbrs.predict,
X)
assert_raises(ValueError,
ignore_warnings(nbrs.fit),
Xsparse, y)
nbrs = cls(metric='haversine', algorithm='brute')
nbrs.fit(X3, y)
assert_raise_message(ValueError,
"Haversine distance only valid in 2 dimensions",
nbrs.predict,
X3)
nbrs = cls()
assert_raises(ValueError,
nbrs.fit,
np.ones((0, 2)), np.ones(0))
assert_raises(ValueError,
nbrs.fit,
X[:, :, None], y)
nbrs.fit(X, y)
assert_raises(ValueError,
nbrs.predict,
[[]])
if (issubclass(cls, neighbors.KNeighborsClassifier) or
issubclass(cls, neighbors.KNeighborsRegressor)):
nbrs = cls(n_neighbors=-1)
assert_raises(ValueError, nbrs.fit, X, y)
nbrs = neighbors.NearestNeighbors().fit(X)
assert_raises(ValueError, nbrs.kneighbors_graph, X, mode='blah')
assert_raises(ValueError, nbrs.radius_neighbors_graph, X, mode='blah')
def test_neighbors_metrics(n_samples=20, n_features=3,
n_query_pts=2, n_neighbors=5):
# Test computing the neighbors for various metrics
# create a symmetric matrix
V = rng.rand(n_features, n_features)
VI = np.dot(V, V.T)
metrics = [('euclidean', {}),
('manhattan', {}),
('minkowski', dict(p=1)),
('minkowski', dict(p=2)),
('minkowski', dict(p=3)),
('minkowski', dict(p=np.inf)),
('chebyshev', {}),
('seuclidean', dict(V=rng.rand(n_features))),
('wminkowski', dict(p=3, w=rng.rand(n_features))),
('mahalanobis', dict(VI=VI)),
('haversine', {})]
algorithms = ['brute', 'ball_tree', 'kd_tree']
X = rng.rand(n_samples, n_features)
test = rng.rand(n_query_pts, n_features)
for metric, metric_params in metrics:
results = {}
p = metric_params.pop('p', 2)
for algorithm in algorithms:
# KD tree doesn't support all metrics
if (algorithm == 'kd_tree' and
metric not in neighbors.KDTree.valid_metrics):
assert_raises(ValueError,
neighbors.NearestNeighbors,
algorithm=algorithm,
metric=metric, metric_params=metric_params)
continue
neigh = neighbors.NearestNeighbors(n_neighbors=n_neighbors,
algorithm=algorithm,
metric=metric, p=p,
metric_params=metric_params)
# Haversine distance only accepts 2D data
feature_sl = (slice(None, 2)
if metric == 'haversine' else slice(None))
neigh.fit(X[:, feature_sl])
results[algorithm] = neigh.kneighbors(test[:, feature_sl],
return_distance=True)
assert_array_almost_equal(results['brute'][0], results['ball_tree'][0])
assert_array_almost_equal(results['brute'][1], results['ball_tree'][1])
if 'kd_tree' in results:
assert_array_almost_equal(results['brute'][0],
results['kd_tree'][0])
assert_array_almost_equal(results['brute'][1],
results['kd_tree'][1])
def test_callable_metric():
def custom_metric(x1, x2):
return np.sqrt(np.sum(x1 ** 2 + x2 ** 2))
X = np.random.RandomState(42).rand(20, 2)
nbrs1 = neighbors.NearestNeighbors(3, algorithm='auto',
metric=custom_metric)
nbrs2 = neighbors.NearestNeighbors(3, algorithm='brute',
metric=custom_metric)
nbrs1.fit(X)
nbrs2.fit(X)
dist1, ind1 = nbrs1.kneighbors(X)
dist2, ind2 = nbrs2.kneighbors(X)
assert_array_almost_equal(dist1, dist2)
def test_valid_brute_metric_for_auto_algorithm():
X = rng.rand(12, 12)
Xcsr = csr_matrix(X)
# check that there is a metric that is valid for brute
# but not ball_tree (so we actually test something)
assert "cosine" in VALID_METRICS['brute']
assert "cosine" not in VALID_METRICS['ball_tree']
# Metric which don't required any additional parameter
require_params = ['mahalanobis', 'wminkowski', 'seuclidean']
for metric in VALID_METRICS['brute']:
if metric != 'precomputed' and metric not in require_params:
nn = neighbors.NearestNeighbors(n_neighbors=3,
algorithm='auto',
metric=metric)
if metric != 'haversine':
nn.fit(X)
nn.kneighbors(X)
else:
nn.fit(X[:, :2])
nn.kneighbors(X[:, :2])
elif metric == 'precomputed':
X_precomputed = rng.random_sample((10, 4))
Y_precomputed = rng.random_sample((3, 4))
DXX = metrics.pairwise_distances(X_precomputed, metric='euclidean')
DYX = metrics.pairwise_distances(Y_precomputed, X_precomputed,
metric='euclidean')
nb_p = neighbors.NearestNeighbors(n_neighbors=3)
nb_p.fit(DXX)
nb_p.kneighbors(DYX)
for metric in VALID_METRICS_SPARSE['brute']:
if metric != 'precomputed' and metric not in require_params:
nn = neighbors.NearestNeighbors(n_neighbors=3, algorithm='auto',
metric=metric).fit(Xcsr)
nn.kneighbors(Xcsr)
# Metric with parameter
VI = np.dot(X, X.T)
list_metrics = [('seuclidean', dict(V=rng.rand(12))),
('wminkowski', dict(w=rng.rand(12))),
('mahalanobis', dict(VI=VI))]
for metric, params in list_metrics:
nn = neighbors.NearestNeighbors(n_neighbors=3, algorithm='auto',
metric=metric,
metric_params=params).fit(X)
nn.kneighbors(X)
def test_metric_params_interface():
assert_warns(SyntaxWarning, neighbors.KNeighborsClassifier,
metric_params={'p': 3})
def test_predict_sparse_ball_kd_tree():
rng = np.random.RandomState(0)
X = rng.rand(5, 5)
y = rng.randint(0, 2, 5)
nbrs1 = neighbors.KNeighborsClassifier(1, algorithm='kd_tree')
nbrs2 = neighbors.KNeighborsRegressor(1, algorithm='ball_tree')
for model in [nbrs1, nbrs2]:
model.fit(X, y)
assert_raises(ValueError, model.predict, csr_matrix(X))
def test_non_euclidean_kneighbors():
rng = np.random.RandomState(0)
X = rng.rand(5, 5)
# Find a reasonable radius.
dist_array = pairwise_distances(X).flatten()
np.sort(dist_array)
radius = dist_array[15]
# Test kneighbors_graph
for metric in ['manhattan', 'chebyshev']:
nbrs_graph = neighbors.kneighbors_graph(
X, 3, metric=metric, mode='connectivity',
include_self=True).toarray()
nbrs1 = neighbors.NearestNeighbors(3, metric=metric).fit(X)
assert_array_equal(nbrs_graph, nbrs1.kneighbors_graph(X).toarray())
# Test radiusneighbors_graph
for metric in ['manhattan', 'chebyshev']:
nbrs_graph = neighbors.radius_neighbors_graph(
X, radius, metric=metric, mode='connectivity',
include_self=True).toarray()
nbrs1 = neighbors.NearestNeighbors(metric=metric, radius=radius).fit(X)
assert_array_equal(nbrs_graph, nbrs1.radius_neighbors_graph(X).A)
# Raise error when wrong parameters are supplied,
X_nbrs = neighbors.NearestNeighbors(3, metric='manhattan')
X_nbrs.fit(X)
assert_raises(ValueError, neighbors.kneighbors_graph, X_nbrs, 3,
metric='euclidean')
X_nbrs = neighbors.NearestNeighbors(radius=radius, metric='manhattan')
X_nbrs.fit(X)
assert_raises(ValueError, neighbors.radius_neighbors_graph, X_nbrs,
radius, metric='euclidean')
def check_object_arrays(nparray, list_check):
for ind, ele in enumerate(nparray):
assert_array_equal(ele, list_check[ind])
def test_k_and_radius_neighbors_train_is_not_query():
# Test kneighbors et.al when query is not training data
for algorithm in ALGORITHMS:
nn = neighbors.NearestNeighbors(n_neighbors=1, algorithm=algorithm)
X = [[0], [1]]
nn.fit(X)
test_data = [[2], [1]]
# Test neighbors.
dist, ind = nn.kneighbors(test_data)
assert_array_equal(dist, [[1], [0]])
assert_array_equal(ind, [[1], [1]])
dist, ind = nn.radius_neighbors([[2], [1]], radius=1.5)
check_object_arrays(dist, [[1], [1, 0]])
check_object_arrays(ind, [[1], [0, 1]])
# Test the graph variants.
assert_array_equal(
nn.kneighbors_graph(test_data).A, [[0., 1.], [0., 1.]])
assert_array_equal(
nn.kneighbors_graph([[2], [1]], mode='distance').A,
np.array([[0., 1.], [0., 0.]]))
rng = nn.radius_neighbors_graph([[2], [1]], radius=1.5)
assert_array_equal(rng.A, [[0, 1], [1, 1]])
def test_k_and_radius_neighbors_X_None():
# Test kneighbors et.al when query is None
for algorithm in ALGORITHMS:
nn = neighbors.NearestNeighbors(n_neighbors=1, algorithm=algorithm)
X = [[0], [1]]
nn.fit(X)
dist, ind = nn.kneighbors()
assert_array_equal(dist, [[1], [1]])
assert_array_equal(ind, [[1], [0]])
dist, ind = nn.radius_neighbors(None, radius=1.5)
check_object_arrays(dist, [[1], [1]])
check_object_arrays(ind, [[1], [0]])
# Test the graph variants.
rng = nn.radius_neighbors_graph(None, radius=1.5)
kng = nn.kneighbors_graph(None)
for graph in [rng, kng]:
assert_array_equal(graph.A, [[0, 1], [1, 0]])
assert_array_equal(graph.data, [1, 1])
assert_array_equal(graph.indices, [1, 0])
X = [[0, 1], [0, 1], [1, 1]]
nn = neighbors.NearestNeighbors(n_neighbors=2, algorithm=algorithm)
nn.fit(X)
assert_array_equal(
nn.kneighbors_graph().A,
np.array([[0., 1., 1.], [1., 0., 1.], [1., 1., 0]]))
def test_k_and_radius_neighbors_duplicates():
# Test behavior of kneighbors when duplicates are present in query
for algorithm in ALGORITHMS:
nn = neighbors.NearestNeighbors(n_neighbors=1, algorithm=algorithm)
nn.fit([[0], [1]])
# Do not do anything special to duplicates.
kng = nn.kneighbors_graph([[0], [1]], mode='distance')
assert_array_equal(
kng.A,
np.array([[0., 0.], [0., 0.]]))
assert_array_equal(kng.data, [0., 0.])
assert_array_equal(kng.indices, [0, 1])
dist, ind = nn.radius_neighbors([[0], [1]], radius=1.5)
check_object_arrays(dist, [[0, 1], [1, 0]])
check_object_arrays(ind, [[0, 1], [0, 1]])
rng = nn.radius_neighbors_graph([[0], [1]], radius=1.5)
assert_array_equal(rng.A, np.ones((2, 2)))
rng = nn.radius_neighbors_graph([[0], [1]], radius=1.5,
mode='distance')
rng.sort_indices()
assert_array_equal(rng.A, [[0, 1], [1, 0]])
assert_array_equal(rng.indices, [0, 1, 0, 1])
assert_array_equal(rng.data, [0, 1, 1, 0])
# Mask the first duplicates when n_duplicates > n_neighbors.
X = np.ones((3, 1))
nn = neighbors.NearestNeighbors(n_neighbors=1)
nn.fit(X)
dist, ind = nn.kneighbors()
assert_array_equal(dist, np.zeros((3, 1)))
assert_array_equal(ind, [[1], [0], [1]])
# Test that zeros are explicitly marked in kneighbors_graph.
kng = nn.kneighbors_graph(mode='distance')
assert_array_equal(
kng.A, np.zeros((3, 3)))
assert_array_equal(kng.data, np.zeros(3))
assert_array_equal(kng.indices, [1., 0., 1.])
assert_array_equal(
nn.kneighbors_graph().A,
np.array([[0., 1., 0.], [1., 0., 0.], [0., 1., 0.]]))
def test_include_self_neighbors_graph():
# Test include_self parameter in neighbors_graph
X = [[2, 3], [4, 5]]
kng = neighbors.kneighbors_graph(X, 1, include_self=True).A
kng_not_self = neighbors.kneighbors_graph(X, 1, include_self=False).A
assert_array_equal(kng, [[1., 0.], [0., 1.]])
assert_array_equal(kng_not_self, [[0., 1.], [1., 0.]])
rng = neighbors.radius_neighbors_graph(X, 5.0, include_self=True).A
rng_not_self = neighbors.radius_neighbors_graph(
X, 5.0, include_self=False).A
assert_array_equal(rng, [[1., 1.], [1., 1.]])
assert_array_equal(rng_not_self, [[0., 1.], [1., 0.]])
@pytest.mark.parametrize('algorithm', ALGORITHMS)
def test_same_knn_parallel(algorithm):
X, y = datasets.make_classification(n_samples=30, n_features=5,
n_redundant=0, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y)
clf = neighbors.KNeighborsClassifier(n_neighbors=3,
algorithm=algorithm)
clf.fit(X_train, y_train)
y = clf.predict(X_test)
dist, ind = clf.kneighbors(X_test)
graph = clf.kneighbors_graph(X_test, mode='distance').toarray()
clf.set_params(n_jobs=3)
clf.fit(X_train, y_train)
y_parallel = clf.predict(X_test)
dist_parallel, ind_parallel = clf.kneighbors(X_test)
graph_parallel = \
clf.kneighbors_graph(X_test, mode='distance').toarray()
assert_array_equal(y, y_parallel)
assert_array_almost_equal(dist, dist_parallel)
assert_array_equal(ind, ind_parallel)
assert_array_almost_equal(graph, graph_parallel)
@pytest.mark.parametrize('algorithm', ALGORITHMS)
def test_same_radius_neighbors_parallel(algorithm):
X, y = datasets.make_classification(n_samples=30, n_features=5,
n_redundant=0, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y)
clf = neighbors.RadiusNeighborsClassifier(radius=10,
algorithm=algorithm)
clf.fit(X_train, y_train)
y = clf.predict(X_test)
dist, ind = clf.radius_neighbors(X_test)
graph = clf.radius_neighbors_graph(X_test, mode='distance').toarray()
clf.set_params(n_jobs=3)
clf.fit(X_train, y_train)
y_parallel = clf.predict(X_test)
dist_parallel, ind_parallel = clf.radius_neighbors(X_test)
graph_parallel = \
clf.radius_neighbors_graph(X_test, mode='distance').toarray()
assert_array_equal(y, y_parallel)
for i in range(len(dist)):
assert_array_almost_equal(dist[i], dist_parallel[i])
assert_array_equal(ind[i], ind_parallel[i])
assert_array_almost_equal(graph, graph_parallel)
@pytest.mark.parametrize('backend', JOBLIB_BACKENDS)
@pytest.mark.parametrize('algorithm', ALGORITHMS)
def test_knn_forcing_backend(backend, algorithm):
# Non-regression test which ensure the knn methods are properly working
# even when forcing the global joblib backend.
with joblib.parallel_backend(backend):
X, y = datasets.make_classification(n_samples=30, n_features=5,
n_redundant=0, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y)
clf = neighbors.KNeighborsClassifier(n_neighbors=3,
algorithm=algorithm,
n_jobs=3)
clf.fit(X_train, y_train)
clf.predict(X_test)
clf.kneighbors(X_test)
clf.kneighbors_graph(X_test, mode='distance').toarray()
def test_dtype_convert():
classifier = neighbors.KNeighborsClassifier(n_neighbors=1)
CLASSES = 15
X = np.eye(CLASSES)
y = [ch for ch in 'ABCDEFGHIJKLMNOPQRSTU'[:CLASSES]]
result = classifier.fit(X, y).predict(X)
assert_array_equal(result, y)
def test_sparse_metric_callable():
def sparse_metric(x, y): # Metric accepting sparse matrix input (only)
assert issparse(x) and issparse(y)
return x.dot(y.T).A.item()
X = csr_matrix([ # Population matrix
[1, 1, 1, 1, 1],
[1, 0, 1, 0, 1],
[0, 0, 1, 0, 0]
])
Y = csr_matrix([ # Query matrix
[1, 1, 0, 1, 1],
[1, 0, 0, 0, 1]
])
nn = neighbors.NearestNeighbors(algorithm='brute', n_neighbors=2,
metric=sparse_metric).fit(X)
N = nn.kneighbors(Y, return_distance=False)
# GS indices of nearest neighbours in `X` for `sparse_metric`
gold_standard_nn = np.array([
[2, 1],
[2, 1]
])
assert_array_equal(N, gold_standard_nn)
# ignore conversion to boolean in pairwise_distances
@ignore_warnings(category=DataConversionWarning)
def test_pairwise_boolean_distance():
# Non-regression test for #4523
# 'brute': uses scipy.spatial.distance through pairwise_distances
# 'ball_tree': uses sklearn.neighbors._dist_metrics
rng = np.random.RandomState(0)
X = rng.uniform(size=(6, 5))
NN = neighbors.NearestNeighbors
nn1 = NN(metric="jaccard", algorithm='brute').fit(X)
nn2 = NN(metric="jaccard", algorithm='ball_tree').fit(X)
assert_array_equal(nn1.kneighbors(X)[0], nn2.kneighbors(X)[0])
def test_radius_neighbors_predict_proba():
for seed in range(5):
X, y = datasets.make_classification(n_samples=50, n_features=5,
n_informative=3, n_redundant=0,
n_classes=3, random_state=seed)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, random_state=0)
outlier_label = int(2 - seed)
clf = neighbors.RadiusNeighborsClassifier(radius=2,
outlier_label=outlier_label)
clf.fit(X_tr, y_tr)
pred = clf.predict(X_te)
proba = clf.predict_proba(X_te)
proba_label = proba.argmax(axis=1)
proba_label = np.where(proba.sum(axis=1) == 0,
outlier_label, proba_label)
assert_array_equal(pred, proba_label)
def test_pipeline_with_nearest_neighbors_transformer():
# Test chaining KNeighborsTransformer and classifiers/regressors
rng = np.random.RandomState(0)
X = 2 * rng.rand(40, 5) - 1
X2 = 2 * rng.rand(40, 5) - 1
y = rng.rand(40, 1)
n_neighbors = 12
radius = 1.5
# We precompute more neighbors than necessary, to have equivalence between
# k-neighbors estimator after radius-neighbors transformer, and vice-versa.
factor = 2
k_trans = neighbors.KNeighborsTransformer(
n_neighbors=n_neighbors, mode='distance')
k_trans_factor = neighbors.KNeighborsTransformer(
n_neighbors=int(n_neighbors * factor), mode='distance')
r_trans = neighbors.RadiusNeighborsTransformer(
radius=radius, mode='distance')
r_trans_factor = neighbors.RadiusNeighborsTransformer(
radius=int(radius * factor), mode='distance')
k_reg = neighbors.KNeighborsRegressor(n_neighbors=n_neighbors)
r_reg = neighbors.RadiusNeighborsRegressor(radius=radius)
test_list = [(k_trans, k_reg), (k_trans_factor, r_reg),
(r_trans, r_reg), (r_trans_factor, k_reg), ]
for trans, reg in test_list:
# compare the chained version and the compact version
reg_compact = clone(reg)
reg_precomp = clone(reg)
reg_precomp.set_params(metric='precomputed')
reg_chain = make_pipeline(clone(trans), reg_precomp)
y_pred_chain = reg_chain.fit(X, y).predict(X2)
y_pred_compact = reg_compact.fit(X, y).predict(X2)
assert_array_almost_equal(y_pred_chain, y_pred_compact)