Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / tests / test_concentrated.py

"""
Tests for concentrating the scale out of the loglikelihood function

Note: the univariate cases is well tested in test_sarimax.py

Author: Chad Fulton
License: Simplified-BSD
"""

import numpy as np
import pandas as pd
from statsmodels.tools.tools import Bunch
from .results import results_varmax
from statsmodels.tsa.statespace import sarimax, varmax
from numpy.testing import assert_raises, assert_allclose


def get_sarimax_models(endog, filter_univariate=False, **kwargs):
    kwargs.setdefault('tolerance', 0)
    # Construct a concentrated version of the given SARIMAX model, and get
    # the estimate of the scale
    mod_conc = sarimax.SARIMAX(endog, **kwargs)
    mod_conc.ssm.filter_concentrated = True
    mod_conc.ssm.filter_univariate = filter_univariate
    params_conc = mod_conc.start_params
    params_conc[-1] = 1
    res_conc = mod_conc.smooth(params_conc)
    scale = res_conc.scale

    # Construct the non-concentrated version
    mod_orig = sarimax.SARIMAX(endog, **kwargs)
    mod_orig.ssm.filter_univariate = filter_univariate
    params_orig = params_conc.copy()
    k_vars = 1 + kwargs.get('measurement_error', False)
    params_orig[-k_vars:] = scale * params_conc[-k_vars:]
    res_orig = mod_orig.smooth(params_orig)

    return Bunch(**{'mod_conc': mod_conc, 'params_conc': params_conc,
                    'mod_orig': mod_orig, 'params_orig': params_orig,
                    'res_conc': res_conc, 'res_orig': res_orig,
                    'scale': scale})


def test_concentrated_loglike_sarimax():
    # Note: we will not use the "concentrate_scale" option to SARIMAX for this
    # test, which is a lower-level test of the Kalman filter using the SARIMAX
    # model as an example
    nobs = 30
    np.random.seed(28953)
    endog = np.random.normal(size=nobs)
    kwargs = {}

    # Typical model
    out = get_sarimax_models(endog)
    assert_allclose(out.res_conc.llf, out.res_orig.llf)
    assert_allclose(out.res_conc.llf_obs, out.res_orig.llf_obs)
    assert_allclose(out.mod_conc.loglike(out.params_conc),
                    out.mod_orig.loglike(out.params_orig))
    assert_allclose(out.mod_conc.loglikeobs(out.params_conc),
                    out.mod_orig.loglikeobs(out.params_orig))

    # Add missing entries
    endog[2:10] = np.nan
    out = get_sarimax_models(endog)
    assert_allclose(out.res_conc.llf, out.res_orig.llf)
    assert_allclose(out.res_conc.llf_obs, out.res_orig.llf_obs)
    assert_allclose(out.mod_conc.loglike(out.params_conc),
                    out.mod_orig.loglike(out.params_orig))
    assert_allclose(out.mod_conc.loglikeobs(out.params_conc),
                    out.mod_orig.loglikeobs(out.params_orig))

    # Add seasonal differencing
    # Note: now, due to differences in approximate diffuse initialization,
    # we may have differences in the first 2 observations (but notice that
    # this does not affect the computed joint log-likelihood because those
    # observations are not included there)
    kwargs['seasonal_order'] = (1, 1, 1, 2)
    out = get_sarimax_models(endog, **kwargs)
    assert_allclose(out.res_conc.llf, out.res_orig.llf)
    assert_allclose(out.res_conc.llf_obs[2:], out.res_orig.llf_obs[2:])
    assert_allclose(out.mod_conc.loglike(out.params_conc),
                    out.mod_orig.loglike(out.params_orig))
    assert_allclose(out.mod_conc.loglikeobs(out.params_conc)[2:],
                    out.mod_orig.loglikeobs(out.params_orig)[2:])

    # Add loglikelihood burn, trend, and exog
    kwargs['loglikelihood_burn'] = 5
    kwargs['trend'] = 'c'
    kwargs['exog'] = np.ones(nobs)
    out = get_sarimax_models(endog, **kwargs)
    assert_allclose(out.res_conc.llf, out.res_orig.llf)
    assert_allclose(out.res_conc.llf_obs[2:], out.res_orig.llf_obs[2:])
    assert_allclose(out.mod_conc.loglike(out.params_conc),
                    out.mod_orig.loglike(out.params_orig))
    assert_allclose(out.mod_conc.loglikeobs(out.params_conc)[2:],
                    out.mod_orig.loglikeobs(out.params_orig)[2:])


