Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aaronreidsmith / scipy   python

Repository URL to install this package:

Version: 1.3.3 

/ spatial / tests / test_distance.py

#
# Author: Damian Eads
# Date: April 17, 2008
#
# Copyright (C) 2008 Damian Eads
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#
# 3. The name of the author may not be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from __future__ import division, print_function, absolute_import

import os.path

from functools import wraps, partial
from scipy._lib.six import xrange, u

import numpy as np
import warnings
from numpy.linalg import norm
from numpy.testing import (verbose, assert_,
                           assert_array_equal, assert_equal,
                           assert_almost_equal, assert_allclose)
import pytest
from pytest import raises as assert_raises

from scipy._lib._numpy_compat import suppress_warnings
from scipy.spatial.distance import (squareform, pdist, cdist, num_obs_y,
                                    num_obs_dm, is_valid_dm, is_valid_y,
                                    _validate_vector, _METRICS_NAMES)

# these were missing: chebyshev cityblock kulsinski
from scipy.spatial.distance import (braycurtis, canberra, chebyshev, cityblock,
                                    correlation, cosine, dice, euclidean,
                                    hamming, jaccard, jensenshannon,
                                    kulsinski, mahalanobis, matching,
                                    minkowski, rogerstanimoto, russellrao,
                                    seuclidean, sokalmichener, sokalsneath,
                                    sqeuclidean, yule)
from scipy.spatial.distance import wminkowski as old_wminkowski

# Names of the text files (inside the "data" directory next to this
# module) that load_testing_files() reads into ``eo``.
_filenames = [
    "cdist-X1.txt",
    "cdist-X2.txt",
    "iris.txt",
    "pdist-boolean-inp.txt",
    "pdist-chebyshev-ml-iris.txt",
    "pdist-chebyshev-ml.txt",
    "pdist-cityblock-ml-iris.txt",
    "pdist-cityblock-ml.txt",
    "pdist-correlation-ml-iris.txt",
    "pdist-correlation-ml.txt",
    "pdist-cosine-ml-iris.txt",
    "pdist-cosine-ml.txt",
    "pdist-double-inp.txt",
    "pdist-euclidean-ml-iris.txt",
    "pdist-euclidean-ml.txt",
    "pdist-hamming-ml.txt",
    "pdist-jaccard-ml.txt",
    "pdist-jensenshannon-ml-iris.txt",
    "pdist-jensenshannon-ml.txt",
    "pdist-minkowski-3.2-ml-iris.txt",
    "pdist-minkowski-3.2-ml.txt",
    "pdist-minkowski-5.8-ml-iris.txt",
    "pdist-seuclidean-ml-iris.txt",
    "pdist-seuclidean-ml.txt",
    "pdist-spearman-ml.txt",
    "random-bool-data.txt",
    "random-double-data.txt",
    "random-int-data.txt",
    "random-uint-data.txt",
]

# Fixed 6x6 symmetric matrix with a zero diagonal, stored as doubles.
_tdist = np.array(
    [
        [0, 662, 877, 255, 412, 996],
        [662, 0, 295, 468, 268, 400],
        [877, 295, 0, 754, 564, 138],
        [255, 468, 754, 0, 219, 869],
        [412, 268, 564, 219, 0, 669],
        [996, 400, 138, 869, 669, 0],
    ],
    dtype='double',
)

# The same distances after passing the square matrix through squareform().
_ytdist = squareform(_tdist)

# Registry of test fixtures: maps a fixture name (the data file name
# without its ".txt" suffix and without the "-ml" tag) to the array
# loaded from that file.  It holds both inputs and expected outputs and
# is filled by load_testing_files() before any test runs; each test
# reads what it needs from this dictionary.
eo = {}


