Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ regression / tests / test_rolling.py

from io import BytesIO
from itertools import product

import numpy as np
import pandas as pd
import pytest
from numpy.testing import assert_allclose, assert_array_equal

from statsmodels import tools
from statsmodels.regression.linear_model import WLS
from statsmodels.regression.rolling import RollingWLS, RollingOLS


def gen_data(nobs, nvar, const, pandas=False, missing=0.0,
             weights=False):
    rs = np.random.RandomState(987499302)
    x = rs.standard_normal((nobs, nvar))
    cols = ['x{0}'.format(i) for i in range(nvar)]
    if const:
        x = tools.add_constant(x)
        cols = ['const'] + cols
    if missing > 0.0:
        mask = rs.random_sample(x.shape) < missing
        x[mask] = np.nan
    if x.shape[1] > 1:
        y = x[:, :-1].sum(1) + rs.standard_normal(nobs)
    else:
        y = x.sum(1) + rs.standard_normal(nobs)
    w = rs.chisquare(5, y.shape[0]) / 5
    if pandas:
        idx = pd.date_range('12-31-1999', periods=nobs)
        x = pd.DataFrame(x, index=idx, columns=cols)
        y = pd.Series(y, index=idx, name='y')
        w = pd.Series(w, index=idx, name='weights')
    if not weights:
        w = None

    return y, x, w


nobs = (250,)
nvar = (3, 0)
tf = (True, False)
missing = (0, 0.1)
params = list(product(nobs, nvar, tf, tf, missing))
params = [param for param in params if param[1] + param[2] > 0]
ids = ['-'.join(map(str, param)) for param in params]

basic_params = [param for param in params if params[2] and params[4]]
weighted_params = [param + (tf,) for param in params for tf in (True, False)]
weighted_ids = ['-'.join(map(str, param)) for param in weighted_params]


@pytest.fixture(scope='module', params=params, ids=ids)
def data(request):
    return gen_data(*request.param)


@pytest.fixture(scope='module', params=basic_params, ids=ids)
def basic_data(request):
    return gen_data(*request.param)


@pytest.fixture(scope='module', params=weighted_params, ids=weighted_ids)
def weighted_data(request):
    return gen_data(*request.param)


def get_single(x, idx):
    if isinstance(x, (pd.Series, pd.DataFrame)):
        return x.iloc[idx]
    return x[idx]


def get_sub(x, idx, window):
    if isinstance(x, (pd.Series, pd.DataFrame)):
        out = x.iloc[idx - window:idx]
        return np.asarray(out)
    return x[idx - window:idx]


def test_has_nan(data):
    y, x, w = data
    mod = RollingWLS(y, x, window=100, weights=w)
    has_nan = np.zeros(y.shape[0], dtype=np.bool)
    for i in range(100, y.shape[0] + 1):
        _y = get_sub(y, i, 100)
        _x = get_sub(x, i, 100)
        has_nan[i - 1] = np.squeeze((np.any(np.isnan(_y)) or
                                     np.any(np.isnan(_x))))
    assert_array_equal(mod._has_nan, has_nan)


