"""
Tests for simulation smoothing
Author: Chad Fulton
License: Simplified-BSD
"""
import os
import numpy as np
from numpy.testing import assert_allclose, assert_equal
import pandas as pd
from statsmodels import datasets
from statsmodels.tsa.statespace import mlemodel, sarimax, structural
from statsmodels.tsa.statespace.simulation_smoother import (
SIMULATION_STATE, SIMULATION_DISTURBANCE, SIMULATION_ALL)
current_path = os.path.dirname(os.path.abspath(__file__))
class MultivariateVARKnown(object):
"""
Tests for simulation smoothing values in a couple of special cases of
variates. Both computed values and KFAS values are used for comparison
against the simulation smoother output.
"""
@classmethod
def setup_class(cls, missing=None, test_against_KFAS=True,
*args, **kwargs):
cls.test_against_KFAS = test_against_KFAS
# Data
dta = datasets.macrodata.load_pandas().data
dta.index = pd.date_range(start='1959-01-01', end='2009-7-01',
freq='QS')
obs = np.log(dta[['realgdp', 'realcons', 'realinv']]).diff().iloc[1:]
if missing == 'all':
obs.iloc[0:50, :] = np.nan
elif missing == 'partial':
obs.iloc[0:50, 0] = np.nan
elif missing == 'mixed':
obs.iloc[0:50, 0] = np.nan
obs.iloc[19:70, 1] = np.nan
obs.iloc[39:90, 2] = np.nan
obs.iloc[119:130, 0] = np.nan
obs.iloc[119:130, 2] = np.nan
obs.iloc[-10:, :] = np.nan
if test_against_KFAS:
obs = obs.iloc[:9]
# Create the model
mod = mlemodel.MLEModel(obs, k_states=3, k_posdef=3, **kwargs)
mod['design'] = np.eye(3)
mod['obs_cov'] = np.array([
[0.0000640649, 0., 0.],
[0., 0.0000572802, 0.],
[0., 0., 0.0017088585]])
mod['transition'] = np.array([
[-0.1119908792, 0.8441841604, 0.0238725303],
[0.2629347724, 0.4996718412, -0.0173023305],
[-3.2192369082, 4.1536028244, 0.4514379215]])
mod['selection'] = np.eye(3)
mod['state_cov'] = np.array([
[0.0000640649, 0.0000388496, 0.0002148769],
[0.0000388496, 0.0000572802, 0.000001555],
[0.0002148769, 0.000001555, 0.0017088585]])
mod.initialize_approximate_diffuse(1e6)
mod.ssm.filter_univariate = True
cls.model = mod
cls.results = mod.smooth([], return_ssm=True)
cls.sim = cls.model.simulation_smoother()
def test_loglike(self):
assert_allclose(np.sum(self.results.llf_obs), self.true_llf)
def test_simulate_0(self):
n = 10
# Test with all inputs as zeros
measurement_shocks = np.zeros((n, self.model.k_endog))
state_shocks = np.zeros((n, self.model.ssm.k_posdef))
initial_state = np.zeros(self.model.k_states)
obs, states = self.model.ssm.simulate(
nsimulations=n, measurement_shocks=measurement_shocks,
state_shocks=state_shocks, initial_state=initial_state)
assert_allclose(obs, np.zeros((n, self.model.k_endog)))
assert_allclose(states, np.zeros((n, self.model.k_states)))
def test_simulate_1(self):
n = 10
# Test with np.arange / 10 measurement shocks only
measurement_shocks = np.reshape(
np.arange(n * self.model.k_endog) / 10.,
(n, self.model.k_endog))
state_shocks = np.zeros((n, self.model.ssm.k_posdef))
initial_state = np.zeros(self.model.k_states)
obs, states = self.model.ssm.simulate(
nsimulations=n, measurement_shocks=measurement_shocks,
state_shocks=state_shocks, initial_state=initial_state)
assert_allclose(obs, np.reshape(
np.arange(n * self.model.k_endog) / 10.,
(n, self.model.k_endog)))
assert_allclose(states, np.zeros((n, self.model.k_states)))
def test_simulate_2(self):
n = 10
Z = self.model['design']
T = self.model['transition']
# Test with non-zero state shocks and initial state
measurement_shocks = np.zeros((n, self.model.k_endog))
state_shocks = np.ones((n, self.model.ssm.k_posdef))
initial_state = np.ones(self.model.k_states) * 2.5
obs, states = self.model.ssm.simulate(
nsimulations=n, measurement_shocks=measurement_shocks,
state_shocks=state_shocks, initial_state=initial_state)
desired_obs = np.zeros((n, self.model.k_endog))
desired_state = np.zeros((n, self.model.k_states))
desired_state[0] = initial_state
desired_obs[0] = np.dot(Z, initial_state)
for i in range(1, n):
desired_state[i] = np.dot(T, desired_state[i-1]) + state_shocks[i]
desired_obs[i] = np.dot(Z, desired_state[i])
assert_allclose(obs, desired_obs)
assert_allclose(states, desired_state)
def test_simulation_smoothing_0(self):
# Simulation smoothing when setting all variates to zeros
# In this case:
# - unconditional disturbances are zero, because they are simply
# transformed to have the appropriate variance matrix, but keep the
# same mean - of zero
# - generated states are zeros, because initial state is
# zeros and all state disturbances are zeros
# - generated observations are zeros, because states are zeros and all
# measurement disturbances are zeros
# - The simulated state is equal to the smoothed state from the
# original model, because
# simulated state = (generated state - smoothed generated state +
# smoothed state)
# and here generated state = smoothed generated state = 0
# - The simulated measurement disturbance is equal to the smoothed
# measurement disturbance for very similar reasons, because
# simulated measurement disturbance = (
# generated measurement disturbance -
# smoothed generated measurement disturbance +
# smoothed measurement disturbance)
# and here generated measurement disturbance and
# smoothed generated measurement disturbance are zero.
# - The simulated state disturbance is equal to the smoothed
# state disturbance for exactly the same reason as above.
sim = self.sim
Z = self.model['design']
n_disturbance_variates = (
(self.model.k_endog + self.model.ssm.k_posdef) * self.model.nobs)
# Test against known quantities (see above for description)
sim.simulate(disturbance_variates=np.zeros(n_disturbance_variates),
initial_state_variates=np.zeros(self.model.k_states))
assert_allclose(sim.generated_measurement_disturbance, 0)
assert_allclose(sim.generated_state_disturbance, 0)
assert_allclose(sim.generated_state, 0)
assert_allclose(sim.generated_obs, 0)
assert_allclose(sim.simulated_state, self.results.smoothed_state)
if not self.model.ssm.filter_collapsed:
assert_allclose(sim.simulated_measurement_disturbance,
self.results.smoothed_measurement_disturbance)
assert_allclose(sim.simulated_state_disturbance,
self.results.smoothed_state_disturbance)
# Test against R package KFAS values
if self.test_against_KFAS:
path = os.path.join(current_path, 'results',
'results_simulation_smoothing0.csv')
true = pd.read_csv(path)
assert_allclose(sim.simulated_state,
true[['state1', 'state2', 'state3']].T,
atol=1e-7)
assert_allclose(sim.simulated_measurement_disturbance,
true[['eps1', 'eps2', 'eps3']].T,
atol=1e-7)
assert_allclose(sim.simulated_state_disturbance,
true[['eta1', 'eta2', 'eta3']].T,
atol=1e-7)
signals = np.zeros((3, self.model.nobs))
for t in range(self.model.nobs):
signals[:, t] = np.dot(Z, sim.simulated_state[:, t])
assert_allclose(signals, true[['signal1', 'signal2', 'signal3']].T,
atol=1e-7)
def test_simulation_smoothing_1(self):
# Test with measurement disturbance as np.arange / 10., all other
# disturbances are zeros
sim = self.sim
Z = self.model['design']
# Construct the variates
measurement_disturbance_variates = np.reshape(
np.arange(self.model.nobs * self.model.k_endog) / 10.,
(self.model.nobs, self.model.k_endog))
disturbance_variates = np.r_[
measurement_disturbance_variates.ravel(),
np.zeros(self.model.nobs * self.model.ssm.k_posdef)]
# Compute some additional known quantities
generated_measurement_disturbance = np.zeros(
measurement_disturbance_variates.shape)
chol = np.linalg.cholesky(self.model['obs_cov'])
for t in range(self.model.nobs):
generated_measurement_disturbance[t] = np.dot(
chol, measurement_disturbance_variates[t])
generated_model = mlemodel.MLEModel(
generated_measurement_disturbance, k_states=self.model.k_states,
k_posdef=self.model.ssm.k_posdef)
for name in ['design', 'obs_cov', 'transition',
'selection', 'state_cov']:
generated_model[name] = self.model[name]
generated_model.initialize_approximate_diffuse(1e6)
generated_model.ssm.filter_univariate = True
generated_res = generated_model.ssm.smooth()
simulated_state = (
0 - generated_res.smoothed_state + self.results.smoothed_state)
if not self.model.ssm.filter_collapsed:
simulated_measurement_disturbance = (
generated_measurement_disturbance.T -
generated_res.smoothed_measurement_disturbance +
self.results.smoothed_measurement_disturbance)
simulated_state_disturbance = (
0 - generated_res.smoothed_state_disturbance +
self.results.smoothed_state_disturbance)
# Test against known values
sim.simulate(disturbance_variates=disturbance_variates,
initial_state_variates=np.zeros(self.model.k_states))
assert_allclose(sim.generated_measurement_disturbance,
generated_measurement_disturbance)
assert_allclose(sim.generated_state_disturbance, 0)
assert_allclose(sim.generated_state, 0)
assert_allclose(sim.generated_obs,
generated_measurement_disturbance.T)
assert_allclose(sim.simulated_state, simulated_state)
if not self.model.ssm.filter_collapsed:
assert_allclose(sim.simulated_measurement_disturbance,
simulated_measurement_disturbance)
assert_allclose(sim.simulated_state_disturbance,
simulated_state_disturbance)
# Test against R package KFAS values
if self.test_against_KFAS:
path = os.path.join(current_path, 'results',
'results_simulation_smoothing1.csv')
true = pd.read_csv(path)
assert_allclose(sim.simulated_state,
true[['state1', 'state2', 'state3']].T,
atol=1e-7)
assert_allclose(sim.simulated_measurement_disturbance,
true[['eps1', 'eps2', 'eps3']].T,
atol=1e-7)
assert_allclose(sim.simulated_state_disturbance,
true[['eta1', 'eta2', 'eta3']].T,
atol=1e-7)
signals = np.zeros((3, self.model.nobs))
for t in range(self.model.nobs):
signals[:, t] = np.dot(Z, sim.simulated_state[:, t])
assert_allclose(signals, true[['signal1', 'signal2', 'signal3']].T,
atol=1e-7)
def test_simulation_smoothing_2(self):
# Test with measurement and state disturbances as np.arange / 10.,
# initial state variates are zeros.
sim = self.sim
Z = self.model['design']
T = self.model['transition']
# Construct the variates
measurement_disturbance_variates = np.reshape(
np.arange(self.model.nobs * self.model.k_endog) / 10.,
(self.model.nobs, self.model.k_endog))
state_disturbance_variates = np.reshape(
np.arange(self.model.nobs * self.model.ssm.k_posdef) / 10.,
(self.model.nobs, self.model.ssm.k_posdef))
disturbance_variates = np.r_[
measurement_disturbance_variates.ravel(),
state_disturbance_variates.ravel()]
initial_state_variates = np.zeros(self.model.k_states)
# Compute some additional known quantities
generated_measurement_disturbance = np.zeros(
measurement_disturbance_variates.shape)
chol = np.linalg.cholesky(self.model['obs_cov'])
for t in range(self.model.nobs):
generated_measurement_disturbance[t] = np.dot(
chol, measurement_disturbance_variates[t])
generated_state_disturbance = np.zeros(
state_disturbance_variates.shape)
chol = np.linalg.cholesky(self.model['state_cov'])
for t in range(self.model.nobs):
generated_state_disturbance[t] = np.dot(
chol, state_disturbance_variates[t])
generated_obs = np.zeros((self.model.k_endog, self.model.nobs))
generated_state = np.zeros((self.model.k_states, self.model.nobs+1))
chol = np.linalg.cholesky(self.results.initial_state_cov)
generated_state[:, 0] = (
self.results.initial_state + np.dot(chol, initial_state_variates))
for t in range(self.model.nobs):
generated_state[:, t+1] = (np.dot(T, generated_state[:, t]) +
generated_state_disturbance.T[:, t])
generated_obs[:, t] = (np.dot(Z, generated_state[:, t]) +
generated_measurement_disturbance.T[:, t])
generated_model = mlemodel.MLEModel(
generated_obs.T, k_states=self.model.k_states,
k_posdef=self.model.ssm.k_posdef)
for name in ['design', 'obs_cov', 'transition',
'selection', 'state_cov']:
generated_model[name] = self.model[name]
generated_model.initialize_approximate_diffuse(1e6)
generated_model.ssm.filter_univariate = True
generated_res = generated_model.ssm.smooth()
simulated_state = (
generated_state[:, :-1] - generated_res.smoothed_state +
self.results.smoothed_state)
if not self.model.ssm.filter_collapsed:
simulated_measurement_disturbance = (
generated_measurement_disturbance.T -
generated_res.smoothed_measurement_disturbance +
self.results.smoothed_measurement_disturbance)
simulated_state_disturbance = (
generated_state_disturbance.T -
Loading ...