Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aaronreidsmith / scikit-learn   python

Repository URL to install this package:

Version: 0.22 

/ decomposition / tests / test_online_lda.py

import sys

import numpy as np
from scipy.linalg import block_diag
from scipy.sparse import csr_matrix
from scipy.special import psi

import pytest

from sklearn.decomposition import LatentDirichletAllocation
from sklearn.decomposition._online_lda import (_dirichlet_expectation_1d,
                                               _dirichlet_expectation_2d)

from sklearn.utils._testing import assert_allclose
from sklearn.utils._testing import assert_array_almost_equal
from sklearn.utils._testing import assert_almost_equal
from sklearn.utils._testing import if_safe_multiprocessing_with_blas

from sklearn.exceptions import NotFittedError
from io import StringIO


def _build_sparse_mtx():
    # Create 3 topics and each topic has 3 distinct words.
    # (Each word only belongs to a single topic.)
    n_components = 3
    block = np.full((3, 3), n_components, dtype=np.int)
    blocks = [block] * n_components
    X = block_diag(*blocks)
    X = csr_matrix(X)
    return (n_components, X)


def test_lda_default_prior_params():
    # default prior parameter should be `1 / topics`
    # and verbose params should not affect result
    n_components, X = _build_sparse_mtx()
    prior = 1. / n_components
    lda_1 = LatentDirichletAllocation(n_components=n_components,
                                      doc_topic_prior=prior,
                                      topic_word_prior=prior, random_state=0)
    lda_2 = LatentDirichletAllocation(n_components=n_components,
                                      random_state=0)
    topic_distr_1 = lda_1.fit_transform(X)
    topic_distr_2 = lda_2.fit_transform(X)
    assert_almost_equal(topic_distr_1, topic_distr_2)


def test_lda_fit_batch():
    # Test LDA batch learning_offset (`fit` method with 'batch' learning)
    rng = np.random.RandomState(0)
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components,
                                    evaluate_every=1, learning_method='batch',
                                    random_state=rng)
    lda.fit(X)

    correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
    for component in lda.components_:
        # Find top 3 words in each LDA component
        top_idx = set(component.argsort()[-3:][::-1])
        assert tuple(sorted(top_idx)) in correct_idx_grps


def test_lda_fit_online():
    # Test LDA online learning (`fit` method with 'online' learning)
    rng = np.random.RandomState(0)
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components,
                                    learning_offset=10., evaluate_every=1,
                                    learning_method='online', random_state=rng)
    lda.fit(X)

    correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
    for component in lda.components_:
        # Find top 3 words in each LDA component
        top_idx = set(component.argsort()[-3:][::-1])
        assert tuple(sorted(top_idx)) in correct_idx_grps


def test_lda_partial_fit():
    # Test LDA online learning (`partial_fit` method)
    # (same as test_lda_batch)
    rng = np.random.RandomState(0)
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components,
                                    learning_offset=10., total_samples=100,
                                    random_state=rng)
    for i in range(3):
        lda.partial_fit(X)

    correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
    for c in lda.components_:
        top_idx = set(c.argsort()[-3:][::-1])
        assert tuple(sorted(top_idx)) in correct_idx_grps


def test_lda_dense_input():
    # Test LDA with dense input.
    rng = np.random.RandomState(0)
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components,
                                    learning_method='batch', random_state=rng)
    lda.fit(X.toarray())

    correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
    for component in lda.components_:
        # Find top 3 words in each LDA component
        top_idx = set(component.argsort()[-3:][::-1])
        assert tuple(sorted(top_idx)) in correct_idx_grps


def test_lda_transform():
    # Test LDA transform.
    # Transform result cannot be negative and should be normalized
    rng = np.random.RandomState(0)
    X = rng.randint(5, size=(20, 10))
    n_components = 3
    lda = LatentDirichletAllocation(n_components=n_components,
                                    random_state=rng)
    X_trans = lda.fit_transform(X)
    assert (X_trans > 0.0).any()
    assert_array_almost_equal(np.sum(X_trans, axis=1),
                              np.ones(X_trans.shape[0]))


@pytest.mark.parametrize('method', ('online', 'batch'))
def test_lda_fit_transform(method):
    # Test LDA fit_transform & transform
    # fit_transform and transform result should be the same
    rng = np.random.RandomState(0)
    X = rng.randint(10, size=(50, 20))
    lda = LatentDirichletAllocation(n_components=5, learning_method=method,
                                    random_state=rng)
    X_fit = lda.fit_transform(X)
    X_trans = lda.transform(X)
    assert_array_almost_equal(X_fit, X_trans, 4)


