"""
Tests for concentrating the scale out of the loglikelihood function
Note: the univariate case is well tested in test_sarimax.py
Author: Chad Fulton
License: Simplified-BSD
"""
import numpy as np
import pandas as pd
from numpy.testing import assert_allclose, assert_raises

from statsmodels.tools.tools import Bunch
from statsmodels.tsa.statespace import sarimax, varmax

from .results import results_varmax


def get_sarimax_models(endog, filter_univariate=False, **kwargs):
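    """Construct concentrated and non-concentrated SARIMAX models.

    Smooths both versions at equivalent parameter vectors and returns a
    Bunch with the models, parameter vectors, results objects, and the scale
    estimated by the concentrated Kalman filter.
    """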
kwargs.setdefault('tolerance', 0)
# Construct a concentrated version of the given SARIMAX model, and get
# the estimate of the scale
mod_conc = sarimax.SARIMAX(endog, **kwargs)
mod_conc.ssm.filter_concentrated = True
mod_conc.ssm.filter_univariate = filter_univariate
params_conc = mod_conc.start_params
params_conc[-1] = 1
res_conc = mod_conc.smooth(params_conc)
scale = res_conc.scale
# Construct the non-concentrated version
mod_orig = sarimax.SARIMAX(endog, **kwargs)
mod_orig.ssm.filter_univariate = filter_univariate
params_orig = params_conc.copy()
k_vars = 1 + kwargs.get('measurement_error', False)
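    # In the non-concentrated parameterization the error variances absorb the
    # scale, so multiply the trailing variance parameters (the state
    # innovation variance, plus the measurement error variance if present) by
    # the scale estimated by the concentrated filter.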
params_orig[-k_vars:] = scale * params_conc[-k_vars:]
res_orig = mod_orig.smooth(params_orig)
return Bunch(**{'mod_conc': mod_conc, 'params_conc': params_conc,
'mod_orig': mod_orig, 'params_orig': params_orig,
'res_conc': res_conc, 'res_orig': res_orig,
'scale': scale})


def test_concentrated_loglike_sarimax():
# Note: we will not use the "concentrate_scale" option to SARIMAX for this
# test, which is a lower-level test of the Kalman filter using the SARIMAX
# model as an example
nobs = 30
np.random.seed(28953)
endog = np.random.normal(size=nobs)
kwargs = {}
# Typical model
out = get_sarimax_models(endog)
assert_allclose(out.res_conc.llf, out.res_orig.llf)
assert_allclose(out.res_conc.llf_obs, out.res_orig.llf_obs)
assert_allclose(out.mod_conc.loglike(out.params_conc),
out.mod_orig.loglike(out.params_orig))
assert_allclose(out.mod_conc.loglikeobs(out.params_conc),
out.mod_orig.loglikeobs(out.params_orig))
# Add missing entries
endog[2:10] = np.nan
out = get_sarimax_models(endog)
assert_allclose(out.res_conc.llf, out.res_orig.llf)
assert_allclose(out.res_conc.llf_obs, out.res_orig.llf_obs)
assert_allclose(out.mod_conc.loglike(out.params_conc),
out.mod_orig.loglike(out.params_orig))
assert_allclose(out.mod_conc.loglikeobs(out.params_conc),
out.mod_orig.loglikeobs(out.params_orig))
# Add seasonal differencing
# Note: now, due to differences in approximate diffuse initialization,
# we may have differences in the first 2 observations (but notice that
# this does not affect the computed joint log-likelihood because those
# observations are not included there)
kwargs['seasonal_order'] = (1, 1, 1, 2)
out = get_sarimax_models(endog, **kwargs)
assert_allclose(out.res_conc.llf, out.res_orig.llf)
assert_allclose(out.res_conc.llf_obs[2:], out.res_orig.llf_obs[2:])
assert_allclose(out.mod_conc.loglike(out.params_conc),
out.mod_orig.loglike(out.params_orig))
assert_allclose(out.mod_conc.loglikeobs(out.params_conc)[2:],
out.mod_orig.loglikeobs(out.params_orig)[2:])
# Add loglikelihood burn, trend, and exog
kwargs['loglikelihood_burn'] = 5
kwargs['trend'] = 'c'
kwargs['exog'] = np.ones(nobs)
out = get_sarimax_models(endog, **kwargs)
assert_allclose(out.res_conc.llf, out.res_orig.llf)
assert_allclose(out.res_conc.llf_obs[2:], out.res_orig.llf_obs[2:])
assert_allclose(out.mod_conc.loglike(out.params_conc),
out.mod_orig.loglike(out.params_orig))
assert_allclose(out.mod_conc.loglikeobs(out.params_conc)[2:],
out.mod_orig.loglikeobs(out.params_orig)[2:])


def test_concentrated_predict_sarimax():
# Note: we will not use the "concentrate_scale" option to SARIMAX for this
# test, which is a lower-level test of the Kalman filter using the SARIMAX
# model as an example
nobs = 30
np.random.seed(28953)
endog = np.random.normal(size=nobs)
# Typical model
out = get_sarimax_models(endog)
assert_allclose(out.res_conc.predict(), out.res_orig.predict())
assert_allclose(out.res_conc.forecast(5), out.res_orig.forecast(5))
assert_allclose(out.res_conc.predict(start=0, end=45, dynamic=10),
out.res_orig.predict(start=0, end=45, dynamic=10))


def test_fixed_scale_sarimax():
# Test that the fixed_scale context manager works
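    # Within the fixed_scale context, the concentrated model holds the scale
    # fixed at the given value instead of estimating it, so its loglikelihood
    # should match the non-concentrated model evaluated at that same scale.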
nobs = 30
np.random.seed(28953)
endog = np.random.normal(size=nobs)
kwargs = {
'seasonal_order': (1, 1, 1, 2),
'trend': 'ct',
'exog': np.ones(nobs)
}
# Construct a concentrated version of the given SARIMAX model
mod_conc = sarimax.SARIMAX(endog, concentrate_scale=True, **kwargs)
# Construct the non-concentrated version
mod_orig = sarimax.SARIMAX(endog, **kwargs)
# Modify the scale parameter
params = mod_orig.start_params
params[-1] *= 1.2
# Test that these are not equal in the default computation
assert_raises(AssertionError, assert_allclose,
mod_conc.loglike(params[:-1]), mod_orig.loglike(params))
# Now test that the llf is equal when we use the fixed_scale decorator
with mod_conc.ssm.fixed_scale(params[-1]):
res1 = mod_conc.smooth(params[:-1])
llf1 = mod_conc.loglike(params[:-1])
llf_obs1 = mod_conc.loglikeobs(params[:-1])
res2 = mod_orig.smooth(params)
llf2 = mod_orig.loglike(params)
llf_obs2 = mod_orig.loglikeobs(params)
assert_allclose(res1.llf, res2.llf)
assert_allclose(res1.llf_obs[2:], res2.llf_obs[2:])
assert_allclose(llf1, llf2)
assert_allclose(llf_obs1, llf_obs2)


def check_concentrated_scale(filter_univariate=False, missing=False,
                             **kwargs):
# Test that concentrating the scale out of the likelihood function works
index = pd.date_range('1960-01-01', '1982-10-01', freq='QS')
dta = pd.DataFrame(results_varmax.lutkepohl_data,
columns=['inv', 'inc', 'consump'], index=index)
dta['dln_inv'] = np.log(dta['inv']).diff()
dta['dln_inc'] = np.log(dta['inc']).diff()
dta['dln_consump'] = np.log(dta['consump']).diff()
endog = dta.loc['1960-04-01':'1978-10-01', ['dln_inv', 'dln_inc']]
# Optionally add some missing observations
if missing:
endog.iloc[0, 0] = np.nan
endog.iloc[3:5, :] = np.nan
endog.iloc[8, 1] = np.nan
    # The Kalman filters may converge at different observations, which can
    # produce slight differences between the two models, so disable the
    # convergence check by setting tolerance=0.
kwargs.update({'tolerance': 0})
mod_orig = varmax.VARMAX(endog, **kwargs)
mod_conc = varmax.VARMAX(endog, **kwargs)
mod_conc.ssm.filter_concentrated = True
mod_orig.ssm.filter_univariate = filter_univariate
mod_conc.ssm.filter_univariate = filter_univariate
# Since VARMAX does not explicitly allow concentrating out the scale, for
# now we will simulate it by setting the first variance to be 1.
# Note that start_scale will not be the scale used for the non-concentrated
# model, because we need to use the MLE scale estimated by the
# concentrated model.
conc_params = mod_conc.start_params
start_scale = conc_params[mod_conc._params_state_cov][0]
if kwargs.get('error_cov_type', 'unstructured') == 'diagonal':
conc_params[mod_conc._params_state_cov] /= start_scale
else:
conc_params[mod_conc._params_state_cov] /= start_scale**0.5
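    # (In the 'unstructured' case the state covariance parameters are
    # elements of a Cholesky factor, so dividing them by sqrt(start_scale)
    # rescales the implied covariance matrix by 1 / start_scale; in the
    # 'diagonal' case the parameters are variances and are divided by
    # start_scale directly.)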
# Concentrated model smoothing
res_conc = mod_conc.smooth(conc_params)
scale = res_conc.scale
# Map the concentrated parameters to the non-concentrated model
orig_params = conc_params.copy()
if kwargs.get('error_cov_type', 'unstructured') == 'diagonal':
orig_params[mod_orig._params_state_cov] *= scale
else:
orig_params[mod_orig._params_state_cov] *= scale**0.5
# Measurement error variances also get rescaled
orig_params[mod_orig._params_obs_cov] *= scale
    # Non-concentrated model smoothing
res_orig = mod_orig.smooth(orig_params)
    # Test loglike
    assert_allclose(res_conc.llf, res_orig.llf)
# Test state space representation matrices
for name in mod_conc.ssm.shapes:
if name == 'obs':
continue
assert_allclose(getattr(res_conc.filter_results, name),
getattr(res_orig.filter_results, name))
    # Test filter / smoother output
    d = res_conc.loglikelihood_burn
filter_attr = ['predicted_state', 'filtered_state', 'forecasts',
'forecasts_error', 'kalman_gain']
for name in filter_attr:
actual = getattr(res_conc.filter_results, name)
desired = getattr(res_orig.filter_results, name)
assert_allclose(actual, desired, atol=1e-7)
# Note: do not want to compare the elements from any diffuse
# initialization for things like covariances, so only compare for
# periods past the loglikelihood_burn period
filter_attr_burn = ['standardized_forecasts_error',
'predicted_state_cov', 'filtered_state_cov',
'tmp1', 'tmp2', 'tmp3', 'tmp4']
for name in filter_attr_burn:
actual = getattr(res_conc.filter_results, name)[..., d:]
desired = getattr(res_orig.filter_results, name)[..., d:]
assert_allclose(actual, desired, atol=1e-7)
smoothed_attr = ['smoothed_state', 'smoothed_state_cov',
'smoothed_state_autocov',
'smoothed_state_disturbance',
'smoothed_state_disturbance_cov',
'smoothed_measurement_disturbance',
'smoothed_measurement_disturbance_cov',
'scaled_smoothed_estimator',
'scaled_smoothed_estimator_cov', 'smoothing_error',
'smoothed_forecasts', 'smoothed_forecasts_error',
'smoothed_forecasts_error_cov']
for name in smoothed_attr:
actual = getattr(res_conc.filter_results, name)
desired = getattr(res_orig.filter_results, name)
assert_allclose(actual, desired, atol=1e-7)
# Test prediction output
nobs = mod_conc.nobs
pred_conc = res_conc.get_prediction(start=10, end=nobs + 50, dynamic=40)
    pred_orig = res_orig.get_prediction(start=10, end=nobs + 50, dynamic=40)
assert_allclose(pred_conc.predicted_mean, pred_orig.predicted_mean)
assert_allclose(pred_conc.se_mean, pred_orig.se_mean)


def test_concentrated_scale_conventional():
check_concentrated_scale(filter_univariate=False)
check_concentrated_scale(filter_univariate=False, measurement_error=True)
check_concentrated_scale(filter_univariate=False,
error_cov_type='diagonal')
check_concentrated_scale(filter_univariate=False, missing=True)
check_concentrated_scale(filter_univariate=False, missing=True,
loglikelihood_burn=10)


def test_concentrated_scale_univariate():
check_concentrated_scale(filter_univariate=True)
check_concentrated_scale(filter_univariate=True, measurement_error=True)
check_concentrated_scale(filter_univariate=True, error_cov_type='diagonal')
check_concentrated_scale(filter_univariate=True, missing=True)
check_concentrated_scale(filter_univariate=True, missing=True,
loglikelihood_burn=10)