def test_weighted_against_wls(weighted_data):
    y, x, w = weighted_data
    mod = RollingWLS(y, x, weights=w, window=100)
    res = mod.fit(use_t=True)
    for i in range(100, y.shape[0]):
        _y = get_sub(y, i, 100)
        _x = get_sub(x, i, 100)
        if w is not None:
            _w = get_sub(w, i, 100)
        else:
            _w = np.ones_like(_y)
        wls = WLS(_y, _x, weights=_w, missing='drop').fit()
        rolling_params = get_single(res.params, i - 1)
        rolling_nobs = get_single(res.nobs, i - 1)
        assert_allclose(rolling_params, wls.params)
        assert_allclose(rolling_nobs, wls.nobs)
        assert_allclose(get_single(res.ssr, i - 1), wls.ssr)
        assert_allclose(get_single(res.llf, i - 1), wls.llf)
        assert_allclose(get_single(res.aic, i - 1), wls.aic)
        assert_allclose(get_single(res.bic, i - 1), wls.bic)
        assert_allclose(get_single(res.centered_tss, i - 1), wls.centered_tss)
        assert_allclose(res.df_model, wls.df_model)
        assert_allclose(get_single(res.df_resid, i - 1), wls.df_resid)
        assert_allclose(get_single(res.ess, i - 1), wls.ess, atol=1e-8)
        assert_allclose(res.k_constant, wls.k_constant)
        assert_allclose(get_single(res.mse_model, i - 1), wls.mse_model)
        assert_allclose(get_single(res.mse_resid, i - 1), wls.mse_resid)
        assert_allclose(get_single(res.mse_total, i - 1), wls.mse_total)
        assert_allclose(get_single(res.rsquared, i - 1), wls.rsquared,
                        atol=1e-8)
        assert_allclose(get_single(res.rsquared_adj, i - 1), wls.rsquared_adj,
                        atol=1e-8)
        assert_allclose(get_single(res.uncentered_tss, i - 1),
                        wls.uncentered_tss)


@pytest.mark.parametrize('cov_type', ['nonrobust', 'HC0'])
@pytest.mark.parametrize('use_t', [None, True, False])
def test_against_wls_inference(data, use_t, cov_type):
    y, x, w = data
    mod = RollingWLS(y, x, window=100, weights=w)
    res = mod.fit(use_t=use_t, cov_type=cov_type)
    ci_cols = ci = res.conf_int()
    test_cols = x.shape[1] > 3

    # This is a smoke test of cov_params to make sure it works
    res.cov_params()

    if test_cols:
        ci_cols = res.conf_int(cols=[0, 2])
    # Skip to improve performance
    for i in range(100, y.shape[0]):
        _y = get_sub(y, i, 100)
        _x = get_sub(x, i, 100)
        wls = WLS(_y, _x, missing='drop').fit(use_t=use_t, cov_type=cov_type)
        assert_allclose(get_single(res.tvalues, i - 1), wls.tvalues)
        assert_allclose(get_single(res.bse, i - 1), wls.bse)
        assert_allclose(get_single(res.pvalues, i - 1), wls.pvalues, atol=1e-8)
        assert_allclose(get_single(res.fvalue, i - 1), wls.fvalue)
        with np.errstate(invalid='ignore'):
            assert_allclose(get_single(res.f_pvalue, i - 1), wls.f_pvalue,
                            atol=1e-8)
        assert res.cov_type == wls.cov_type
        assert res.use_t == wls.use_t
        wls_ci = wls.conf_int()
        if isinstance(ci, pd.DataFrame):
            ci_val = ci.iloc[i - 1]
            ci_val = np.asarray(ci_val).reshape((-1, 2))
        else:
            ci_val = ci[i - 1].T
        assert_allclose(ci_val, wls_ci)
        if test_cols:
            wls_ci = wls.conf_int(cols=[0, 2])
            if isinstance(ci_cols, pd.DataFrame):
                ci_val = ci_cols.iloc[i - 1]
                ci_val = np.asarray(ci_val).reshape((-1, 2))
            else:
                ci_val = ci_cols[i - 1].T
            assert_allclose(ci_val, wls_ci)


def test_raise(data):
    y, x, w = data

    mod = RollingWLS(y, x, window=100, missing='drop', weights=w)
    res = mod.fit()
    params = np.asarray(res.params)
    assert np.all(np.isfinite(params[99:]))

    if not np.any(np.isnan(y)):
        return
    mod = RollingWLS(y, x, window=100, missing='skip')
    res = mod.fit()
    params = np.asarray(res.params)
    assert np.any(np.isnan(params[100:]))