def test_concentrated_predict_sarimax():
    # Note: we will not use the "concentrate_scale" option to SARIMAX for this
    # test, which is a lower-level test of the Kalman filter using the SARIMAX
    # model as an example
    nobs = 30
    np.random.seed(28953)
    endog = np.random.normal(size=nobs)

    # Typical model
    out = get_sarimax_models(endog)
    assert_allclose(out.res_conc.predict(), out.res_orig.predict())
    assert_allclose(out.res_conc.forecast(5), out.res_orig.forecast(5))
    assert_allclose(out.res_conc.predict(start=0, end=45, dynamic=10),
                    out.res_orig.predict(start=0, end=45, dynamic=10))


def test_fixed_scale_sarimax():
    # Test that the fixed_scale context manager works
    nobs = 30
    np.random.seed(28953)
    endog = np.random.normal(size=nobs)
    kwargs = {
        'seasonal_order': (1, 1, 1, 2),
        'trend': 'ct',
        'exog': np.ones(nobs)
    }

    # Construct a concentrated version of the given SARIMAX model
    mod_conc = sarimax.SARIMAX(endog, concentrate_scale=True, **kwargs)

    # Construct the non-concentrated version
    mod_orig = sarimax.SARIMAX(endog, **kwargs)

    # Modify the scale parameter
    params = mod_orig.start_params
    params[-1] *= 1.2

    # Test that these are not equal in the default computation
    assert_raises(AssertionError, assert_allclose,
                  mod_conc.loglike(params[:-1]), mod_orig.loglike(params))

    # Now test that the llf is equal when we use the fixed_scale decorator
    with mod_conc.ssm.fixed_scale(params[-1]):
        res1 = mod_conc.smooth(params[:-1])
        llf1 = mod_conc.loglike(params[:-1])
        llf_obs1 = mod_conc.loglikeobs(params[:-1])

    res2 = mod_orig.smooth(params)
    llf2 = mod_orig.loglike(params)
    llf_obs2 = mod_orig.loglikeobs(params)

    assert_allclose(res1.llf, res2.llf)
    assert_allclose(res1.llf_obs[2:], res2.llf_obs[2:])
    assert_allclose(llf1, llf2)
    assert_allclose(llf_obs1, llf_obs2)