def load_testing_files():
    """Load every file listed in ``_filenames`` into the ``eo`` dictionary.

    Each file is looked up in the ``data`` directory that sits next to this
    module and parsed with ``np.loadtxt``.  The dictionary key is the file
    name with ``".txt"`` and ``"-ml"`` removed, e.g.
    ``"pdist-cosine-ml-iris.txt"`` is stored as ``"pdist-cosine-iris"``.

    After loading, a few entries are converted to the dtype their name
    implies (bool, int, uint), and a float32 copy of the random double data
    is added under ``"random-float32-data"``.

    Returns
    -------
    None
        The function works by mutating the module-level ``eo``.
    """
    data_dir = os.path.join(os.path.dirname(__file__), 'data')
    for fn in _filenames:
        name = fn.replace(".txt", "").replace("-ml", "")
        # The context manager closes the handle even if np.loadtxt raises
        # on a malformed file.
        with open(os.path.join(data_dir, fn)) as fp:
            eo[name] = np.loadtxt(fp)
    # np.loadtxt returns floats; restore the dtypes the tests expect.
    eo['pdist-boolean-inp'] = np.bool_(eo['pdist-boolean-inp'])
    eo['random-bool-data'] = np.bool_(eo['random-bool-data'])
    eo['random-float32-data'] = np.float32(eo['random-double-data'])
    eo['random-int-data'] = np.int_(eo['random-int-data'])
    eo['random-uint-data'] = np.uint(eo['random-uint-data'])


# Fill ``eo`` once, at import time, so the fixtures are already loaded
# when the tests run.
load_testing_files()


def _chk_asarrays(arrays, axis=None):
    """Convert the inputs to arrays and normalise ``axis``.

    Parameters
    ----------
    arrays : sequence of array_like
        Each item is converted with ``np.asanyarray`` and made at least 1-D.
    axis : int or None, optional
        ``None`` means "flatten": every array that is not already 1-D is
        ravelled and the returned axis is 0.  A negative axis is translated
        to the equivalent non-negative index.

    Returns
    -------
    tuple
        The converted arrays followed by the resolved axis as last element.

    Raises
    ------
    ValueError
        If ``axis`` is negative and the arrays do not all share one ``ndim``.
    """
    flatten = axis is None
    converted = []
    for item in arrays:
        arr = np.asanyarray(item)
        # np < 1.10 ravel removes subclass from arrays, so only ravel
        # what is not already one-dimensional
        if flatten and arr.ndim != 1:
            arr = np.ravel(arr)
        converted.append(np.atleast_1d(arr))
    arrays = tuple(converted)

    if flatten:
        axis = 0
    elif axis < 0:
        ndim = arrays[0].ndim
        if any(a.ndim != ndim for a in arrays):
            raise ValueError("array ndim must be the same for neg axis")
        axis = range(ndim)[axis]
    return arrays + (axis,)