def test_lda_partial_fit_dim_mismatch():
    # test `n_features` mismatch in `partial_fit`
    rng = np.random.RandomState(0)
    n_components = rng.randint(3, 6)
    n_col = rng.randint(6, 10)
    X_1 = np.random.randint(4, size=(10, n_col))
    X_2 = np.random.randint(4, size=(10, n_col + 1))
    lda = LatentDirichletAllocation(n_components=n_components,
                                    learning_offset=5., total_samples=20,
                                    random_state=rng)
    lda.partial_fit(X_1)
    with pytest.raises(ValueError, match=r"^The provided data has"):
        lda.partial_fit(X_2)


def test_invalid_params():
    # test `_check_params` method
    X = np.ones((5, 10))

    invalid_models = (
        ('n_components', LatentDirichletAllocation(n_components=0)),
        ('learning_method',
         LatentDirichletAllocation(learning_method='unknown')),
        ('total_samples', LatentDirichletAllocation(total_samples=0)),
        ('learning_offset', LatentDirichletAllocation(learning_offset=-1)),
    )
    for param, model in invalid_models:
        regex = r"^Invalid %r parameter" % param
        with pytest.raises(ValueError, match=regex):
            model.fit(X)


def test_lda_negative_input():
    # test pass dense matrix with sparse negative input.
    X = np.full((5, 10), -1.)
    lda = LatentDirichletAllocation()
    regex = r"^Negative values in data passed"
    with pytest.raises(ValueError, match=regex):
        lda.fit(X)


def test_lda_no_component_error():
    # test `perplexity` before `fit`
    rng = np.random.RandomState(0)
    X = rng.randint(4, size=(20, 10))
    lda = LatentDirichletAllocation()
    regex = ("This LatentDirichletAllocation instance is not fitted yet. "
             "Call 'fit' with appropriate arguments before using this "
             "estimator.")
    with pytest.raises(NotFittedError, match=regex):
        lda.perplexity(X)


def test_lda_transform_mismatch():
    # test `n_features` mismatch in partial_fit and transform
    rng = np.random.RandomState(0)
    X = rng.randint(4, size=(20, 10))
    X_2 = rng.randint(4, size=(10, 8))

    n_components = rng.randint(3, 6)
    lda = LatentDirichletAllocation(n_components=n_components,
                                    random_state=rng)
    lda.partial_fit(X)
    with pytest.raises(ValueError, match=r"^The provided data has"):
        lda.partial_fit(X_2)


@if_safe_multiprocessing_with_blas
@pytest.mark.parametrize('method', ('online', 'batch'))
def test_lda_multi_jobs(method):
    n_components, X = _build_sparse_mtx()
    # Test LDA batch training with multi CPU
    rng = np.random.RandomState(0)
    lda = LatentDirichletAllocation(n_components=n_components, n_jobs=2,
                                    learning_method=method,
                                    evaluate_every=1, random_state=rng)
    lda.fit(X)

    correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
    for c in lda.components_:
        top_idx = set(c.argsort()[-3:][::-1])
        assert tuple(sorted(top_idx)) in correct_idx_grps


@if_safe_multiprocessing_with_blas
def test_lda_partial_fit_multi_jobs():
    # Test LDA online training with multi CPU
    rng = np.random.RandomState(0)
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components, n_jobs=2,
                                    learning_offset=5., total_samples=30,
                                    random_state=rng)
    for i in range(2):
        lda.partial_fit(X)

    correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
    for c in lda.components_:
        top_idx = set(c.argsort()[-3:][::-1])
        assert tuple(sorted(top_idx)) in correct_idx_grps


def test_lda_preplexity_mismatch():
    # test dimension mismatch in `perplexity` method
    rng = np.random.RandomState(0)
    n_components = rng.randint(3, 6)
    n_samples = rng.randint(6, 10)
    X = np.random.randint(4, size=(n_samples, 10))
    lda = LatentDirichletAllocation(n_components=n_components,
                                    learning_offset=5., total_samples=20,
                                    random_state=rng)
    lda.fit(X)
    # invalid samples
    invalid_n_samples = rng.randint(4, size=(n_samples + 1, n_components))
    with pytest.raises(ValueError, match=r'Number of samples'):
        lda._perplexity_precomp_distr(X, invalid_n_samples)
    # invalid topic number
    invalid_n_components = rng.randint(4, size=(n_samples, n_components + 1))
    with pytest.raises(ValueError, match=r'Number of topics'):
        lda._perplexity_precomp_distr(X, invalid_n_components)


