Repository URL to install this package:
|
Version:
0.10.2 ▾
|
"""
Tests for structural time series models
Author: Chad Fulton
License: Simplified-BSD
"""
from __future__ import division, absolute_import, print_function
from statsmodels.compat import PY3
import warnings
import numpy as np
from numpy.testing import assert_equal, assert_allclose
import pandas as pd
import pytest
from statsmodels.datasets import macrodata
from statsmodels.tools.sm_exceptions import SpecificationWarning
from statsmodels.tsa.statespace import structural
from statsmodels.tsa.statespace.structural import UnobservedComponents
from statsmodels.tsa.statespace.tests.results import results_structural
dta = macrodata.load_pandas().data
dta.index = pd.date_range(start='1959-01-01', end='2009-07-01', freq='QS')
def run_ucm(name):
true = getattr(results_structural, name)
for model in true['models']:
kwargs = model.copy()
kwargs.update(true['kwargs'])
# Make a copy of the data
values = dta.copy()
freq = kwargs.pop('freq', None)
if freq is not None:
values.index = pd.date_range(start='1959-01-01', periods=len(dta),
freq=freq)
# Test pandas exog
if 'exog' in kwargs:
# Default value here is pd.Series object
exog = np.log(values['realgdp'])
# Also allow a check with a 1-dim numpy array
if kwargs['exog'] == 'numpy':
exog = exog.values.squeeze()
kwargs['exog'] = exog
# Create the model
mod = UnobservedComponents(values['unemp'], **kwargs)
# Smoke test for starting parameters, untransform, transform
# Also test that transform and untransform are inverses
mod.start_params
roundtrip = mod.transform_params(
mod.untransform_params(mod.start_params))
assert_allclose(mod.start_params, roundtrip)
# Fit the model at the true parameters
res_true = mod.filter(true['params'])
# Check that the cycle bounds were computed correctly
freqstr = freq[0] if freq is not None else values.index.freqstr[0]
if 'cycle_period_bounds' in kwargs:
cycle_period_bounds = kwargs['cycle_period_bounds']
elif freqstr == 'A':
cycle_period_bounds = (1.5, 12)
elif freqstr == 'Q':
cycle_period_bounds = (1.5*4, 12*4)
elif freqstr == 'M':
cycle_period_bounds = (1.5*12, 12*12)
else:
# If we have no information on data frequency, require the
# cycle frequency to be between 0 and pi
cycle_period_bounds = (2, np.inf)
# Test that the cycle frequency bound is correct
assert_equal(mod.cycle_frequency_bound,
(2*np.pi / cycle_period_bounds[1],
2*np.pi / cycle_period_bounds[0]))
# Test that the likelihood is correct
rtol = true.get('rtol', 1e-7)
atol = true.get('atol', 0)
assert_allclose(res_true.llf, true['llf'], rtol=rtol, atol=atol)
# Optional smoke test for plot_components
try:
import matplotlib.pyplot as plt
try:
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
except ImportError:
pass
fig = plt.figure()
res_true.plot_components(fig=fig)
except ImportError:
pass
# Now fit the model via MLE
with warnings.catch_warnings(record=True):
res = mod.fit(disp=-1)
# If we found a higher likelihood, no problem; otherwise check
# that we're very close to that found by R
if res.llf <= true['llf']:
assert_allclose(res.llf, true['llf'], rtol=1e-4)
# Smoke test for summary
res.summary()
def test_irregular(close_figures):
run_ucm('irregular')
def test_fixed_intercept(close_figures):
# Clear warnings
structural.__warningregistry__ = {}
warning = SpecificationWarning if PY3 else None
match = 'Specified model does not contain' if PY3 else None
with pytest.warns(warning, match=match):
run_ucm('fixed_intercept')
def test_deterministic_constant(close_figures):
run_ucm('deterministic_constant')
def test_random_walk(close_figures):
run_ucm('random_walk')
def test_local_level(close_figures):
run_ucm('local_level')
def test_fixed_slope(close_figures):
warning = SpecificationWarning if PY3 else None
match = 'irregular component added' if PY3 else None
with pytest.warns(warning, match=match):
run_ucm('fixed_slope')
def test_fixed_slope_warn(close_figures):
# Clear warnings
structural.__warningregistry__ = {}
warning = SpecificationWarning if PY3 else None
match = 'irregular component added' if PY3 else None
with pytest.warns(warning, match=match):
run_ucm('fixed_slope')
def test_deterministic_trend(close_figures):
run_ucm('deterministic_trend')
def test_random_walk_with_drift(close_figures):
run_ucm('random_walk_with_drift')
def test_local_linear_deterministic_trend(close_figures):
run_ucm('local_linear_deterministic_trend')
def test_local_linear_trend(close_figures):
run_ucm('local_linear_trend')
def test_smooth_trend(close_figures):
run_ucm('smooth_trend')
def test_random_trend(close_figures):
run_ucm('random_trend')
def test_cycle(close_figures):
run_ucm('cycle')
def test_seasonal(close_figures):
run_ucm('seasonal')
def test_freq_seasonal(close_figures):
run_ucm('freq_seasonal')
def test_reg(close_figures):
run_ucm('reg')
def test_rtrend_ar1(close_figures):
run_ucm('rtrend_ar1')
@pytest.mark.slow
def test_lltrend_cycle_seasonal_reg_ar1(close_figures):
run_ucm('lltrend_cycle_seasonal_reg_ar1')
def test_mle_reg():
endog = np.arange(100)*1.0
exog = endog*2
# Make the fit not-quite-perfect
endog[::2] += 0.01
endog[1::2] -= 0.01
with warnings.catch_warnings(record=True):
mod1 = UnobservedComponents(endog, irregular=True,
exog=exog, mle_regression=False)
res1 = mod1.fit(disp=-1)
mod2 = UnobservedComponents(endog, irregular=True,
exog=exog, mle_regression=True)
res2 = mod2.fit(disp=-1)
assert_allclose(res1.regression_coefficients.filtered[0, -1],
0.5,
atol=1e-5)
assert_allclose(res2.params[1], 0.5, atol=1e-5)
def test_specifications():
# Clear warnings
structural.__warningregistry__ = {}
endog = [1, 2]
# Test that when nothing specified, a warning is issued and the model that
# is fit is one with irregular=True and nothing else.
warning = SpecificationWarning if PY3 else None
match = 'irregular component added' if PY3 else None
with pytest.warns(warning, match=match):
mod = UnobservedComponents(endog)
assert_equal(mod.trend_specification, 'irregular')
# Test an invalid string trend specification
with pytest.raises(ValueError):
UnobservedComponents(endog, 'invalid spec')
# Test that if a trend component is specified without a level component,
# a warning is issued and a deterministic level component is added
warning = SpecificationWarning if PY3 else None
match = 'Trend component specified without' if PY3 else None
with pytest.warns(warning, match=match):
mod = UnobservedComponents(endog, trend=True, irregular=True)
assert_equal(mod.trend_specification, 'deterministic trend')
# Test that if a string specification is provided, a warning is issued if
# the boolean attributes are also specified
trend_attributes = ['irregular', 'trend', 'stochastic_level',
'stochastic_trend']
for attribute in trend_attributes:
kwargs = {attribute: True}
warning = SpecificationWarning if PY3 else None
match = 'may be overridden when the trend' if PY3 else None
with pytest.warns(warning, match=match):
UnobservedComponents(endog, 'deterministic trend', **kwargs)
# Test that a seasonal with period less than two is invalid
with pytest.raises(ValueError):
UnobservedComponents(endog, seasonal=1)
def test_start_params():
# Test that the behavior is correct for multiple exogenous and / or
# autoregressive components
# Parameters
nobs = int(1e4)
beta = np.r_[10, -2]
phi = np.r_[0.5, 0.1]
# Generate data
np.random.seed(1234)
exog = np.c_[np.ones(nobs), np.arange(nobs)*1.0]
eps = np.random.normal(size=nobs)
endog = np.zeros(nobs+2)
for t in range(1, nobs):
endog[t+1] = phi[0] * endog[t] + phi[1] * endog[t-1] + eps[t]
endog = endog[2:]
endog += np.dot(exog, beta)
# Now just test that the starting parameters are approximately what they
# ought to be (could make this arbitrarily precise by increasing nobs,
# but that would slow down the test for no real gain)
mod = UnobservedComponents(endog, exog=exog, autoregressive=2)
assert_allclose(mod.start_params, [1., 0.5, 0.1, 10, -2], atol=1e-1)
def test_forecast():
endog = np.arange(50) + 10
exog = np.arange(50)
mod = UnobservedComponents(endog, exog=exog, level='dconstant', seasonal=4)
res = mod.smooth([1e-15, 0, 1])
actual = res.forecast(10, exog=np.arange(50, 60)[:, np.newaxis])
desired = np.arange(50, 60) + 10
assert_allclose(actual, desired)
def test_misc_exog():
# Tests for missing data
nobs = 20
k_endog = 1
np.random.seed(1208)
endog = np.random.normal(size=(nobs, k_endog))
endog[:4, 0] = np.nan
exog1 = np.random.normal(size=(nobs, 1))
exog2 = np.random.normal(size=(nobs, 2))
index = pd.date_range('1970-01-01', freq='QS', periods=nobs)
endog_pd = pd.DataFrame(endog, index=index)
exog1_pd = pd.Series(exog1.squeeze(), index=index)
exog2_pd = pd.DataFrame(exog2, index=index)
models = [
UnobservedComponents(endog, 'llevel', exog=exog1),
UnobservedComponents(endog, 'llevel', exog=exog2),
UnobservedComponents(endog, 'llevel', exog=exog2),
UnobservedComponents(endog_pd, 'llevel', exog=exog1_pd),
UnobservedComponents(endog_pd, 'llevel', exog=exog2_pd),
UnobservedComponents(endog_pd, 'llevel', exog=exog2_pd),
]
for mod in models:
# Smoke tests
mod.start_params
res = mod.fit(disp=False)
res.summary()
res.predict()
res.predict(dynamic=True)
res.get_prediction()
oos_exog = np.random.normal(size=(1, mod.k_exog))
res.forecast(steps=1, exog=oos_exog)
res.get_forecast(steps=1, exog=oos_exog)
# Smoke tests for invalid exog
oos_exog = np.random.normal(size=(1))
with pytest.raises(ValueError):
res.forecast(steps=1, exog=oos_exog)
oos_exog = np.random.normal(size=(2, mod.k_exog))
with pytest.raises(ValueError):
res.forecast(steps=1, exog=oos_exog)
oos_exog = np.random.normal(size=(1, mod.k_exog + 1))
with pytest.raises(ValueError):
res.forecast(steps=1, exog=oos_exog)
# Test invalid model specifications
with pytest.raises(ValueError):
UnobservedComponents(endog, 'llevel', exog=np.zeros((10, 4)))
def test_predict_custom_index():
np.random.seed(328423)
endog = pd.DataFrame(np.random.normal(size=50))
mod = structural.UnobservedComponents(endog, 'llevel')
res = mod.smooth(mod.start_params)
out = res.predict(start=1, end=1, index=['a'])
assert_equal(out.index.equals(pd.Index(['a'])), True)
def test_matrices_somewhat_complicated_model():
values = dta.copy()
model = UnobservedComponents(values['unemp'],
level='lltrend',
freq_seasonal=[{'period': 4},
{'period': 9, 'harmonics': 3}],
cycle=True,
cycle_period_bounds=[2, 30],
damped_cycle=True,
stochastic_freq_seasonal=[True, False],
stochastic_cycle=True
)
# Selected parameters
params = [1, # irregular_var
3, 4, # lltrend parameters: level_var, trend_var
5, # freq_seasonal parameters: freq_seasonal_var_0
# cycle parameters: cycle_var, cycle_freq, cycle_damp
6, 2*np.pi/30., .9
]
model.update(params)
# Check scalar properties
assert_equal(model.k_states, 2 + 4 + 6 + 2)
assert_equal(model.k_state_cov, 2 + 1 + 0 + 1)
assert_equal(model.loglikelihood_burn, 2 + 4 + 6 + 2)
assert_allclose(model.ssm.k_posdef, 2 + 4 + 0 + 2)
assert_equal(model.k_params, len(params))
# Check the statespace model matrices against hand-constructed answers
# We group the terms by the component
expected_design = np.r_[[1, 0],
[1, 0, 1, 0],
[1, 0, 1, 0, 1, 0],
[1, 0]].reshape(1, 14)
assert_allclose(model.ssm.design[:, :, 0], expected_design)
expected_transition = __direct_sum([
np.array([[1, 1],
[0, 1]]),
np.array([[0, 1, 0, 0],
[-1, 0, 0, 0],
[0, 0, -1, 0],
[0, 0, 0, -1]]),
np.array([[np.cos(2*np.pi*1/9.), np.sin(2*np.pi*1/9.), 0, 0, 0, 0],
[-np.sin(2*np.pi*1/9.), np.cos(2*np.pi*1/9.), 0, 0, 0, 0],
[0, 0, np.cos(2*np.pi*2/9.), np.sin(2*np.pi*2/9.), 0, 0],
[0, 0, -np.sin(2*np.pi*2/9.), np.cos(2*np.pi*2/9.), 0, 0],
[0, 0, 0, 0, np.cos(2*np.pi/3.), np.sin(2*np.pi/3.)],
[0, 0, 0, 0, -np.sin(2*np.pi/3.), np.cos(2*np.pi/3.)]]),
np.array([[.9*np.cos(2*np.pi/30.), .9*np.sin(2*np.pi/30.)],
[-.9*np.sin(2*np.pi/30.), .9*np.cos(2*np.pi/30.)]])
])
assert_allclose(
model.ssm.transition[:, :, 0], expected_transition, atol=1e-7)
# Since the second seasonal term is not stochastic,
# the dimensionality of the state disturbance is 14 - 6 = 8
expected_selection = np.zeros((14, 14 - 6))
expected_selection[0:2, 0:2] = np.eye(2)
expected_selection[2:6, 2:6] = np.eye(4)
expected_selection[-2:, -2:] = np.eye(2)
assert_allclose(model.ssm.selection[:, :, 0], expected_selection)
expected_state_cov = __direct_sum([
np.diag(params[1:3]),
np.eye(4)*params[3],
np.eye(2)*params[4]
])
assert_allclose(model.ssm.state_cov[:, :, 0], expected_state_cov)
def __direct_sum(square_matrices):
"""Compute the matrix direct sum of an iterable of square numpy 2-d arrays
"""
new_shape = np.sum([m.shape for m in square_matrices], axis=0)
new_array = np.zeros(new_shape)
offset = 0
for m in square_matrices:
rows, cols = m.shape
assert rows == cols
new_array[offset:offset + rows, offset:offset + rows] = m
offset += rows
return new_array