def _chk_weights(arrays, weights=None, axis=None,
                 force_weights=False, simplify_weights=True,
                 pos_only=False, neg_check=False,
                 nan_screen=False, mask_screen=False,
                 ddof=None):
    """Validate data arrays together with their per-observation weights.

    Parameters
    ----------
    arrays : sequence of array_like
        Data arrays; converted by ``_chk_asarrays``.
    weights : array_like or None, optional
        Weights along ``axis``; converted with ``np.asanyarray`` if given.
    axis : int or None, optional
        Forwarded to ``_chk_asarrays``, which resolves it.
    force_weights : bool, optional
        When no weights are given, create an array of ones instead of
        returning ``None``.  Passing True also switches off
        ``simplify_weights``.
    simplify_weights : bool, optional
        Replace weights that are all equal to 1 by ``None``.
    pos_only : bool, optional
        Drop the observations whose weight is not strictly positive.
    neg_check : bool, optional
        Raise if any weight is negative.
    nan_screen : bool, optional
        If any array contains NaN, mask the invalid entries and behave as
        if ``mask_screen`` and ``force_weights`` were both set.
    mask_screen : bool, optional
        Force weights when any array carries a mask, and pass the weights
        through ``_weight_masked``.
    ddof : optional
        When truthy, the weights are passed through ``_freq_weights``.

    Returns
    -------
    tuple
        The arrays, followed by the weights (possibly ``None``) and the
        resolved axis.

    Raises
    ------
    ValueError
        If the weights do not have shape ``(a.shape[axis],)`` for every
        array, or if ``neg_check`` is set and a weight is negative.
    """
    checked = _chk_asarrays(arrays, axis=axis)
    axis = checked[-1]
    arrays = checked[:-1]

    # Decided from the caller's flag, before screening below can turn
    # force_weights on.
    simplify_weights = simplify_weights and not force_weights
    if mask_screen and not force_weights:
        force_weights = any(np.ma.getmask(a) is not np.ma.nomask for a in arrays)

    if nan_screen:
        nan_flags = [np.isnan(np.sum(a)) for a in arrays]
        if any(nan_flags):
            mask_screen = True
            force_weights = True
            arrays = tuple(np.ma.masked_invalid(a) if flagged else a
                           for a, flagged in zip(arrays, nan_flags))

    if weights is None:
        if not force_weights:
            # nothing to validate
            return arrays + (weights, axis)
        weights = np.ones(arrays[0].shape[axis])
    else:
        weights = np.asanyarray(weights)

    if ddof:
        weights = _freq_weights(weights)

    if mask_screen:
        weights = _weight_masked(arrays, weights, axis)

    if any(weights.shape != (a.shape[axis],) for a in arrays):
        raise ValueError("weights shape must match arrays along axis")
    if neg_check and (weights < 0).any():
        raise ValueError("weights cannot be negative")

    if pos_only:
        keep = np.nonzero(weights > 0)[0]
        if keep.size < weights.size:
            arrays = tuple(np.take(a, keep, axis=axis) for a in arrays)
            weights = weights[keep]

    if simplify_weights and (weights == 1).all():
        return arrays + (None, axis)
    return arrays + (weights, axis)


def _freq_weights(weights):
    """Return ``weights`` cast to integers, rejecting non-integral values.

    ``None`` is passed through unchanged.

    Raises
    ------
    ValueError
        If any weight changes value when cast to ``int``.
    """
    if weights is not None:
        as_counts = weights.astype(int)
        if (weights != as_counts).any():
            raise ValueError("frequency (integer count-type) weights required %s" % weights)
        weights = as_counts
    return weights


def _weight_masked(arrays, weights, axis):
    """Zero the weight of every observation that is masked in any array.

    Parameters
    ----------
    arrays : iterable of array_like
        Arrays without a mask (``np.ma.getmask`` returns ``nomask``) are
        ignored.
    weights : array_like
        Weights along ``axis``.  The caller's array is never modified.
    axis : int or None
        Observation axis; ``None`` is treated as 0.

    Returns
    -------
    ndarray
        The weights with a 0 wherever some array has a masked entry at that
        position along ``axis``.  If no array carries a mask, this is just
        ``np.asanyarray(weights)``.
    """
    if axis is None:
        axis = 0
    weights = np.asanyarray(weights)
    for a in arrays:
        axis_mask = np.ma.getmask(a)
        if axis_mask is np.ma.nomask:
            continue
        if a.ndim > 1:
            # An observation counts as masked if anything is masked in it,
            # so reduce over every axis except the observation axis.
            not_axes = tuple(i for i in range(a.ndim) if i != axis)
            axis_mask = axis_mask.any(axis=not_axes)
        # Multiply out of place: np.asanyarray does not copy an existing
        # array, so an in-place ``*=`` would overwrite the caller's weights.
        weights = weights * (1 - axis_mask.astype(int))
    return weights


def within_tol(a, b, tol):
    """Tell whether the largest absolute difference ``|a - b|`` is below ``tol``."""
    largest_gap = np.abs(a - b).max()
    return largest_gap < tol


