# -*- coding: utf-8 -*-
"""Tests for variable screening with penalized models.

Created on Wed May 23 12:53:27 2018

Author: Josef Perktold
"""

import numpy as np
from numpy.testing import assert_allclose, assert_equal
import pandas as pd

from statsmodels.discrete.discrete_model import Poisson, Logit
from statsmodels.genmod.generalized_linear_model import GLM
from statsmodels.genmod.families import family
from statsmodels.base._penalized import PenalizedMixin
from statsmodels.base._screening import VariableScreening


class PoissonPenalized(PenalizedMixin, Poisson):
    pass


class LogitPenalized(PenalizedMixin, Logit):
    pass


class GLMPenalized(PenalizedMixin, GLM):
    pass


def _get_poisson_data():
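    """Simulate Poisson count data with a sparse signal.

    Creates 100 observations on 500 standardized regressors (the first
    column is replaced by a constant); only the columns in
    ``idx_nonzero_true`` have nonzero coefficients.

    Returns ``y, x, idx_nonzero_true, beta``.
    """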
    np.random.seed(987865)

    nobs, k_vars = 100, 500
    k_nonzero = 5
    x = (np.random.rand(nobs, k_vars) +
         1. * (np.random.rand(nobs, 1) - 0.5)) * 2 - 1
    x *= 1.2

    x = (x - x.mean(0)) / x.std(0)
    x[:, 0] = 1
    beta = np.zeros(k_vars)
    idx_nonzero_true = [0, 100, 300, 400, 411]
    beta[idx_nonzero_true] = 1. / np.arange(1, k_nonzero + 1)
    beta = np.sqrt(beta)  # make small coefficients larger
    linpred = x.dot(beta)
    mu = np.exp(linpred)
    y = np.random.poisson(mu)
    return y, x, idx_nonzero_true, beta


def test_poisson_screening():
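    """Screen Poisson candidate regressors and compare with the oracle fit.

    The screener should recover exactly the true nonzero columns, and the
    final parameters should closely match a Poisson fit that uses only
    those columns.
    """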

    np.random.seed(987865)

    y, x, idx_nonzero_true, beta = _get_poisson_data()
    nobs = len(y)

    xnames_true = ['var%4d' % ii for ii in idx_nonzero_true]
    xnames_true[0] = 'const'
    parameters = pd.DataFrame(beta[idx_nonzero_true], index=xnames_true,
                              columns=['true'])

    xframe_true = pd.DataFrame(x[:, idx_nonzero_true], columns=xnames_true)
    res_oracle = Poisson(y, xframe_true).fit()
    parameters['oracle'] = res_oracle.params

    mod_initial = PoissonPenalized(y, np.ones(nobs), pen_weight=nobs * 5)

    screener = VariableScreening(mod_initial)
    exog_candidates = x[:, 1:]
    res_screen = screener.screen_exog(exog_candidates, maxiter=10)

    assert_equal(np.sort(res_screen.idx_nonzero), idx_nonzero_true)

    xnames = ['var%4d' % ii for ii in res_screen.idx_nonzero]
    xnames[0] = 'const'

    # smoke test
    res_screen.results_final.summary(xname=xnames)
    res_screen.results_pen.summary()
    assert_equal(res_screen.results_final.mle_retvals['converged'], True)

    ps = pd.Series(res_screen.results_final.params, index=xnames, name='final')
    parameters = parameters.join(ps, how='outer')

    assert_allclose(parameters['oracle'], parameters['final'], atol=5e-6)


def test_screen_iterated():
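    """Screen candidate regressors that are provided in batches.

    Uses ``screen_exog_iterator`` with six batches of candidates; one true
    regressor is hidden at column 10 of each of the first four batches.
    """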
    np.random.seed(987865)

    nobs, k_nonzero = 100, 5

    x = (np.random.rand(nobs, k_nonzero - 1) +
         1. * (np.random.rand(nobs, 1) - 0.5)) * 2 - 1
    x *= 1.2
    x = (x - x.mean(0)) / x.std(0)
    x = np.column_stack((np.ones(nobs), x))

    beta = 1. / np.arange(1, k_nonzero + 1)
    beta = np.sqrt(beta)  # make small coefficients larger
    linpred = x.dot(beta)
    mu = np.exp(linpred)
    y = np.random.poisson(mu)

    common = x[:, 1:].sum(1)[:, None]

    x_nonzero = x

    def exog_iterator():
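        """Yield six batches of 100 standardized candidate regressors.

        A true nonzero regressor is hidden at column 10 of each of the
        first four batches.
        """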
        k_vars = 100

        n_batches = 6
        for i in range(n_batches):
            x = (0.05 * common + np.random.rand(nobs, k_vars) +
                 1. * (np.random.rand(nobs, 1) - 0.5)) * 2 - 1
            x *= 1.2
            if i < k_nonzero - 1:
                # hide a nonzero regressor in this batch
                x[:, 10] = x_nonzero[:, i + 1]
            x = (x - x.mean(0)) / x.std(0)
            yield x

    dummy = np.ones(nobs)
    dummy[:nobs // 2] = 0
    exog_keep = np.column_stack((np.ones(nobs), dummy))
    for k in [1, 2]:
        mod_initial = PoissonPenalized(y, exog_keep[:, :k], pen_weight=nobs * 500)
        screener = VariableScreening(mod_initial)
        screener.k_max_add = 30

        final = screener.screen_exog_iterator(exog_iterator())
        names = ['var0_10', 'var1_10', 'var2_10', 'var3_10']
        assert_equal(final.exog_final_names, names)
        idx_full = np.array([[ 0, 10],
                             [ 1, 10],
                             [ 2, 10],
                             [ 3, 10]], dtype=np.int64)
        assert_equal(final.idx_nonzero_batches, idx_full)


def test_glmpoisson_screening():
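    """Screen Poisson candidates using GLMPenalized with a Poisson family."""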

    y, x, idx_nonzero_true, beta = _get_poisson_data()
    nobs = len(y)

    xnames_true = ['var%4d' % ii for ii in idx_nonzero_true]
    xnames_true[0] = 'const'
    parameters = pd.DataFrame(beta[idx_nonzero_true], index=xnames_true,
                              columns=['true'])

    xframe_true = pd.DataFrame(x[:, idx_nonzero_true], columns=xnames_true)
    res_oracle = GLMPenalized(y, xframe_true, family=family.Poisson()).fit()
    parameters['oracle'] = res_oracle.params

    mod_initial = GLMPenalized(y, np.ones(nobs), family=family.Poisson())

    screener = VariableScreening(mod_initial)
    exog_candidates = x[:, 1:]
    res_screen = screener.screen_exog(exog_candidates, maxiter=10)

    assert_equal(np.sort(res_screen.idx_nonzero), idx_nonzero_true)

    xnames = ['var%4d' % ii for ii in res_screen.idx_nonzero]
    xnames[0] = 'const'

    # smoke test
    res_screen.results_final.summary(xname=xnames)
    res_screen.results_pen.summary()
    assert_equal(res_screen.results_final.mle_retvals['converged'], True)

    ps = pd.Series(res_screen.results_final.params, index=xnames, name='final')
    parameters = parameters.join(ps, how='outer')

    assert_allclose(parameters['oracle'], parameters['final'], atol=5e-6)


def _get_logit_data():
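    """Simulate binary response data with a sparse signal.

    Creates 300 observations on 500 standardized regressors (the first
    column is replaced by a constant); only the columns in
    ``idx_nonzero_true`` have nonzero coefficients.

    Returns ``y, x, idx_nonzero_true, beta``.
    """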
    np.random.seed(987865)

    nobs, k_vars = 300, 500
    k_nonzero = 5
    x = (np.random.rand(nobs, k_vars) +
         0.01 * (np.random.rand(nobs, 1) - 0.5)) * 2 - 1
    x *= 1.2

    x = (x - x.mean(0)) / x.std(0)
    x[:, 0] = 1
    beta = np.zeros(k_vars)
    idx_nonzero_true = [0, 100, 300, 400, 411]
    beta[idx_nonzero_true] = 1. / np.arange(1, k_nonzero + 1)**0.5
    beta = np.sqrt(beta)  # make small coefficients larger
    linpred = x.dot(beta)
    mu = 1 / (1 + np.exp(-linpred))
    y = (np.random.rand(len(mu)) < mu).astype(int)
    return y, x, idx_nonzero_true, beta


def test_logit_screening():
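    """Screen logit candidate regressors and compare with the oracle fit.

    The screener keeps a few extra variables here, so only the large
    parameters are checked against the true indices, together with a
    regression test on the exact selected index set.
    """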

    y, x, idx_nonzero_true, beta = _get_logit_data()
    nobs = len(y)
    # screener options used by this test
    screener_kwds = dict(pen_weight=nobs * 0.7, threshold_trim=1e-3)

    xnames_true = ['var%4d' % ii for ii in idx_nonzero_true]
    xnames_true[0] = 'const'
    parameters = pd.DataFrame(beta[idx_nonzero_true], index=xnames_true,
                              columns=['true'])

    xframe_true = pd.DataFrame(x[:, idx_nonzero_true], columns=xnames_true)
    res_oracle = Logit(y, xframe_true).fit()
    parameters['oracle'] = res_oracle.params

    mod_initial = LogitPenalized(y, np.ones(nobs), pen_weight=nobs * 0.5)
    screener = VariableScreening(mod_initial, **screener_kwds)
    screener.k_max_add = 30
    exog_candidates = x[:, 1:]
    res_screen = screener.screen_exog(exog_candidates, maxiter=30)

    # we have extra variables, check index for larger params
    mask = np.abs(res_screen.results_final.params) > 0.1
    assert_equal(np.sort(res_screen.idx_nonzero[mask]), idx_nonzero_true)
    # regression test
    idx_r = np.array([0, 74, 100, 163, 300, 400, 411])
    assert_equal(np.sort(res_screen.idx_nonzero), idx_r)

    xnames = ['var%4d' % ii for ii in res_screen.idx_nonzero]
    xnames[0] = 'const'

    # smoke test
    res_screen.results_final.summary(xname=xnames)
    res_screen.results_pen.summary()
    assert_equal(res_screen.results_final.mle_retvals['converged'], True)

    ps = pd.Series(res_screen.results_final.params, index=xnames, name='final')
    # changed the following to allow for some extra params
    # parameters = parameters.join(ps, how='outer')
    parameters['final'] = ps

    assert_allclose(parameters['oracle'], parameters['final'], atol=0.005)


def test_glmlogit_screening():
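    """Screen logit candidates using GLMPenalized with a Binomial family.

    Candidate ranking uses ``model.score_factor``.
    """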

    y, x, idx_nonzero_true, beta = _get_logit_data()
    nobs = len(y)

    # screener options used by this test
    screener_kwds = dict(pen_weight=nobs * 0.75, threshold_trim=1e-3,
                         ranking_attr='model.score_factor')

    xnames_true = ['var%4d' % ii for ii in idx_nonzero_true]
    xnames_true[0] = 'const'
    parameters = pd.DataFrame(beta[idx_nonzero_true], index=xnames_true,
                              columns=['true'])

    xframe_true = pd.DataFrame(x[:, idx_nonzero_true], columns=xnames_true)
    res_oracle = GLMPenalized(y, xframe_true, family=family.Binomial()).fit()
    parameters['oracle'] = res_oracle.params

    # mod_initial = LogitPenalized(y, np.ones(nobs), pen_weight=nobs * 0.5)
    mod_initial = GLMPenalized(y, np.ones(nobs), family=family.Binomial())

    screener = VariableScreening(mod_initial, **screener_kwds)
    screener.k_max_add = 10
    exog_candidates = x[:, 1:]
    res_screen = screener.screen_exog(exog_candidates, maxiter=30)

    # smoke test: screening results expose selected indices and final results
    res_screen.idx_nonzero

    res_screen.results_final

    xnames = ['var%4d' % ii for ii in res_screen.idx_nonzero]
    xnames[0] = 'const'

    # smoke test
    res_screen.results_final.summary(xname=xnames)
    res_screen.results_pen.summary()
    assert_equal(res_screen.results_final.mle_retvals['converged'], True)

    ps = pd.Series(res_screen.results_final.params, index=xnames, name='final')
    # changed the following to allow for some extra params
    # parameters = parameters.join(ps, how='outer')
    parameters['final'] = ps

    assert_allclose(parameters['oracle'], parameters['final'], atol=0.005)


def _get_gaussian_data():
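    """Simulate Gaussian data with a sparse signal.

    Creates 100 observations on 500 standardized regressors (the first
    column is replaced by a constant); only the columns in
    ``idx_nonzero_true`` have nonzero coefficients.

    Returns ``y, x, idx_nonzero_true, beta``.
    """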
    np.random.seed(987865)

    nobs, k_vars = 100, 500
    k_nonzero = 5
    x = (np.random.rand(nobs, k_vars) +
         0.01 * (np.random.rand(nobs, 1) - 0.5)) * 2 - 1
    x *= 1.2

    x = (x - x.mean(0)) / x.std(0)
    x[:, 0] = 1  # make first column into constant
    beta = np.zeros(k_vars)
    idx_nonzero_true = [0, 1, 300, 400, 411]
    beta[idx_nonzero_true] = 1. / np.arange(1, k_nonzero + 1)
    beta = np.sqrt(beta)  # make small coefficients larger
    linpred = x.dot(beta)
    y = linpred + 0.1 * np.random.randn(len(linpred))

    return y, x, idx_nonzero_true, beta


def test_glmgaussian_screening():
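    """Screen Gaussian candidates using GLMPenalized with a Gaussian family.

    The response is demeaned so that the constant coefficient is zero,
    which checks that ``exog_keep`` columns are not trimmed.
    """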

    y, x, idx_nonzero_true, beta = _get_gaussian_data()
    nobs = len(y)
    # demeaning makes constant zero, checks that exog_keep are not trimmed
    y = y - y.mean(0)

    # screener options used by this test
    screener_kwds = dict(pen_weight=nobs * 0.75, threshold_trim=1e-3,
                         ranking_attr='model.score_factor')

    xnames_true = ['var%4d' % ii for ii in idx_nonzero_true]
    xnames_true[0] = 'const'
    parameters = pd.DataFrame(beta[idx_nonzero_true], index=xnames_true,
                              columns=['true'])

    xframe_true = pd.DataFrame(x[:, idx_nonzero_true], columns=xnames_true)
    res_oracle = GLMPenalized(y, xframe_true, family=family.Gaussian()).fit()
    parameters['oracle'] = res_oracle.params

    for k_keep in [1, 2]:
        mod_initial = GLMPenalized(y, x[:, :k_keep], family=family.Gaussian())
        screener = VariableScreening(mod_initial, **screener_kwds)
        exog_candidates = x[:, k_keep:]
        res_screen = screener.screen_exog(exog_candidates, maxiter=30)

        assert_equal(np.sort(res_screen.idx_nonzero), idx_nonzero_true)

        xnames = ['var%4d' % ii for ii in res_screen.idx_nonzero]
        xnames[0] = 'const'

        # smoke test
        res_screen.results_final.summary(xname=xnames)
        res_screen.results_pen.summary()
        assert_equal(res_screen.results_final.mle_retvals['converged'], True)

        ps = pd.Series(res_screen.results_final.params, index=xnames, name='final')
        parameters = parameters.join(ps, how='outer')

        assert_allclose(parameters['oracle'], parameters['final'], atol=1e-5)
        # we need to remove 'final' again for next iteration
        del parameters['final']