def test_error():
    y, x, _ = gen_data(250, 2, True)
    with pytest.raises(ValueError, match='reset must be a positive integer'):
        RollingWLS(y, x, ).fit(reset=-1)
    with pytest.raises(ValueError):
        RollingWLS(y, x).fit(method='unknown')
    with pytest.raises(ValueError, match='min_nobs must be larger'):
        RollingWLS(y, x, min_nobs=1)
    with pytest.raises(ValueError, match='min_nobs must be larger'):
        RollingWLS(y, x, window=60, min_nobs=100)


def test_save_load(data):
    y, x, w = data
    res = RollingOLS(y, x, window=60).fit()
    fh = BytesIO()
    # test wrapped results load save pickle
    res.save(fh)
    fh.seek(0, 0)
    res_unpickled = res.__class__.load(fh)
    assert type(res_unpickled) is type(res)  # noqa: E721

    fh = BytesIO()
    # test wrapped results load save pickle
    res.save(fh, remove_data=True)
    fh.seek(0, 0)
    res_unpickled = res.__class__.load(fh)
    assert type(res_unpickled) is type(res)  # noqa: E721


def test_formula():
    y, x, w = gen_data(250, 3, True, pandas=True)
    fmla = 'y ~ 1 + x0 + x1 + x2'
    data = pd.concat([y, x], axis=1)
    mod = RollingWLS.from_formula(fmla, window=100, data=data, weights=w)
    res = mod.fit()
    alt = RollingWLS(y, x, window=100)
    alt_res = alt.fit()
    assert_allclose(res.params, alt_res.params)
    ols_mod = RollingOLS.from_formula(fmla, window=100, data=data)
    ols_mod.fit()


@pytest.mark.matplotlib
def test_plot():
    import matplotlib.pyplot as plt

    y, x, w = gen_data(250, 3, True, pandas=True)
    fmla = 'y ~ 1 + x0 + x1 + x2'
    data = pd.concat([y, x], axis=1)
    mod = RollingWLS.from_formula(fmla, window=100, data=data, weights=w)
    res = mod.fit()
    fig = res.plot_recursive_coefficient()
    assert isinstance(fig, plt.Figure)
    res.plot_recursive_coefficient(variables=2, alpha=None,
                                   figsize=(30, 7))
    res.plot_recursive_coefficient(variables='x0', alpha=None,
                                   figsize=(30, 7))
    res.plot_recursive_coefficient(variables=[0, 2], alpha=None,
                                   figsize=(30, 7))
    res.plot_recursive_coefficient(variables=['x0'], alpha=None,
                                   figsize=(30, 7))
    res.plot_recursive_coefficient(variables=['x0', 'x1', 'x2'], alpha=None,
                                   figsize=(30, 7))
    with pytest.raises(ValueError, match='variable x4 is not an integer'):
        res.plot_recursive_coefficient(variables='x4')

    fig = plt.Figure()
    with pytest.warns(UserWarning, match="tight_layout"):
        out = res.plot_recursive_coefficient(fig=fig)
    assert out is fig
    res.plot_recursive_coefficient(alpha=None, figsize=(30, 7))


def test_methods(basic_data):
    y, x, _ = basic_data
    mod = RollingOLS(y, x, 150)
    res_inv = mod.fit(method='inv')
    res_lstsq = mod.fit(method='lstsq')
    res_pinv = mod.fit(method='pinv')
    assert_allclose(res_inv.params, res_lstsq.params)
    assert_allclose(res_inv.params, res_pinv.params)


def test_min_nobs(basic_data):
    y, x, w = basic_data
    if not np.any(np.isnan(np.asarray(x))):
        return
    mod = RollingOLS(y, x, 150)
    res = mod.fit()
    # Ensures that the constraint binds
    min_nobs = res.nobs[res.nobs != 0].min() + 1
    mod = RollingOLS(y, x, 150, min_nobs=min_nobs)
    res = mod.fit()
    assert np.all(res.nobs[res.nobs != 0] >= min_nobs)