def _assert_within_tol(a, b, atol=0, rtol=0, verbose_=False):
    """Assert that ``a`` and ``b`` agree within the given tolerances.

    The comparison is delegated to ``assert_allclose``.  Both tolerances
    default to 0.  With ``verbose_`` set, the largest absolute difference
    is printed before the check.
    """
    if verbose_:
        largest_gap = np.abs(a - b).max()
        print(largest_gap)
    assert_allclose(a, b, atol=atol, rtol=rtol)


def _rand_split(arrays, weights, axis, split_per, seed=None):
    """Randomly split observations into two weighted copies of themselves.

    Each step picks one observation at random, appends a copy of it to
    every array, and divides the picked observation's weight between the
    original and the copy in a random proportion.
    (Inverse operation for stats.collapse_weights.)

    Parameters
    ----------
    arrays : sequence of ndarray
        Data arrays sharing the same length along ``axis``.
    weights : array_like
        One weight per observation; copied, never modified in place.
    axis : int
        Observation axis.
    split_per : number
        ``int(split_per) * n_obs`` splits are performed.
    seed : optional
        Seed for ``np.random.RandomState``.

    Returns
    -------
    arrays, weights
        The extended arrays (a list once at least one split happened,
        otherwise the input object) and the extended float64 weights.
    """
    # modified inplace below; need a copy
    weights = np.array(weights, dtype=np.float64)
    rng = np.random.RandomState(seed)

    def _take_keepdims(a, ix, axis):
        # pull out one observation but keep a length-1 ``axis`` so that
        # it can be appended back along that axis
        picked = np.asanyarray(np.take(a, ix, axis=axis))
        kept_shape = [1 if dim == axis else a.shape[dim]
                      for dim in range(a.ndim)]
        return picked.reshape(kept_shape)

    n_obs = arrays[0].shape[axis]
    assert all(a.shape[axis] == n_obs for a in arrays), "data must be aligned on sample axis"

    # NOTE(review): int() is applied before the multiplication, so a
    # fractional split_per below 1 yields no splits at all -- confirm
    # that this is intended.
    n_splits = int(split_per) * n_obs
    for done in range(n_splits):
        # copies appended by earlier steps can be picked as well
        pick = rng.randint(n_obs + done)
        original_w = weights[pick]
        frac = rng.rand()
        weights[pick] = frac * original_w
        weights = np.append(weights, (1. - frac) * original_w)
        grown = []
        for a in arrays:
            grown.append(np.append(a, _take_keepdims(a, pick, axis=axis),
                                   axis=axis))
        arrays = grown
    return arrays, weights


def _rough_check(a, b, compare_assert=partial(assert_allclose, atol=1e-5),
                  key=lambda x: x, w=None):
    """Assert that ``key(a)`` and ``key(b)`` are equal or approximately equal.

    The two values are first compared with ``!=``.  Only if some element
    differs is ``compare_assert`` called, so values it cannot handle pass as
    long as they are exactly equal.  If the comparison itself fails with
    ``TypeError`` or ``ValueError``, the values are walked pairwise and each
    pair is checked recursively (without ``key``, which has already been
    applied).

    ``w`` is accepted but not used.
    """
    lhs = key(a)
    rhs = key(b)
    try:
        # try strict equality first (for string types)
        differs = np.array(lhs != rhs).any()
        if differs:
            compare_assert(lhs, rhs)
    except AttributeError:  # masked array
        compare_assert(lhs, rhs)
    except (TypeError, ValueError):  # nested data structure
        for lhs_item, rhs_item in zip(lhs, rhs):
            _rough_check(lhs_item, rhs_item, compare_assert=compare_assert)

