Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / tests / test_structural.py

"""
Tests for structural time series models

Author: Chad Fulton
License: Simplified-BSD
"""

import warnings

import numpy as np
from numpy.testing import assert_equal, assert_allclose, assert_raises
import pandas as pd
import pytest

from statsmodels.datasets import macrodata
from statsmodels.tools.sm_exceptions import SpecificationWarning
from statsmodels.tsa.statespace import structural
from statsmodels.tsa.statespace.structural import UnobservedComponents
from statsmodels.tsa.statespace.tests.results import results_structural

dta = macrodata.load_pandas().data
dta.index = pd.date_range(start='1959-01-01', end='2009-07-01', freq='QS')


def run_ucm(name, use_exact_diffuse=False):
    true = getattr(results_structural, name)

    for model in true['models']:
        kwargs = model.copy()
        kwargs.update(true['kwargs'])
        kwargs['use_exact_diffuse'] = use_exact_diffuse

        # Make a copy of the data
        values = dta.copy()

        freq = kwargs.pop('freq', None)
        if freq is not None:
            values.index = pd.date_range(start='1959-01-01', periods=len(dta),
                                         freq=freq)

        # Test pandas exog
        if 'exog' in kwargs:
            # Default value here is pd.Series object
            exog = np.log(values['realgdp'])

            # Also allow a check with a 1-dim numpy array
            if kwargs['exog'] == 'numpy':
                exog = exog.values.squeeze()

            kwargs['exog'] = exog

        # Create the model
        mod = UnobservedComponents(values['unemp'], **kwargs)

        # Smoke test for starting parameters, untransform, transform
        # Also test that transform and untransform are inverses
        mod.start_params
        roundtrip = mod.transform_params(
            mod.untransform_params(mod.start_params))
        assert_allclose(mod.start_params, roundtrip)

        # Fit the model at the true parameters
        res_true = mod.filter(true['params'])

        # Check that the cycle bounds were computed correctly
        freqstr = freq[0] if freq is not None else values.index.freqstr[0]
        if 'cycle_period_bounds' in kwargs:
            cycle_period_bounds = kwargs['cycle_period_bounds']
        elif freqstr == 'A':
            cycle_period_bounds = (1.5, 12)
        elif freqstr == 'Q':
            cycle_period_bounds = (1.5*4, 12*4)
        elif freqstr == 'M':
            cycle_period_bounds = (1.5*12, 12*12)
        else:
            # If we have no information on data frequency, require the
            # cycle frequency to be between 0 and pi
            cycle_period_bounds = (2, np.inf)

        # Test that the cycle frequency bound is correct
        assert_equal(mod.cycle_frequency_bound,
                     (2*np.pi / cycle_period_bounds[1],
                      2*np.pi / cycle_period_bounds[0]))

        # Test that the likelihood is correct
        rtol = true.get('rtol', 1e-7)
        atol = true.get('atol', 0)

        if use_exact_diffuse:
            # If we are using exact diffuse initialization, then we need to
            # adjust for the fact that KFAS does not include the constant in
            # the likelihood function for the diffuse periods
            # (see note to test_exact_diffuse_filtering.py for details).
            res_llf = (res_true.llf_obs.sum()
                       + res_true.nobs_diffuse * 0.5 * np.log(2 * np.pi))
        else:
            # If we are using approximate diffuse initialization, then we need
            # to ignore the first period, and this will agree with KFAS (since
            # it does not include the constant in the likelihood function for
            # diffuse periods).
            res_llf = res_true.llf_obs[res_true.loglikelihood_burn:].sum()

        assert_allclose(res_llf, true['llf'], rtol=rtol, atol=atol)

        # Optional smoke test for plot_components
        try:
            import matplotlib.pyplot as plt
            try:
                from pandas.plotting import register_matplotlib_converters
                register_matplotlib_converters()
            except ImportError:
                pass
            fig = plt.figure()
            res_true.plot_components(fig=fig)
        except ImportError:
            pass

        # Now fit the model via MLE
        with warnings.catch_warnings(record=True):
            fit_kwargs = {}
            if 'maxiter' in true:
                fit_kwargs['maxiter'] = true['maxiter']
            res = mod.fit(start_params=true.get('start_params', None),
                          disp=-1, **fit_kwargs)
            # If we found a higher likelihood, no problem; otherwise check
            # that we're very close to that found by R

            # See note above about these computation
            if use_exact_diffuse:
                res_llf = (res.llf_obs.sum()
                           + res.nobs_diffuse * 0.5 * np.log(2 * np.pi))
            else:
                res_llf = res.llf_obs[res_true.loglikelihood_burn:].sum()

            if res_llf <= true['llf']:
                assert_allclose(res_llf, true['llf'], rtol=1e-4)

            # Smoke test for summary
            res.summary()


