# statsmodels 0.11.1 (alkaline-ml/statsmodels)
# base/tests/test_distributed_estimation.py
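"""
Tests for statsmodels.base.distributed_estimation.

Covers the low-level helpers (_calc_grad, _calc_wdesign_mat, the
regularized/unregularized estimation routines and their joins) as well
as the full DistributedModel fit paths (sequential and joblib). Most
expected values are regression tests pinned to np.random.seed(435265).
"""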

import numpy as np
from numpy.testing import assert_equal, assert_, assert_allclose
from statsmodels.regression.linear_model import OLS
from statsmodels.genmod.generalized_linear_model import GLM
from statsmodels.genmod.families import Binomial
from statsmodels.base.distributed_estimation import _calc_grad, \
    _calc_wdesign_mat, _est_regularized_debiased, _join_debiased, \
    _est_regularized_naive, _est_unregularized_naive, _join_naive, \
    DistributedModel


def _data_gen(endog, exog, partitions):
    """Yields `partitions` roughly equal (endog, exog) blocks."""

    n_exog = exog.shape[0]
    n_part = np.floor(n_exog / partitions)
    rem = n_exog - n_part * partitions

    stp = 0

    while stp < (partitions - 1):
        ii = int(n_part * stp)
        jj = int(n_part * (stp + 1))
        yield endog[ii:jj], exog[ii:jj, :]
        stp += 1

    ii = int(n_part * stp)
    jj = int(n_part * (stp + 1) + rem)
    yield endog[ii:jj], exog[ii:jj, :]
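# e.g. _data_gen(y, X, 3) on 50 rows yields blocks of 16, 16 and 18:
# floor(50 / 3) = 16 rows per partition, with the remainder of 2
# absorbed by the final partition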


def test_calc_grad():

    # separately tests that _calc_grad returns
    # sensible results
    #
    # regression test

    np.random.seed(435265)
    X = np.random.normal(size=(50, 3))
    y = np.random.randint(0, 2, size=50)
    beta = np.random.normal(size=3)
    mod = OLS(y, X)
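    # _calc_grad(model, params, alpha, L1_wt, score_kwds) evaluates the
    # model's gradient at beta; the expected values below are pinned to
    # the seed above (regression test)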
    grad = _calc_grad(mod, beta, 0.01, 1, {})
    assert_allclose(grad, np.array([19.75816, -6.62307, 7.324644]),
                    atol=1e-6, rtol=0)


def test_calc_wdesign_mat():

    # separately tests that _calc_wdesign_mat
    # returns sensible results
    #
    # regression test

    np.random.seed(435265)
    X = np.random.normal(size=(3, 3))
    y = np.random.randint(0, 2, size=3)
    beta = np.random.normal(size=3)
    mod = OLS(y, X)
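    # _calc_wdesign_mat(model, params, hess_kwds) returns the design
    # matrix reweighted at beta (part of the debiasing machinery); the
    # GLM/Binomial case below produces different weights than OLS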
    dmat = _calc_wdesign_mat(mod, beta, {})
    assert_allclose(dmat, np.array([[1.306314, -0.024897, 1.326498],
                                    [-0.539219, -0.483028, -0.703503],
                                    [-3.327987, 0.524541, -0.139761]]),
                    atol=1e-6, rtol=0)

    mod = GLM(y, X, family=Binomial())
    dmat = _calc_wdesign_mat(mod, beta, {})
    assert_allclose(dmat, np.array([[0.408616, -0.007788, 0.41493],
                                    [-0.263292, -0.235854, -0.343509],
                                    [-0.11241, 0.017718, -0.004721]]),
                    atol=1e-6, rtol=0)


def test_est_regularized_debiased():

    # tests that the shapes of all the intermediate steps
    # remain correct for regularized debiased estimation;
    # does this for both OLS and GLM

    np.random.seed(435265)
    X = np.random.normal(size=(50, 3))
    y = np.random.randint(0, 2, size=50)
    beta = np.random.normal(size=3)
    mod = OLS(y, X)
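    # fits partition 0 of 2; per the shape assertions below, the result
    # is a tuple of (coefficient estimate, gradient, and two lists that
    # are later consumed by the debiased join)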
    res = _est_regularized_debiased(mod, 0, 2, fit_kwds={"alpha": 0.5})
    bhat = res[0]
    grad = res[1]
    ghat_l = res[2]
    that_l = res[3]

    assert_(isinstance(res, tuple))
    assert_equal(bhat.shape, beta.shape)
    assert_equal(grad.shape, beta.shape)
    assert_(isinstance(ghat_l, list))
    assert_(isinstance(that_l, list))
    assert_equal(len(ghat_l), len(that_l))
    assert_equal(ghat_l[0].shape, (2,))
    assert_(isinstance(that_l[0], float))

    mod = GLM(y, X, family=Binomial())
    res = _est_regularized_debiased(mod, 0, 2, fit_kwds={"alpha": 0.5})
    bhat = res[0]
    grad = res[1]
    ghat_l = res[2]
    that_l = res[3]

    assert_(isinstance(res, tuple))
    assert_equal(bhat.shape, beta.shape)
    assert_equal(grad.shape, beta.shape)
    assert_(isinstance(ghat_l, list))
    assert_(isinstance(that_l, list))
    assert_equal(len(ghat_l), len(that_l))
    assert_equal(ghat_l[0].shape, (2,))
    assert_(isinstance(that_l[0], float))


def test_est_regularized_naive():

    # tests that the shapes of all the intermediate steps
    # remain correct for regularized naive estimation;
    # does this for both OLS and GLM

    np.random.seed(435265)
    X = np.random.normal(size=(50, 3))
    y = np.random.randint(0, 2, size=50)
    beta = np.random.normal(size=3)
    mod = OLS(y, X)
    res = _est_regularized_naive(mod, 0, 2, fit_kwds={"alpha": 0.5})

    assert_equal(res.shape, beta.shape)

    mod = GLM(y, X, family=Binomial())
    res = _est_regularized_naive(mod, 0, 2, fit_kwds={"alpha": 0.5})

    assert_equal(res.shape, beta.shape)


def test_est_unregularized_naive():

    # tests that the shapes of all the intermediate steps
    # remain correct for unregularized naive estimation;
    # does this for both OLS and GLM

    np.random.seed(435265)
    X = np.random.normal(size=(50, 3))
    y = np.random.randint(0, 2, size=50)
    beta = np.random.normal(size=3)
    mod = OLS(y, X)
    res = _est_unregularized_naive(mod, 0, 2, fit_kwds={"alpha": 0.5})

    assert_equal(res.shape, beta.shape)

    mod = GLM(y, X, family=Binomial())
    res = _est_unregularized_naive(mod, 0, 2, fit_kwds={"alpha": 0.5})

    assert_equal(res.shape, beta.shape)


def test_join_debiased():

    # tests that the results of all the intermediate steps
    # remain correct for the debiased join; does this for OLS and GLM
    #
    # regression test

    np.random.seed(435265)
    X = np.random.normal(size=(50, 3))
    y = np.random.randint(0, 2, size=50)
    mod = OLS(y, X)
    res_l = []
    for i in range(2):
        res = _est_regularized_debiased(mod, i, 2, fit_kwds={"alpha": 0.1})
        res_l.append(res)
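    # _join_debiased combines the per-partition result tuples into a
    # single debiased coefficient vector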
    joined = _join_debiased(res_l)
    assert_allclose(joined, np.array([-0.167548, -0.016567, -0.34414]),
                    atol=1e-6, rtol=0)

    mod = GLM(y, X, family=Binomial())
    res_l = []
    for i in range(2):
        res = _est_regularized_debiased(mod, i, 2, fit_kwds={"alpha": 0.1})
        res_l.append(res)
    joined = _join_debiased(res_l)
    assert_allclose(joined, np.array([-0.164515, -0.412854, -0.223955]),
                    atol=1e-6, rtol=0)


def test_join_naive():

    # tests that the results of all the intermediate steps
    # remain correct for the naive join; does this for OLS and GLM
    #
    # regression test

    np.random.seed(435265)
    X = np.random.normal(size=(50, 3))
    y = np.random.randint(0, 2, size=50)
    mod = OLS(y, X)
    res_l = []
    for i in range(2):
        res = _est_regularized_naive(mod, i, 2, fit_kwds={"alpha": 0.1})
        res_l.append(res)
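    # _join_naive simply averages the per-partition coefficient estimates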
    joined = _join_naive(res_l)
    assert_allclose(joined, np.array([-0.020757, 0., 0.]),
                    atol=1e-6, rtol=0)

    mod = GLM(y, X, family=Binomial())
    res_l = []
    for i in range(2):
        res = _est_regularized_naive(mod, i, 2, fit_kwds={"alpha": 0.1})
        res_l.append(res)
    joined = _join_naive(res_l)
    assert_allclose(joined, np.array([0., 0., 0.]),
                    atol=1e-6, rtol=0)


def test_fit_sequential():

    # tests that the results of all the intermediate steps
    # remain correct for the sequential fit, for both OLS and GLM
    # and a variety of partition counts
    #
    # regression test

    np.random.seed(435265)
    X = np.random.normal(size=(50, 3))
    y = np.random.randint(0, 2, size=50)

    mod = DistributedModel(1, model_class=OLS)
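    # DistributedModel(n_partitions, model_class=..., init_kwds=...)
    # defaults to debiased regularized estimation; fit() consumes an
    # (endog, exog) generator plus fit_kwds forwarded to each partition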
    fit = mod.fit(_data_gen(y, X, 1), parallel_method="sequential",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.191606, -0.012565, -0.351398]),
                    atol=1e-6, rtol=0)
    mod = DistributedModel(2, model_class=OLS)
    fit = mod.fit(_data_gen(y, X, 2), parallel_method="sequential",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.157416, -0.029643, -0.471653]),
                    atol=1e-6, rtol=0)
    mod = DistributedModel(3, model_class=OLS)
    fit = mod.fit(_data_gen(y, X, 3), parallel_method="sequential",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.124891, -0.050934, -0.403354]),
                    atol=1e-6, rtol=0)

    mod = DistributedModel(1, model_class=GLM,
                           init_kwds={"family": Binomial()})
    fit = mod.fit(_data_gen(y, X, 1), parallel_method="sequential",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.164515, -0.412854, -0.223955]),
                    atol=1e-6, rtol=0)
    mod = DistributedModel(2, model_class=GLM,
                           init_kwds={"family": Binomial()})
    fit = mod.fit(_data_gen(y, X, 2), parallel_method="sequential",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.142513, -0.360324, -0.295485]),
                    atol=1e-6, rtol=0)
    mod = DistributedModel(3, model_class=GLM,
                           init_kwds={"family": Binomial()})
    fit = mod.fit(_data_gen(y, X, 3), parallel_method="sequential",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.110487, -0.306431, -0.243921]),
                    atol=1e-6, rtol=0)


def test_fit_joblib():

    # tests that the results of all the intermediate steps
    # remain correct for the joblib fit, for both OLS and GLM
    # and a variety of partition counts
    #
    # regression test

    np.random.seed(435265)
    X = np.random.normal(size=(50, 3))
    y = np.random.randint(0, 2, size=50)

    mod = DistributedModel(1, model_class=OLS)
    fit = mod.fit(_data_gen(y, X, 1), parallel_method="joblib",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.191606, -0.012565, -0.351398]),
                    atol=1e-6, rtol=0)
    mod = DistributedModel(2, model_class=OLS)
    fit = mod.fit(_data_gen(y, X, 2), parallel_method="joblib",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.157416, -0.029643, -0.471653]),
                    atol=1e-6, rtol=0)
    mod = DistributedModel(3, model_class=OLS)
    fit = mod.fit(_data_gen(y, X, 3), parallel_method="joblib",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.124891, -0.050934, -0.403354]),
                    atol=1e-6, rtol=0)

    mod = DistributedModel(1, model_class=GLM,
                           init_kwds={"family": Binomial()})
    fit = mod.fit(_data_gen(y, X, 1), parallel_method="joblib",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.164515, -0.412854, -0.223955]),
                    atol=1e-6, rtol=0)
    mod = DistributedModel(2, model_class=GLM,
                           init_kwds={"family": Binomial()})
    fit = mod.fit(_data_gen(y, X, 2), parallel_method="joblib",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.142513, -0.360324, -0.295485]),
                    atol=1e-6, rtol=0)
    mod = DistributedModel(3, model_class=GLM,
                           init_kwds={"family": Binomial()})
    fit = mod.fit(_data_gen(y, X, 3), parallel_method="joblib",
                  fit_kwds={"alpha": 0.5})
    assert_allclose(fit.params, np.array([-0.110487, -0.306431, -0.243921]),
                    atol=1e-6, rtol=0)


def test_single_partition():

    # tests that the results make sense if we have a single partition

    np.random.seed(435265)
    N = 200
    p = 10
    m = 1

    beta = np.random.normal(size=p)
    beta = beta * np.random.randint(0, 2, p)
    X = np.random.normal(size=(N, p))
    y = X.dot(beta) + np.random.normal(size=N)
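    # sparse ground truth: multiplying by randint(0, 2, p) zeroes out
    # roughly half of the true coefficients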

    # test with alpha=0 (unregularized): both the debiased and naive
    # fits should match plain OLS
    db_mod = DistributedModel(m)
    fitOLSdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0})

    nv_mod = DistributedModel(m, estimation_method=_est_regularized_naive,
                              join_method=_join_naive)
    fitOLSnv = nv_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0})

    ols_mod = OLS(y, X)
    fitOLS = ols_mod.fit(alpha=0)

    assert_allclose(fitOLSdb.params, fitOLS.params)
    assert_allclose(fitOLSnv.params, fitOLS.params)

    # test regularized (alpha=0.1): the naive fit should match
    # fit_regularized on the full data
    nv_mod = DistributedModel(m, estimation_method=_est_regularized_naive,
                              join_method=_join_naive)
    fitOLSnv = nv_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.1})

    ols_mod = OLS(y, X)
    fitOLS = ols_mod.fit_regularized(alpha=0.1)

    assert_allclose(fitOLSnv.params, fitOLS.params)


def test_larger_p():

    # tests the case where p > N / m for both the debiased and naive methods

    np.random.seed(435265)
    N = 40
    p = 40
    m = 5

    beta = np.random.normal(size=p)
    beta = beta * np.random.randint(0, 2, p)
    X = np.random.normal(size=(N, p))
    y = X.dot(beta) + np.random.normal(size=N)

    db_mod = DistributedModel(m)
    fitOLSdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.1})
    assert_equal(np.sum(np.isnan(fitOLSdb.params)), 0)

    nv_mod = DistributedModel(m, estimation_method=_est_regularized_naive,
                              join_method=_join_naive)
    fitOLSnv = nv_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.1})
    assert_equal(np.sum(np.isnan(fitOLSnv.params)), 0)


def test_non_zero_params():

    # tests that thresholding the debiased estimate recovers the same
    # nonzero pattern as the regularized fit on the full data

    np.random.seed(435265)
    N = 200
    p = 10
    m = 5

    beta = np.random.normal(size=p)
    beta = beta * np.random.randint(0, 2, p)
    X = np.random.normal(size=(N, p))
    y = X.dot(beta) + np.random.normal(size=N)

    db_mod = DistributedModel(m, join_kwds={"threshold": 0.13})
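    # the "threshold" join kwd zeroes out small entries of the debiased
    # average, so its sparsity pattern can match fit_regularized below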
    fitOLSdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.1})
    ols_mod = OLS(y, X)
    fitOLS = ols_mod.fit_regularized(alpha=0.1)

    nz_params_db = 1 * (fitOLSdb.params != 0)
    nz_params_ols = 1 * (fitOLS.params != 0)

    assert_allclose(nz_params_db, nz_params_ols)


def test_repeat_partition():

    # tests that if we use identical partitions the average is the same
    # as the estimate for the full data

    np.random.seed(435265)
    N = 200
    p = 10
    m = 1

    beta = np.random.normal(size=p)
    beta = beta * np.random.randint(0, 2, p)
    X = np.random.normal(size=(N, p))
    y = X.dot(beta) + np.random.normal(size=N)

    def _rep_data_gen(endog, exog, partitions):
        """partitions data"""

        n_exog = exog.shape[0]
        n_part = np.ceil(n_exog / partitions)

        ii = 0
        while ii < n_exog:
            yield endog, exog
            ii += int(n_part)

    nv_mod = DistributedModel(m, estimation_method=_est_regularized_naive,
                              join_method=_join_naive)
    fitOLSnv = nv_mod.fit(_rep_data_gen(y, X, m), fit_kwds={"alpha": 0.1})

    ols_mod = OLS(y, X)
    fitOLS = ols_mod.fit_regularized(alpha=0.1)

    assert_allclose(fitOLSnv.params, fitOLS.params)


def test_debiased_v_average():

    # tests that the debiased method performs better than the standard
    # average.  Does this for both OLS and GLM.

    np.random.seed(435265)
    N = 200
    p = 10
    m = 4

    beta = np.random.normal(size=p)
    beta = beta * np.random.randint(0, 2, p)
    X = np.random.normal(size=(N, p))
    y = X.dot(beta) + np.random.normal(size=N)

    db_mod = DistributedModel(m)
    fitOLSdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.2})
    olsdb = np.linalg.norm(fitOLSdb.params - beta)
    n_mod = DistributedModel(m, estimation_method=_est_regularized_naive,
                             join_method=_join_naive)
    fitOLSn = n_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.2})
    olsn = np.linalg.norm(fitOLSn.params - beta)

    assert_(olsdb < olsn)

    prob = 1 / (1 + np.exp(-X.dot(beta) + np.random.normal(size=N)))
    y = 1. * (prob > 0.5)
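    # binary outcomes from a noisy logistic model, so the GLM/Binomial
    # comparison runs on data generated close to its own model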

    db_mod = DistributedModel(m, model_class=GLM,
                              init_kwds={"family": Binomial()})
    fitGLMdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.2})
    glmdb = np.linalg.norm(fitGLMdb.params - beta)
    n_mod = DistributedModel(m, model_class=GLM,
                             init_kwds={"family": Binomial()},
                             estimation_method=_est_regularized_naive,
                             join_method=_join_naive)
    fitGLMn = n_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.2})
    glmn = np.linalg.norm(fitGLMn.params - beta)

    assert_(glmdb < glmn)