# diff from test_stats:
#  n_args=2, weight_arg='w', default_axis=None
#  ma_safe = False, nan_safe = False
def _weight_checked(fn, n_args=2, default_axis=None, key=lambda x: x, weight_arg='w',
                    squeeze=True, silent=False,
                    ones_test=True, const_test=True, dup_test=True,
                    split_test=True, dud_test=True, ma_safe=False, ma_very_safe=False, nan_safe=False,
                    split_per=1.0, seed=0, compare_assert=partial(assert_allclose, atol=1e-5)):
    """runs fn on its arguments 2 or 3 ways, checks that the results are the same,
       then returns the same thing it would have returned before"""
    @wraps(fn)
    def wrapped(*args, **kwargs):
        result = fn(*args, **kwargs)

        arrays = args[:n_args]
        rest = args[n_args:]
        weights = kwargs.get(weight_arg, None)
        axis = kwargs.get('axis', default_axis)

        chked = _chk_weights(arrays, weights=weights, axis=axis, force_weights=True, mask_screen=True)
        arrays, weights, axis = chked[:-2], chked[-2], chked[-1]
        if squeeze:
            arrays = [np.atleast_1d(a.squeeze()) for a in arrays]

        try:
            # WEIGHTS CHECK 1: EQUAL WEIGHTED OBSERVATIONS
            args = tuple(arrays) + rest
            if ones_test:
                kwargs[weight_arg] = weights
                _rough_check(result, fn(*args, **kwargs), key=key)
            if const_test:
                kwargs[weight_arg] = weights * 101.0
                _rough_check(result, fn(*args, **kwargs), key=key)
                kwargs[weight_arg] = weights * 0.101
                try:
                    _rough_check(result, fn(*args, **kwargs), key=key)
                except Exception as e:
                    raise type(e)((e, arrays, weights))

            # WEIGHTS CHECK 2: ADDL 0-WEIGHTED OBS
            if dud_test:
                # add randomly resampled rows, weighted at 0
                dud_arrays, dud_weights = _rand_split(arrays, weights, axis, split_per=split_per, seed=seed)
                dud_weights[:weights.size] = weights  # not exactly 1 because of masked arrays
                dud_weights[weights.size:] = 0
                dud_args = tuple(dud_arrays) + rest
                kwargs[weight_arg] = dud_weights
                _rough_check(result, fn(*dud_args, **kwargs), key=key)
                # increase the value of those 0-weighted rows
                for a in dud_arrays:
                    indexer = [slice(None)] * a.ndim
                    indexer[axis] = slice(weights.size, None)
                    indexer = tuple(indexer)
                    a[indexer] = a[indexer] * 101
                dud_args = tuple(dud_arrays) + rest
                _rough_check(result, fn(*dud_args, **kwargs), key=key)
                # set those 0-weighted rows to NaNs
                for a in dud_arrays:
                    indexer = [slice(None)] * a.ndim
                    indexer[axis] = slice(weights.size, None)
                    indexer = tuple(indexer)
                    a[indexer] = a[indexer] * np.nan
                if kwargs.get("nan_policy", None) == "omit" and nan_safe:
                    dud_args = tuple(dud_arrays) + rest
                    _rough_check(result, fn(*dud_args, **kwargs), key=key)
                # mask out those nan values
                if ma_safe:
                    dud_arrays = [np.ma.masked_invalid(a) for a in dud_arrays]
                    dud_args = tuple(dud_arrays) + rest
                    _rough_check(result, fn(*dud_args, **kwargs), key=key)
                    if ma_very_safe:
                        kwargs[weight_arg] = None
                        _rough_check(result, fn(*dud_args, **kwargs), key=key)
                del dud_arrays, dud_args, dud_weights

            # WEIGHTS CHECK 3: DUPLICATE DATA (DUMB SPLITTING)
            if dup_test:
                dup_arrays = [np.append(a, a, axis=axis) for a in arrays]
                dup_weights = np.append(weights, weights) / 2.0
                dup_args = tuple(dup_arrays) + rest
                kwargs[weight_arg] = dup_weights
                _rough_check(result, fn(*dup_args, **kwargs), key=key)
                del dup_args, dup_arrays, dup_weights
Loading ...