def test_irregular(close_figures):
    run_ucm('irregular')
    run_ucm('irregular', use_exact_diffuse=True)


def test_fixed_intercept(close_figures):
    # Clear warnings
    structural.__warningregistry__ = {}
    warning = SpecificationWarning
    match = 'Specified model does not contain'
    with pytest.warns(warning, match=match):
        run_ucm('fixed_intercept')
        run_ucm('fixed_intercept', use_exact_diffuse=True)


def test_deterministic_constant(close_figures):
    run_ucm('deterministic_constant')
    run_ucm('deterministic_constant', use_exact_diffuse=True)


def test_random_walk(close_figures):
    run_ucm('random_walk')
    run_ucm('random_walk', use_exact_diffuse=True)


def test_local_level(close_figures):
    run_ucm('local_level')
    run_ucm('local_level', use_exact_diffuse=True)


def test_fixed_slope(close_figures):
    warning = SpecificationWarning
    match = 'irregular component added'
    with pytest.warns(warning, match=match):
        run_ucm('fixed_slope')
        run_ucm('fixed_slope', use_exact_diffuse=True)


def test_fixed_slope_warn(close_figures):
    # Clear warnings
    structural.__warningregistry__ = {}

    warning = SpecificationWarning
    match = 'irregular component added'
    with pytest.warns(warning, match=match):
        run_ucm('fixed_slope')
        run_ucm('fixed_slope', use_exact_diffuse=True)


def test_deterministic_trend(close_figures):
    run_ucm('deterministic_trend')
    run_ucm('deterministic_trend', use_exact_diffuse=True)


def test_random_walk_with_drift(close_figures):
    run_ucm('random_walk_with_drift')
    run_ucm('random_walk_with_drift', use_exact_diffuse=True)


def test_local_linear_deterministic_trend(close_figures):
    run_ucm('local_linear_deterministic_trend')
    run_ucm('local_linear_deterministic_trend', use_exact_diffuse=True)


def test_local_linear_trend(close_figures):
    run_ucm('local_linear_trend')
    run_ucm('local_linear_trend', use_exact_diffuse=True)


def test_smooth_trend(close_figures):
    run_ucm('smooth_trend')
    run_ucm('smooth_trend', use_exact_diffuse=True)


def test_random_trend(close_figures):
    run_ucm('random_trend')
    run_ucm('random_trend', use_exact_diffuse=True)


def test_cycle(close_figures):
    run_ucm('cycle_approx_diffuse')
    run_ucm('cycle', use_exact_diffuse=True)


def test_seasonal(close_figures):
    run_ucm('seasonal_approx_diffuse')
    run_ucm('seasonal', use_exact_diffuse=True)


def test_freq_seasonal(close_figures):
    run_ucm('freq_seasonal_approx_diffuse')
    run_ucm('freq_seasonal', use_exact_diffuse=True)


def test_reg(close_figures):
    run_ucm('reg_approx_diffuse')
    run_ucm('reg', use_exact_diffuse=True)