@pytest.mark.parametrize('method', ('online', 'batch'))
def test_lda_perplexity(method):
    # Test LDA perplexity for batch training
    # perplexity should be lower after each iteration
    n_components, X = _build_sparse_mtx()
    lda_1 = LatentDirichletAllocation(n_components=n_components,
                                      max_iter=1, learning_method=method,
                                      total_samples=100, random_state=0)
    lda_2 = LatentDirichletAllocation(n_components=n_components,
                                      max_iter=10, learning_method=method,
                                      total_samples=100, random_state=0)
    lda_1.fit(X)
    perp_1 = lda_1.perplexity(X, sub_sampling=False)

    lda_2.fit(X)
    perp_2 = lda_2.perplexity(X, sub_sampling=False)
    assert perp_1 >= perp_2

    perp_1_subsampling = lda_1.perplexity(X, sub_sampling=True)
    perp_2_subsampling = lda_2.perplexity(X, sub_sampling=True)
    assert perp_1_subsampling >= perp_2_subsampling


@pytest.mark.parametrize('method', ('online', 'batch'))
def test_lda_score(method):
    # Test LDA score for batch training
    # score should be higher after each iteration
    n_components, X = _build_sparse_mtx()
    lda_1 = LatentDirichletAllocation(n_components=n_components,
                                      max_iter=1, learning_method=method,
                                      total_samples=100, random_state=0)
    lda_2 = LatentDirichletAllocation(n_components=n_components,
                                      max_iter=10, learning_method=method,
                                      total_samples=100, random_state=0)
    lda_1.fit_transform(X)
    score_1 = lda_1.score(X)

    lda_2.fit_transform(X)
    score_2 = lda_2.score(X)
    assert score_2 >= score_1


def test_perplexity_input_format():
    # Test LDA perplexity for sparse and dense input
    # score should be the same for both dense and sparse input
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components, max_iter=1,
                                    learning_method='batch',
                                    total_samples=100, random_state=0)
    lda.fit(X)
    perp_1 = lda.perplexity(X)
    perp_2 = lda.perplexity(X.toarray())
    assert_almost_equal(perp_1, perp_2)


def test_lda_score_perplexity():
    # Test the relationship between LDA score and perplexity
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components, max_iter=10,
                                    random_state=0)
    lda.fit(X)
    perplexity_1 = lda.perplexity(X, sub_sampling=False)

    score = lda.score(X)
    perplexity_2 = np.exp(-1. * (score / np.sum(X.data)))
    assert_almost_equal(perplexity_1, perplexity_2)


def test_lda_fit_perplexity():
    # Test that the perplexity computed during fit is consistent with what is
    # returned by the perplexity method
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components, max_iter=1,
                                    learning_method='batch', random_state=0,
                                    evaluate_every=1)
    lda.fit(X)

    # Perplexity computed at end of fit method
    perplexity1 = lda.bound_

    # Result of perplexity method on the train set
    perplexity2 = lda.perplexity(X)

    assert_almost_equal(perplexity1, perplexity2)


def test_lda_empty_docs():
    """Test LDA on empty document (all-zero rows)."""
    Z = np.zeros((5, 4))
    for X in [Z, csr_matrix(Z)]:
        lda = LatentDirichletAllocation(max_iter=750).fit(X)
        assert_almost_equal(lda.components_.sum(axis=0),
                            np.ones(lda.components_.shape[1]))


def test_dirichlet_expectation():
    """Test Cython version of Dirichlet expectation calculation."""
    x = np.logspace(-100, 10, 10000)
    expectation = np.empty_like(x)
    _dirichlet_expectation_1d(x, 0, expectation)
    assert_allclose(expectation, np.exp(psi(x) - psi(np.sum(x))),
                    atol=1e-19)

    x = x.reshape(100, 100)
    assert_allclose(_dirichlet_expectation_2d(x),
                    psi(x) - psi(np.sum(x, axis=1)[:, np.newaxis]),
                    rtol=1e-11, atol=3e-9)


def check_verbosity(verbose, evaluate_every, expected_lines,
                    expected_perplexities):
    n_components, X = _build_sparse_mtx()
    lda = LatentDirichletAllocation(n_components=n_components, max_iter=3,
                                    learning_method='batch',
                                    verbose=verbose,
                                    evaluate_every=evaluate_every,
                                    random_state=0)
    out = StringIO()
    old_out, sys.stdout = sys.stdout, out
    try:
        lda.fit(X)
    finally:
        sys.stdout = old_out

    n_lines = out.getvalue().count('\n')
    n_perplexity = out.getvalue().count('perplexity')
    assert expected_lines == n_lines
    assert expected_perplexities == n_perplexity


@pytest.mark.parametrize(
        'verbose,evaluate_every,expected_lines,expected_perplexities',
        [(False, 1, 0, 0),
         (False, 0, 0, 0),
         (True, 0, 3, 0),
         (True, 1, 3, 3),
         (True, 2, 3, 1)])
def test_verbosity(verbose, evaluate_every, expected_lines,
                   expected_perplexities):
    check_verbosity(verbose, evaluate_every, expected_lines,
                    expected_perplexities)