def check_concentrated_scale(filter_univariate=False, missing=False, **kwargs):
    # Test that concentrating the scale out of the likelihood function works
    index = pd.date_range('1960-01-01', '1982-10-01', freq='QS')
    dta = pd.DataFrame(results_varmax.lutkepohl_data,
                       columns=['inv', 'inc', 'consump'], index=index)
    dta['dln_inv'] = np.log(dta['inv']).diff()
    dta['dln_inc'] = np.log(dta['inc']).diff()
    dta['dln_consump'] = np.log(dta['consump']).diff()

    endog = dta.loc['1960-04-01':'1978-10-01', ['dln_inv', 'dln_inc']]

    # Optionally add some missing observations
    if missing:
        endog.iloc[0, 0] = np.nan
        endog.iloc[3:5, :] = np.nan
        endog.iloc[8, 1] = np.nan

    # Sometimes we can have slight differences if the Kalman filters
    # converge at different observations, so disable convergence.
    kwargs.update({'tolerance': 0})

    mod_orig = varmax.VARMAX(endog, **kwargs)
    mod_conc = varmax.VARMAX(endog, **kwargs)
    mod_conc.ssm.filter_concentrated = True

    mod_orig.ssm.filter_univariate = filter_univariate
    mod_conc.ssm.filter_univariate = filter_univariate

    # Since VARMAX does not explicitly allow concentrating out the scale, for
    # now we will simulate it by setting the first variance to be 1.
    # Note that start_scale will not be the scale used for the non-concentrated
    # model, because we need to use the MLE scale estimated by the
    # concentrated model.
    conc_params = mod_conc.start_params
    start_scale = conc_params[mod_conc._params_state_cov][0]
    if kwargs.get('error_cov_type', 'unstructured') == 'diagonal':
        conc_params[mod_conc._params_state_cov] /= start_scale
    else:
        conc_params[mod_conc._params_state_cov] /= start_scale**0.5

    # Concentrated model smoothing
    res_conc = mod_conc.smooth(conc_params)
    scale = res_conc.scale

    # Map the concentrated parameters to the non-concentrated model
    orig_params = conc_params.copy()
    if kwargs.get('error_cov_type', 'unstructured') == 'diagonal':
        orig_params[mod_orig._params_state_cov] *= scale
    else:
        orig_params[mod_orig._params_state_cov] *= scale**0.5

    # Measurement error variances also get rescaled
    orig_params[mod_orig._params_obs_cov] *= scale

    # Non-oncentrated model smoothing
    res_orig = mod_orig.smooth(orig_params)

    # Test loglike
    # Need to reduce the tolerance when we have measurement error.
    assert_allclose(res_conc.llf, res_orig.llf)

    # Test state space representation matrices
    for name in mod_conc.ssm.shapes:
        if name == 'obs':
            continue
        assert_allclose(getattr(res_conc.filter_results, name),
                        getattr(res_orig.filter_results, name))

    # Test filter / smoother output
    scale = res_conc.scale
    d = res_conc.loglikelihood_burn

    filter_attr = ['predicted_state', 'filtered_state', 'forecasts',
                   'forecasts_error', 'kalman_gain']

    for name in filter_attr:
        actual = getattr(res_conc.filter_results, name)
        desired = getattr(res_orig.filter_results, name)
        assert_allclose(actual, desired, atol=1e-7)

    # Note: do not want to compare the elements from any diffuse
    # initialization for things like covariances, so only compare for
    # periods past the loglikelihood_burn period
    filter_attr_burn = ['standardized_forecasts_error',
                        'predicted_state_cov', 'filtered_state_cov',
                        'tmp1', 'tmp2', 'tmp3', 'tmp4']

    for name in filter_attr_burn:
        actual = getattr(res_conc.filter_results, name)[..., d:]
        desired = getattr(res_orig.filter_results, name)[..., d:]
        assert_allclose(actual, desired, atol=1e-7)

    smoothed_attr = ['smoothed_state', 'smoothed_state_cov',
                     'smoothed_state_autocov',
                     'smoothed_state_disturbance',
                     'smoothed_state_disturbance_cov',
                     'smoothed_measurement_disturbance',
                     'smoothed_measurement_disturbance_cov',
                     'scaled_smoothed_estimator',
                     'scaled_smoothed_estimator_cov', 'smoothing_error',
                     'smoothed_forecasts', 'smoothed_forecasts_error',
                     'smoothed_forecasts_error_cov']

    for name in smoothed_attr:
        actual = getattr(res_conc.filter_results, name)
        desired = getattr(res_orig.filter_results, name)
        assert_allclose(actual, desired, atol=1e-7)

    # Test prediction output
    nobs = mod_conc.nobs
    pred_conc = res_conc.get_prediction(start=10, end=nobs + 50, dynamic=40)
    pred_orig = res_conc.get_prediction(start=10, end=nobs + 50, dynamic=40)

    assert_allclose(pred_conc.predicted_mean, pred_orig.predicted_mean)
    assert_allclose(pred_conc.se_mean, pred_orig.se_mean)


def test_concentrated_scale_conventional():
    check_concentrated_scale(filter_univariate=False)
    check_concentrated_scale(filter_univariate=False, measurement_error=True)
    check_concentrated_scale(filter_univariate=False,
                             error_cov_type='diagonal')
    check_concentrated_scale(filter_univariate=False, missing=True)
    check_concentrated_scale(filter_univariate=False, missing=True,
                             loglikelihood_burn=10)


def test_concentrated_scale_univariate():
    check_concentrated_scale(filter_univariate=True)
    check_concentrated_scale(filter_univariate=True, measurement_error=True)
    check_concentrated_scale(filter_univariate=True, error_cov_type='diagonal')
    check_concentrated_scale(filter_univariate=True, missing=True)
    check_concentrated_scale(filter_univariate=True, missing=True,
                             loglikelihood_burn=10)