def test_rtrend_ar1(close_figures):
    run_ucm('rtrend_ar1')
    run_ucm('rtrend_ar1', use_exact_diffuse=True)


@pytest.mark.slow
def test_lltrend_cycle_seasonal_reg_ar1(close_figures):
    run_ucm('lltrend_cycle_seasonal_reg_ar1_approx_diffuse')
    run_ucm('lltrend_cycle_seasonal_reg_ar1', use_exact_diffuse=True)


@pytest.mark.parametrize("use_exact_diffuse", [True, False])
def test_mle_reg(use_exact_diffuse):
    endog = np.arange(100)*1.0
    exog = endog*2
    # Make the fit not-quite-perfect
    endog[::2] += 0.01
    endog[1::2] -= 0.01

    with warnings.catch_warnings(record=True):
        mod1 = UnobservedComponents(endog, irregular=True,
                                    exog=exog, mle_regression=False,
                                    use_exact_diffuse=use_exact_diffuse)
        res1 = mod1.fit(disp=-1)

        mod2 = UnobservedComponents(endog, irregular=True,
                                    exog=exog, mle_regression=True,
                                    use_exact_diffuse=use_exact_diffuse)
        res2 = mod2.fit(disp=-1)

    assert_allclose(res1.regression_coefficients.filtered[0, -1],
                    0.5,
                    atol=1e-5)
    assert_allclose(res2.params[1], 0.5, atol=1e-5)

    # When the regression component is part of the state vector with exact
    # diffuse initialization, we have two diffuse observations
    if use_exact_diffuse:
        print(res1.predicted_diffuse_state_cov)
        assert_equal(res1.nobs_diffuse, 2)
        assert_equal(res2.nobs_diffuse, 0)
    else:
        assert_equal(res1.loglikelihood_burn, 1)
        assert_equal(res2.loglikelihood_burn, 0)


def test_specifications():
    # Clear warnings
    structural.__warningregistry__ = {}

    endog = [1, 2]

    # Test that when nothing specified, a warning is issued and the model that
    # is fit is one with irregular=True and nothing else.
    warning = SpecificationWarning
    match = 'irregular component added'
    with pytest.warns(warning, match=match):
        mod = UnobservedComponents(endog)
        assert_equal(mod.trend_specification, 'irregular')

    # Test an invalid string trend specification
    with pytest.raises(ValueError):
        UnobservedComponents(endog, 'invalid spec')

    # Test that if a trend component is specified without a level component,
    # a warning is issued and a deterministic level component is added
    warning = SpecificationWarning
    match = 'Trend component specified without'
    with pytest.warns(warning, match=match):
        mod = UnobservedComponents(endog, trend=True, irregular=True)
        assert_equal(mod.trend_specification, 'deterministic trend')

    # Test that if a string specification is provided, a warning is issued if
    # the boolean attributes are also specified
    trend_attributes = ['irregular', 'trend', 'stochastic_level',
                        'stochastic_trend']
    for attribute in trend_attributes:
        kwargs = {attribute: True}

        warning = SpecificationWarning
        match = 'may be overridden when the trend'
        with pytest.warns(warning, match=match):
            UnobservedComponents(endog, 'deterministic trend', **kwargs)

    # Test that a seasonal with period less than two is invalid
    with pytest.raises(ValueError):
        UnobservedComponents(endog, seasonal=1)


def test_start_params():
    # Test that the behavior is correct for multiple exogenous and / or
    # autoregressive components

    # Parameters
    nobs = int(1e4)
    beta = np.r_[10, -2]
    phi = np.r_[0.5, 0.1]

    # Generate data
    np.random.seed(1234)
    exog = np.c_[np.ones(nobs), np.arange(nobs)*1.0]
    eps = np.random.normal(size=nobs)
    endog = np.zeros(nobs+2)
    for t in range(1, nobs):
        endog[t+1] = phi[0] * endog[t] + phi[1] * endog[t-1] + eps[t]
Loading ...