import numpy as np
from numpy.testing import assert_array_almost_equal
import pytest
from sklearn.neighbors._kd_tree import (KDTree, NeighborsHeap,
simultaneous_sort, kernel_norm,
nodeheap_sort, DTYPE, ITYPE)
from sklearn.neighbors import DistanceMetric
from sklearn.utils import check_random_state
from sklearn.utils._testing import assert_allclose
DIMENSION = 3
METRICS = {'euclidean': {},
'manhattan': {},
'chebyshev': {},
'minkowski': dict(p=3)}
def test_kd_tree_query_radius(n_samples=100, n_features=10):
rng = check_random_state(0)
X = 2 * rng.random_sample(size=(n_samples, n_features)) - 1
query_pt = np.zeros(n_features, dtype=float)
eps = 1E-15 # roundoff error can cause test to fail
kdt = KDTree(X, leaf_size=5)
rad = np.sqrt(((X - query_pt) ** 2).sum(1))
for r in np.linspace(rad[0], rad[-1], 100):
ind = kdt.query_radius([query_pt], r + eps)[0]
i = np.where(rad <= r + eps)[0]
ind.sort()
i.sort()
assert_array_almost_equal(i, ind)
def test_kd_tree_query_radius_distance(n_samples=100, n_features=10):
rng = check_random_state(0)
X = 2 * rng.random_sample(size=(n_samples, n_features)) - 1
query_pt = np.zeros(n_features, dtype=float)
eps = 1E-15 # roundoff error can cause test to fail
kdt = KDTree(X, leaf_size=5)
rad = np.sqrt(((X - query_pt) ** 2).sum(1))
for r in np.linspace(rad[0], rad[-1], 100):
ind, dist = kdt.query_radius([query_pt], r + eps, return_distance=True)
ind = ind[0]
dist = dist[0]
d = np.sqrt(((query_pt - X[ind]) ** 2).sum(1))
assert_array_almost_equal(d, dist)
def compute_kernel_slow(Y, X, kernel, h):
d = np.sqrt(((Y[:, None, :] - X) ** 2).sum(-1))
norm = kernel_norm(h, X.shape[1], kernel)
if kernel == 'gaussian':
return norm * np.exp(-0.5 * (d * d) / (h * h)).sum(-1)
elif kernel == 'tophat':
return norm * (d < h).sum(-1)
elif kernel == 'epanechnikov':
return norm * ((1.0 - (d * d) / (h * h)) * (d < h)).sum(-1)
elif kernel == 'exponential':
return norm * (np.exp(-d / h)).sum(-1)
elif kernel == 'linear':
return norm * ((1 - d / h) * (d < h)).sum(-1)
elif kernel == 'cosine':
return norm * (np.cos(0.5 * np.pi * d / h) * (d < h)).sum(-1)
else:
raise ValueError('kernel not recognized')
def check_results(kernel, h, atol, rtol, breadth_first, Y, kdt, dens_true):
dens = kdt.kernel_density(Y, h, atol=atol, rtol=rtol,
kernel=kernel,
breadth_first=breadth_first)
assert_allclose(dens, dens_true, atol=atol,
rtol=max(rtol, 1e-7))
@pytest.mark.parametrize('kernel',
['gaussian', 'tophat', 'epanechnikov',
'exponential', 'linear', 'cosine'])
@pytest.mark.parametrize('h', [0.01, 0.1, 1])
def test_kd_tree_kde(kernel, h):
n_samples, n_features = (100, 3)
rng = check_random_state(0)
X = rng.random_sample((n_samples, n_features))
Y = rng.random_sample((n_samples, n_features))
kdt = KDTree(X, leaf_size=10)
dens_true = compute_kernel_slow(Y, X, kernel, h)
for rtol in [0, 1E-5]:
for atol in [1E-6, 1E-2]:
for breadth_first in (True, False):
check_results(kernel, h, atol, rtol,
breadth_first, Y, kdt, dens_true)
def test_gaussian_kde(n_samples=1000):
# Compare gaussian KDE results to scipy.stats.gaussian_kde
from scipy.stats import gaussian_kde
rng = check_random_state(0)
x_in = rng.normal(0, 1, n_samples)
x_out = np.linspace(-5, 5, 30)
for h in [0.01, 0.1, 1]:
kdt = KDTree(x_in[:, None])
gkde = gaussian_kde(x_in, bw_method=h / np.std(x_in))
dens_kdt = kdt.kernel_density(x_out[:, None], h) / n_samples
dens_gkde = gkde.evaluate(x_out)
assert_array_almost_equal(dens_kdt, dens_gkde, decimal=3)
@pytest.mark.parametrize('dualtree', (True, False))
def test_kd_tree_two_point(dualtree):
n_samples, n_features = (100, 3)
rng = check_random_state(0)
X = rng.random_sample((n_samples, n_features))
Y = rng.random_sample((n_samples, n_features))
r = np.linspace(0, 1, 10)
kdt = KDTree(X, leaf_size=10)
D = DistanceMetric.get_metric("euclidean").pairwise(Y, X)
counts_true = [(D <= ri).sum() for ri in r]
counts = kdt.two_point_correlation(Y, r=r, dualtree=dualtree)
assert_array_almost_equal(counts, counts_true)
def test_neighbors_heap(n_pts=5, n_nbrs=10):
heap = NeighborsHeap(n_pts, n_nbrs)
rng = np.random.RandomState(42)
for row in range(n_pts):
d_in = rng.random_sample(2 * n_nbrs).astype(DTYPE, copy=False)
i_in = np.arange(2 * n_nbrs, dtype=ITYPE)
for d, i in zip(d_in, i_in):
heap.push(row, d, i)
ind = np.argsort(d_in)
d_in = d_in[ind]
i_in = i_in[ind]
d_heap, i_heap = heap.get_arrays(sort=True)
assert_array_almost_equal(d_in[:n_nbrs], d_heap[row])
assert_array_almost_equal(i_in[:n_nbrs], i_heap[row])
def test_node_heap(n_nodes=50):
rng = np.random.RandomState(42)
vals = rng.random_sample(n_nodes).astype(DTYPE, copy=False)
i1 = np.argsort(vals)
vals2, i2 = nodeheap_sort(vals)
assert_array_almost_equal(i1, i2)
assert_array_almost_equal(vals[i1], vals2)
def test_simultaneous_sort(n_rows=10, n_pts=201):
rng = np.random.RandomState(42)
dist = rng.random_sample((n_rows, n_pts)).astype(DTYPE, copy=False)
ind = (np.arange(n_pts) + np.zeros((n_rows, 1))).astype(ITYPE, copy=False)
dist2 = dist.copy()
ind2 = ind.copy()
# simultaneous sort rows using function
simultaneous_sort(dist, ind)
# simultaneous sort rows using numpy
i = np.argsort(dist2, axis=1)
row_ind = np.arange(n_rows)[:, None]
dist2 = dist2[row_ind, i]
ind2 = ind2[row_ind, i]
assert_array_almost_equal(dist, dist2)
assert_array_almost_equal(ind, ind2)