"""
Tests for the Python wrapper of state space representation and filtering
Author: Chad Fulton
License: Simplified-BSD
References
----------
Kim, Chang-Jin, and Charles R. Nelson. 1999.
"State-Space Models with Regime Switching:
Classical and Gibbs-Sampling Approaches with Applications".
MIT Press Books. The MIT Press.
"""
from __future__ import division, absolute_import, print_function
import os
import warnings
import numpy as np
import pandas as pd
import pytest
from statsmodels.tsa.statespace.representation import Representation
from statsmodels.tsa.statespace.kalman_filter import (
KalmanFilter, FilterResults, PredictionResults)
from statsmodels.tsa.statespace import tools, sarimax
from .results import results_kalman_filter
from numpy.testing import (
assert_equal, assert_almost_equal, assert_raises, assert_allclose)
current_path = os.path.dirname(os.path.abspath(__file__))
clark1989_path = os.path.join('results', 'results_clark1989_R.csv')
clark1989_results = pd.read_csv(os.path.join(current_path, clark1989_path))
class Clark1987(object):
"""
Clark's (1987) univariate unobserved components model of real GDP (as
presented in Kim and Nelson, 1999)
Test data produced using GAUSS code described in Kim and Nelson (1999) and
found at http://econ.korea.ac.kr/~cjkim/SSMARKOV.htm
See `results.results_kalman_filter` for more information.
"""
@classmethod
def setup_class(cls, dtype=float, **kwargs):
cls.true = results_kalman_filter.uc_uni
cls.true_states = pd.DataFrame(cls.true['states'])
# GDP, Quarterly, 1947.1 - 1995.3
data = pd.DataFrame(
cls.true['data'],
index=pd.date_range('1947-01-01', '1995-07-01', freq='QS'),
columns=['GDP']
)
data['lgdp'] = np.log(data['GDP'])
# Construct the statespace representation
k_states = 4
cls.model = KalmanFilter(k_endog=1, k_states=k_states, **kwargs)
cls.model.bind(data['lgdp'].values)
cls.model.design[:, :, 0] = [1, 1, 0, 0]
cls.model.transition[([0, 0, 1, 1, 2, 3],
[0, 3, 1, 2, 1, 3],
[0, 0, 0, 0, 0, 0])] = [1, 1, 0, 0, 1, 1]
cls.model.selection = np.eye(cls.model.k_states)
# Update matrices with given parameters
(sigma_v, sigma_e, sigma_w, phi_1, phi_2) = np.array(
cls.true['parameters']
)
cls.model.transition[([1, 1], [1, 2], [0, 0])] = [phi_1, phi_2]
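        # For reference, the assembled (time-invariant) transition matrix is,
        # in the state ordering [trend, cycle, lagged cycle, trend growth]
        # implied by the indexing above:
        #     [[1, 0,     0,     1],
        #      [0, phi_1, phi_2, 0],
        #      [0, 1,     0,     0],
        #      [0, 0,     0,     1]]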
cls.model.state_cov[
np.diag_indices(k_states)+(np.zeros(k_states, dtype=int),)] = [
sigma_v**2, sigma_e**2, 0, sigma_w**2
]
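        # The diag_indices assignment above fills the diagonal of
        # state_cov[:, :, 0] with [sigma_v**2, sigma_e**2, 0, sigma_w**2];
        # the zero entry reflects that the lagged-cycle state carries no
        # shock of its own.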
# Initialization
initial_state = np.zeros((k_states,))
initial_state_cov = np.eye(k_states)*100
# Initialization: modification
initial_state_cov = np.dot(
np.dot(cls.model.transition[:, :, 0], initial_state_cov),
cls.model.transition[:, :, 0].T
)
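        # (Propagating the prior covariance once through the transition,
        # P0 <- T P0 T', presumably mirrors the initialization used in the
        # reference GAUSS code.)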
cls.model.initialize_known(initial_state, initial_state_cov)
@classmethod
def run_filter(cls):
# Filter the data
return cls.model.filter()
def test_loglike(self):
assert_almost_equal(
self.results.llf_obs[self.true['start']:].sum(),
self.true['loglike'], 5
)
def test_filtered_state(self):
assert_almost_equal(
self.results.filtered_state[0][self.true['start']:],
self.true_states.iloc[:, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][self.true['start']:],
self.true_states.iloc[:, 1], 4
)
assert_almost_equal(
self.results.filtered_state[3][self.true['start']:],
self.true_states.iloc[:, 2], 4
)
class TestClark1987Single(Clark1987):
"""
Basic single precision test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
pytest.skip('Not implemented')
super(TestClark1987Single, cls).setup_class(
dtype=np.float32, conserve_memory=0
)
cls.results = cls.run_filter()
class TestClark1987Double(Clark1987):
"""
Basic double precision test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987Double, cls).setup_class(
dtype=float, conserve_memory=0
)
cls.results = cls.run_filter()
@pytest.mark.skip('Not implemented')
class TestClark1987SingleComplex(Clark1987):
"""
Basic single precision complex test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987SingleComplex, cls).setup_class(
dtype=np.complex64, conserve_memory=0
)
cls.results = cls.run_filter()
class TestClark1987DoubleComplex(Clark1987):
"""
Basic double precision complex test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987DoubleComplex, cls).setup_class(
dtype=complex, conserve_memory=0
)
cls.results = cls.run_filter()
class TestClark1987Conserve(Clark1987):
"""
Memory conservation test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987Conserve, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02
)
cls.results = cls.run_filter()
class Clark1987Forecast(Clark1987):
"""
Forecasting test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls, dtype=float, nforecast=100, conserve_memory=0):
super(Clark1987Forecast, cls).setup_class(
dtype=dtype, conserve_memory=conserve_memory
)
cls.nforecast = nforecast
# Add missing observations to the end (to forecast)
cls.model.endog = np.array(
np.r_[cls.model.endog[0, :], [np.nan]*nforecast],
ndmin=2, dtype=dtype, order="F"
)
cls.model.nobs = cls.model.endog.shape[1]
def test_filtered_state(self):
assert_almost_equal(
self.results.filtered_state[0][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 1], 4
)
assert_almost_equal(
self.results.filtered_state[3][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 2], 4
)
class TestClark1987ForecastDouble(Clark1987Forecast):
"""
Basic double forecasting test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987ForecastDouble, cls).setup_class()
cls.results = cls.run_filter()
class TestClark1987ForecastDoubleComplex(Clark1987Forecast):
"""
Basic double complex forecasting test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987ForecastDoubleComplex, cls).setup_class(
dtype=complex
)
cls.results = cls.run_filter()
class TestClark1987ForecastConserve(Clark1987Forecast):
"""
Memory conservation forecasting test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987ForecastConserve, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02
)
cls.results = cls.run_filter()
class TestClark1987ConserveAll(Clark1987):
"""
    Memory conservation (all options) test for the loglikelihood and filtered
    states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987ConserveAll, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02 | 0x04 | 0x08
)
cls.model.loglikelihood_burn = cls.true['start']
cls.results = cls.run_filter()
def test_loglike(self):
assert_almost_equal(
self.results.llf_obs[0], self.true['loglike'], 5
)
def test_filtered_state(self):
end = self.true_states.shape[0]
assert_almost_equal(
self.results.filtered_state[0][-1],
self.true_states.iloc[end-1, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][-1],
self.true_states.iloc[end-1, 1], 4
)
class Clark1989(object):
"""
Clark's (1989) bivariate unobserved components model of real GDP (as
presented in Kim and Nelson, 1999)
Tests two-dimensional observation data.
Test data produced using GAUSS code described in Kim and Nelson (1999) and
found at http://econ.korea.ac.kr/~cjkim/SSMARKOV.htm
See `results.results_kalman_filter` for more information.
"""
@classmethod
def setup_class(cls, dtype=float, **kwargs):
cls.true = results_kalman_filter.uc_bi
cls.true_states = pd.DataFrame(cls.true['states'])
# GDP and Unemployment, Quarterly, 1948.1 - 1995.3
data = pd.DataFrame(
cls.true['data'],
index=pd.date_range('1947-01-01', '1995-07-01', freq='QS'),
columns=['GDP', 'UNEMP']
)[4:]
data['GDP'] = np.log(data['GDP'])
data['UNEMP'] = (data['UNEMP']/100)
k_states = 6
cls.model = KalmanFilter(k_endog=2, k_states=k_states, **kwargs)
cls.model.bind(np.ascontiguousarray(data.values))
# Statespace representation
cls.model.design[:, :, 0] = [[1, 1, 0, 0, 0, 0], [0, 0, 0, 0, 0, 1]]
cls.model.transition[
([0, 0, 1, 1, 2, 3, 4, 5],
[0, 4, 1, 2, 1, 2, 4, 5],
[0, 0, 0, 0, 0, 0, 0, 0])
] = [1, 1, 0, 0, 1, 1, 1, 1]
cls.model.selection = np.eye(cls.model.k_states)
# Update matrices with given parameters
(sigma_v, sigma_e, sigma_w, sigma_vl, sigma_ec,
phi_1, phi_2, alpha_1, alpha_2, alpha_3) = np.array(
cls.true['parameters'],
)
cls.model.design[([1, 1, 1], [1, 2, 3], [0, 0, 0])] = [
alpha_1, alpha_2, alpha_3
]
cls.model.transition[([1, 1], [1, 2], [0, 0])] = [phi_1, phi_2]
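        # For reference, the assembled (time-invariant) transition matrix is,
        # in the state ordering [GDP trend, cycle, two cycle lags, trend
        # growth, unemployment random-walk component] implied above:
        #     [[1, 0,     0,     0, 1, 0],
        #      [0, phi_1, phi_2, 0, 0, 0],
        #      [0, 1,     0,     0, 0, 0],
        #      [0, 0,     1,     0, 0, 0],
        #      [0, 0,     0,     0, 1, 0],
        #      [0, 0,     0,     0, 0, 1]]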
cls.model.obs_cov[1, 1, 0] = sigma_ec**2
cls.model.state_cov[
np.diag_indices(k_states)+(np.zeros(k_states, dtype=int),)] = [
sigma_v**2, sigma_e**2, 0, 0, sigma_w**2, sigma_vl**2
]
# Initialization
initial_state = np.zeros((k_states,))
initial_state_cov = np.eye(k_states)*100
        # Initialization: modification
initial_state_cov = np.dot(
np.dot(cls.model.transition[:, :, 0], initial_state_cov),
cls.model.transition[:, :, 0].T
)
cls.model.initialize_known(initial_state, initial_state_cov)
@classmethod
def run_filter(cls):
# Filter the data
return cls.model.filter()
def test_loglike(self):
assert_almost_equal(
# self.results.llf_obs[self.true['start']:].sum(),
self.results.llf_obs[0:].sum(),
self.true['loglike'], 2
)
def test_filtered_state(self):
assert_almost_equal(
self.results.filtered_state[0][self.true['start']:],
self.true_states.iloc[:, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][self.true['start']:],
self.true_states.iloc[:, 1], 4
)
assert_almost_equal(
self.results.filtered_state[4][self.true['start']:],
self.true_states.iloc[:, 2], 4
)
assert_almost_equal(
self.results.filtered_state[5][self.true['start']:],
self.true_states.iloc[:, 3], 4
)
class TestClark1989(Clark1989):
"""
Basic double precision test for the loglikelihood and filtered
states with two-dimensional observation vector.
"""
@classmethod
def setup_class(cls):
super(TestClark1989, cls).setup_class(dtype=float, conserve_memory=0)
cls.results = cls.run_filter()
def test_kalman_gain(self):
assert_allclose(self.results.kalman_gain.sum(axis=1).sum(axis=0),
clark1989_results['V1'], atol=1e-4)
class TestClark1989Conserve(Clark1989):
"""
Memory conservation test for the loglikelihood and filtered states with
two-dimensional observation vector.
"""
@classmethod
def setup_class(cls):
super(TestClark1989Conserve, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02
)
cls.results = cls.run_filter()
class Clark1989Forecast(Clark1989):
"""
    Forecasting test for the loglikelihood and filtered states with
    two-dimensional observation vector.
"""
@classmethod
def setup_class(cls, dtype=float, nforecast=100, conserve_memory=0):
super(Clark1989Forecast, cls).setup_class(
dtype=dtype, conserve_memory=conserve_memory
)
cls.nforecast = nforecast
# Add missing observations to the end (to forecast)
cls.model.endog = np.array(
np.c_[
cls.model.endog,
np.r_[[np.nan, np.nan]*nforecast].reshape(2, nforecast)
],
ndmin=2, dtype=dtype, order="F"
)
cls.model.nobs = cls.model.endog.shape[1]
cls.results = cls.run_filter()
def test_filtered_state(self):
assert_almost_equal(
self.results.filtered_state[0][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 1], 4
)
assert_almost_equal(
self.results.filtered_state[4][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 2], 4
)
assert_almost_equal(
self.results.filtered_state[5][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 3], 4
)
class TestClark1989ForecastDouble(Clark1989Forecast):
"""
Basic double forecasting test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
super(TestClark1989ForecastDouble, cls).setup_class()
cls.results = cls.run_filter()
class TestClark1989ForecastDoubleComplex(Clark1989Forecast):
"""
Basic double complex forecasting test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1989ForecastDoubleComplex, cls).setup_class(
dtype=complex
)
cls.results = cls.run_filter()
class TestClark1989ForecastConserve(Clark1989Forecast):
"""
Memory conservation forecasting test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1989ForecastConserve, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02
)
cls.results = cls.run_filter()
class TestClark1989ConserveAll(Clark1989):
"""
    Memory conservation (all options) test for the loglikelihood and filtered
    states.
"""
@classmethod
def setup_class(cls):
super(TestClark1989ConserveAll, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02 | 0x04 | 0x08
)
# cls.model.loglikelihood_burn = cls.true['start']
cls.model.loglikelihood_burn = 0
cls.results = cls.run_filter()
def test_loglike(self):
assert_almost_equal(
self.results.llf_obs[0], self.true['loglike'], 2
)
def test_filtered_state(self):
end = self.true_states.shape[0]
assert_almost_equal(
self.results.filtered_state[0][-1],
self.true_states.iloc[end-1, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][-1],
self.true_states.iloc[end-1, 1], 4
)
assert_almost_equal(
self.results.filtered_state[4][-1],
self.true_states.iloc[end-1, 2], 4
)
assert_almost_equal(
self.results.filtered_state[5][-1],
self.true_states.iloc[end-1, 3], 4
)
class TestClark1989PartialMissing(Clark1989):
@classmethod
def setup_class(cls):
super(TestClark1989PartialMissing, cls).setup_class()
endog = cls.model.endog
        endog[1, -51:] = np.nan
cls.model.bind(endog)
cls.results = cls.run_filter()
def test_loglike(self):
assert_allclose(self.results.llf_obs[0:].sum(), 1232.113456)
def test_filtered_state(self):
# Could do this, but no need really.
pass
def test_predicted_state(self):
assert_allclose(
self.results.predicted_state.T[1:], clark1989_results.iloc[:, 1:],
atol=1e-8
)
# Miscellaneous coverage-related tests
def test_slice_notation():
# Test setting and getting state space representation matrices using the
# slice notation.
endog = np.arange(10)*1.0
mod = KalmanFilter(k_endog=1, k_states=2)
mod.bind(endog)
# Test invalid __setitem__
def set_designs():
mod['designs'] = 1
def set_designs2():
mod['designs', 0, 0] = 1
def set_designs3():
mod[0] = 1
assert_raises(IndexError, set_designs)
assert_raises(IndexError, set_designs2)
assert_raises(IndexError, set_designs3)
# Test invalid __getitem__
assert_raises(IndexError, lambda: mod['designs'])
assert_raises(IndexError, lambda: mod['designs', 0, 0, 0])
assert_raises(IndexError, lambda: mod[0])
# Test valid __setitem__, __getitem__
assert_equal(mod.design[0, 0, 0], 0)
mod['design', 0, 0, 0] = 1
assert_equal(mod['design'].sum(), 1)
assert_equal(mod.design[0, 0, 0], 1)
assert_equal(mod['design', 0, 0, 0], 1)
# Test valid __setitem__, __getitem__ with unspecified time index
mod['design'] = np.zeros(mod['design'].shape)
assert_equal(mod.design[0, 0], 0)
mod['design', 0, 0] = 1
assert_equal(mod.design[0, 0], 1)
assert_equal(mod['design', 0, 0], 1)
def test_representation():
# Test Representation construction
# Test an invalid number of states
def zero_kstates():
Representation(1, 0)
assert_raises(ValueError, zero_kstates)
# Test an invalid endogenous array
def empty_endog():
endog = np.zeros((0, 0))
Representation(endog, k_states=2)
assert_raises(ValueError, empty_endog)
# Test a Fortran-ordered endogenous array (which will be assumed to be in
# wide format: k_endog x nobs)
nobs = 10
k_endog = 2
arr = np.arange(nobs*k_endog).reshape(k_endog, nobs)*1.
endog = np.asfortranarray(arr)
mod = Representation(endog, k_states=2)
assert_equal(mod.nobs, nobs)
assert_equal(mod.k_endog, k_endog)
# Test a C-ordered endogenous array (which will be assumed to be in
# tall format: nobs x k_endog)
nobs = 10
k_endog = 2
endog = np.arange(nobs*k_endog).reshape(nobs, k_endog)*1.
mod = Representation(endog, k_states=2)
assert_equal(mod.nobs, nobs)
assert_equal(mod.k_endog, k_endog)
# Test getting the statespace representation
assert_equal(mod._statespace, None)
mod._initialize_representation()
assert_equal(mod._statespace is not None, True)
def test_bind():
# Test binding endogenous data to Kalman filter
mod = Representation(2, k_states=2)
# Test invalid endogenous array (it must be ndarray)
assert_raises(ValueError, lambda: mod.bind([1, 2, 3, 4]))
    # Test valid (nobs x k_endog) endogenous array
mod.bind(np.arange(10).reshape((5, 2))*1.)
assert_equal(mod.nobs, 5)
    # Test valid (0 x k_endog) endogenous array (zero observations)
mod.bind(np.zeros((0, 2), dtype=np.float64))
# Test invalid (3-dim) endogenous array
with pytest.raises(ValueError):
mod.bind(np.arange(12).reshape(2, 2, 3)*1.)
# Test valid F-contiguous
mod.bind(np.asfortranarray(np.arange(10).reshape(2, 5)))
assert_equal(mod.nobs, 5)
# Test valid C-contiguous
mod.bind(np.arange(10).reshape(5, 2))
assert_equal(mod.nobs, 5)
# Test invalid F-contiguous
with pytest.raises(ValueError):
mod.bind(np.asfortranarray(np.arange(10).reshape(5, 2)))
# Test invalid C-contiguous
assert_raises(ValueError, lambda: mod.bind(np.arange(10).reshape(2, 5)))
def test_initialization():
# Test Kalman filter initialization
mod = Representation(1, k_states=2)
# Test invalid state initialization
assert_raises(RuntimeError, lambda: mod._initialize_state())
# Test valid initialization
initial_state = np.zeros(2,) + 1.5
initial_state_cov = np.eye(2) * 3.
mod.initialize_known(initial_state, initial_state_cov)
assert_equal(mod.initialization.constant.sum(), 3)
assert_equal(mod.initialization.stationary_cov.diagonal().sum(), 6)
# Test invalid initial_state
initial_state = np.zeros(10,)
with pytest.raises(ValueError):
mod.initialize_known(initial_state, initial_state_cov)
initial_state = np.zeros((10, 10))
with pytest.raises(ValueError):
mod.initialize_known(initial_state, initial_state_cov)
# Test invalid initial_state_cov
initial_state = np.zeros(2,) + 1.5
initial_state_cov = np.eye(3)
with pytest.raises(ValueError):
mod.initialize_known(initial_state, initial_state_cov)
def test_init_matrices_time_invariant():
# Test setting state space system matrices in __init__, with time-invariant
# matrices
k_endog = 2
k_states = 3
k_posdef = 1
endog = np.zeros((10, 2))
obs_intercept = np.arange(k_endog) * 1.0
design = np.reshape(
np.arange(k_endog * k_states) * 1.0, (k_endog, k_states))
obs_cov = np.reshape(np.arange(k_endog**2) * 1.0, (k_endog, k_endog))
state_intercept = np.arange(k_states) * 1.0
transition = np.reshape(np.arange(k_states**2) * 1.0, (k_states, k_states))
selection = np.reshape(
np.arange(k_states * k_posdef) * 1.0, (k_states, k_posdef))
state_cov = np.reshape(np.arange(k_posdef**2) * 1.0, (k_posdef, k_posdef))
mod = Representation(endog, k_states=k_states, k_posdef=k_posdef,
obs_intercept=obs_intercept, design=design,
obs_cov=obs_cov, state_intercept=state_intercept,
transition=transition, selection=selection,
state_cov=state_cov)
assert_allclose(mod['obs_intercept'], obs_intercept)
assert_allclose(mod['design'], design)
assert_allclose(mod['obs_cov'], obs_cov)
assert_allclose(mod['state_intercept'], state_intercept)
assert_allclose(mod['transition'], transition)
assert_allclose(mod['selection'], selection)
assert_allclose(mod['state_cov'], state_cov)
def test_init_matrices_time_varying():
# Test setting state space system matrices in __init__, with time-varying
# matrices
nobs = 10
k_endog = 2
k_states = 3
k_posdef = 1
endog = np.zeros((10, 2))
obs_intercept = np.reshape(np.arange(k_endog * nobs) * 1.0,
(k_endog, nobs))
design = np.reshape(
np.arange(k_endog * k_states * nobs) * 1.0, (k_endog, k_states, nobs))
obs_cov = np.reshape(
np.arange(k_endog**2 * nobs) * 1.0, (k_endog, k_endog, nobs))
state_intercept = np.reshape(
np.arange(k_states * nobs) * 1.0, (k_states, nobs))
transition = np.reshape(
np.arange(k_states**2 * nobs) * 1.0, (k_states, k_states, nobs))
selection = np.reshape(
np.arange(k_states * k_posdef * nobs) * 1.0,
(k_states, k_posdef, nobs))
state_cov = np.reshape(
np.arange(k_posdef**2 * nobs) * 1.0, (k_posdef, k_posdef, nobs))
mod = Representation(endog, k_states=k_states, k_posdef=k_posdef,
obs_intercept=obs_intercept, design=design,
obs_cov=obs_cov, state_intercept=state_intercept,
transition=transition, selection=selection,
state_cov=state_cov)
assert_allclose(mod['obs_intercept'], obs_intercept)
assert_allclose(mod['design'], design)
assert_allclose(mod['obs_cov'], obs_cov)
assert_allclose(mod['state_intercept'], state_intercept)
assert_allclose(mod['transition'], transition)
assert_allclose(mod['selection'], selection)
assert_allclose(mod['state_cov'], state_cov)
def test_no_endog():
# Test for RuntimeError when no endog is provided by the time filtering
# is initialized.
mod = KalmanFilter(k_endog=1, k_states=1)
# directly call the _initialize_filter function
assert_raises(RuntimeError, mod._initialize_filter)
# indirectly call it through filtering
mod.initialize_approximate_diffuse()
assert_raises(RuntimeError, mod.filter)
def test_cython():
# Test the cython _kalman_filter creation, re-creation, calling, etc.
# Check that datatypes are correct:
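    # (tools.prefix_dtype_map uses the BLAS-style prefixes: 's' -> float32,
    # 'd' -> float64, 'c' -> complex64, 'z' -> complex128)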
for prefix, dtype in tools.prefix_dtype_map.items():
endog = np.array(1., ndmin=2, dtype=dtype)
mod = KalmanFilter(k_endog=1, k_states=1, dtype=dtype)
# Bind data and initialize the ?KalmanFilter object
mod.bind(endog)
mod._initialize_filter()
# Check that the dtype and prefix are correct
assert_equal(mod.prefix, prefix)
assert_equal(mod.dtype, dtype)
# Test that a dKalmanFilter instance was created
assert_equal(prefix in mod._kalman_filters, True)
kf = mod._kalman_filters[prefix]
assert isinstance(kf, tools.prefix_kalman_filter_map[prefix])
# Test that the default returned _kalman_filter is the above instance
assert_equal(mod._kalman_filter, kf)
# Check that upcasting datatypes / ?KalmanFilter works (e.g. d -> z)
mod = KalmanFilter(k_endog=1, k_states=1)
# Default dtype is float
assert_equal(mod.prefix, 'd')
assert_equal(mod.dtype, np.float64)
# Prior to initialization, no ?KalmanFilter exists
assert_equal(mod._kalman_filter, None)
# Bind data and initialize the ?KalmanFilter object
endog = np.ascontiguousarray(np.array([1., 2.], dtype=np.float64))
mod.bind(endog)
mod._initialize_filter()
kf = mod._kalman_filters['d']
# Rebind data, still float, check that we haven't changed
mod.bind(endog)
mod._initialize_filter()
assert_equal(mod._kalman_filter, kf)
# Force creating new ?Statespace and ?KalmanFilter, by changing the
# time-varying character of an array
mod.design = np.zeros((1, 1, 2))
mod._initialize_filter()
assert_equal(mod._kalman_filter == kf, False)
kf = mod._kalman_filters['d']
# Rebind data, now complex, check that the ?KalmanFilter instance has
# changed
endog = np.ascontiguousarray(np.array([1., 2.], dtype=np.complex128))
mod.bind(endog)
assert_equal(mod._kalman_filter == kf, False)
def test_filter():
# Tests of invalid calls to the filter function
endog = np.ones((10, 1))
mod = KalmanFilter(endog, k_states=1, initialization='approximate_diffuse')
mod['design', :] = 1
mod['selection', :] = 1
mod['state_cov', :] = 1
# Test default filter results
res = mod.filter()
assert_equal(isinstance(res, FilterResults), True)
def test_loglike():
# Tests of invalid calls to the loglike function
endog = np.ones((10, 1))
mod = KalmanFilter(endog, k_states=1, initialization='approximate_diffuse')
mod['design', :] = 1
mod['selection', :] = 1
mod['state_cov', :] = 1
# Test that self.memory_no_likelihood = True raises an error
mod.memory_no_likelihood = True
assert_raises(RuntimeError, mod.loglike)
assert_raises(RuntimeError, mod.loglikeobs)
def test_predict():
# Tests of invalid calls to the predict function
warnings.simplefilter("always")
endog = np.ones((10, 1))
mod = KalmanFilter(endog, k_states=1, initialization='approximate_diffuse')
mod['design', :] = 1
mod['obs_intercept'] = np.zeros((1, 10))
mod['selection', :] = 1
mod['state_cov', :] = 1
# Check that we need both forecasts and predicted output for prediction
mod.memory_no_forecast = True
res = mod.filter()
assert_raises(ValueError, res.predict)
mod.memory_no_forecast = False
mod.memory_no_predicted = True
res = mod.filter()
assert_raises(ValueError, res.predict)
mod.memory_no_predicted = False
# Now get a clean filter object
res = mod.filter()
# Check that start < 0 is an error
assert_raises(ValueError, res.predict, start=-1)
# Check that end < start is an error
assert_raises(ValueError, res.predict, start=2, end=1)
# Check that dynamic < 0 is an error
assert_raises(ValueError, res.predict, dynamic=-1)
    # Check that dynamic > end is a warning
with warnings.catch_warnings(record=True) as w:
res.predict(end=1, dynamic=2)
message = ('Dynamic prediction specified to begin after the end of'
' prediction, and so has no effect.')
assert_equal(str(w[0].message), message)
    # Check that dynamic > nobs is a warning
with warnings.catch_warnings(record=True) as w:
res.predict(end=11, dynamic=11, obs_intercept=np.zeros((1, 1)))
message = ('Dynamic prediction specified to begin during'
' out-of-sample forecasting period, and so has no'
' effect.')
assert_equal(str(w[0].message), message)
    # Check for a warning when providing an unused statespace matrix
with warnings.catch_warnings(record=True) as w:
res.predict(end=res.nobs+1, design=True,
obs_intercept=np.zeros((1, 1)))
message = ('Model has time-invariant design matrix, so the design'
' argument to `predict` has been ignored.')
assert_equal(str(w[0].message), message)
# Check that an error is raised when a new time-varying matrix is not
# provided
assert_raises(ValueError, res.predict, end=res.nobs+1)
# Check that an error is raised when a non-two-dimensional obs_intercept
# is given
assert_raises(ValueError, res.predict, end=res.nobs+1,
obs_intercept=np.zeros(1))
# Check that an error is raised when an obs_intercept with incorrect length
# is given
assert_raises(ValueError, res.predict, end=res.nobs+1,
obs_intercept=np.zeros(2))
# Check that start=None gives start=0 and end=None gives end=nobs
assert_equal(res.predict().forecasts.shape, (1, res.nobs))
# Check that dynamic=True begins dynamic prediction immediately
# TODO just a smoke test
res.predict(dynamic=True)
# Check that on success, PredictionResults object is returned
prediction_results = res.predict(start=3, end=5)
assert_equal(isinstance(prediction_results, PredictionResults), True)
# Check for correctly subset representation arrays
# (k_endog, npredictions) = (1, 2)
assert_equal(prediction_results.endog.shape, (1, 2))
# (k_endog, npredictions) = (1, 2)
assert_equal(prediction_results.obs_intercept.shape, (1, 2))
# (k_endog, k_states) = (1, 1)
assert_equal(prediction_results.design.shape, (1, 1))
# (k_endog, k_endog) = (1, 1)
assert_equal(prediction_results.obs_cov.shape, (1, 1))
# (k_state,) = (1,)
assert_equal(prediction_results.state_intercept.shape, (1,))
# (k_state, k_state) = (1, 1)
assert_equal(prediction_results.transition.shape, (1, 1))
# (k_state, k_posdef) = (1, 1)
assert_equal(prediction_results.selection.shape, (1, 1))
# (k_posdef, k_posdef) = (1, 1)
assert_equal(prediction_results.state_cov.shape, (1, 1))
# Check for correctly subset filter output arrays
# (k_endog, npredictions) = (1, 2)
assert_equal(prediction_results.forecasts.shape, (1, 2))
assert_equal(prediction_results.forecasts_error.shape, (1, 2))
# (k_states, npredictions) = (1, 2)
assert_equal(prediction_results.filtered_state.shape, (1, 2))
assert_equal(prediction_results.predicted_state.shape, (1, 2))
# (k_endog, k_endog, npredictions) = (1, 1, 2)
assert_equal(prediction_results.forecasts_error_cov.shape, (1, 1, 2))
# (k_states, k_states, npredictions) = (1, 1, 2)
assert_equal(prediction_results.filtered_state_cov.shape, (1, 1, 2))
assert_equal(prediction_results.predicted_state_cov.shape, (1, 1, 2))
# Check for invalid attribute
assert_raises(AttributeError, getattr, prediction_results, 'test')
# Check that an error is raised when a non-two-dimensional obs_cov
# is given
# ...and...
# Check that an error is raised when an obs_cov with incorrect length
# is given
mod = KalmanFilter(endog, k_states=1, initialization='approximate_diffuse')
mod['design', :] = 1
mod['obs_cov'] = np.zeros((1, 1, 10))
mod['selection', :] = 1
mod['state_cov', :] = 1
res = mod.filter()
assert_raises(ValueError, res.predict, end=res.nobs+1,
obs_cov=np.zeros((1, 1)))
assert_raises(ValueError, res.predict, end=res.nobs+1,
obs_cov=np.zeros((1, 1, 2)))
def test_standardized_forecasts_error():
# Simple test that standardized forecasts errors are calculated correctly.
# Just uses a different calculation method on a univariate series.
# Get the dataset
true = results_kalman_filter.uc_uni
data = pd.DataFrame(
true['data'],
index=pd.date_range('1947-01-01', '1995-07-01', freq='QS'),
columns=['GDP']
)
data['lgdp'] = np.log(data['GDP'])
# Fit an ARIMA(1, 1, 0) to log GDP
mod = sarimax.SARIMAX(data['lgdp'], order=(1, 1, 0))
res = mod.fit(disp=-1)
standardized_forecasts_error = (
res.filter_results.forecasts_error[0] /
np.sqrt(res.filter_results.forecasts_error_cov[0, 0])
)
assert_allclose(
res.filter_results.standardized_forecasts_error[0],
standardized_forecasts_error,
)
def test_simulate():
# Test for simulation of new time-series
from scipy.signal import lfilter
# Common parameters
nsimulations = 10
sigma2 = 2
measurement_shocks = np.zeros(nsimulations)
state_shocks = np.random.normal(scale=sigma2**0.5, size=nsimulations)
# Random walk model, so simulated series is just the cumulative sum of
# the shocks
mod = KalmanFilter(k_endog=1, k_states=1)
mod['design', 0, 0] = 1.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
actual = mod.simulate(
nsimulations, measurement_shocks=measurement_shocks,
state_shocks=state_shocks)[0].squeeze()
desired = np.r_[0, np.cumsum(state_shocks)[:-1]]
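    # (The one-period shift in `desired` reflects that the simulation starts
    # from a zero initial state, so state_shocks[t] first reaches the
    # observation at t+1.)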
assert_allclose(actual, desired)
# Local level model, so simulated series is just the cumulative sum of
# the shocks plus the measurement shock
mod = KalmanFilter(k_endog=1, k_states=1)
mod['design', 0, 0] = 1.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
actual = mod.simulate(
nsimulations, measurement_shocks=np.ones(nsimulations),
state_shocks=state_shocks)[0].squeeze()
desired = np.r_[1, np.cumsum(state_shocks)[:-1] + 1]
assert_allclose(actual, desired)
# Local level-like model with observation and state intercepts, so
# simulated series is just the cumulative sum of the shocks minus the state
# intercept, plus the observation intercept and the measurement shock
mod = KalmanFilter(k_endog=1, k_states=1)
mod['obs_intercept', 0, 0] = 5.
mod['design', 0, 0] = 1.
mod['state_intercept', 0, 0] = -2.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
actual = mod.simulate(
nsimulations, measurement_shocks=np.ones(nsimulations),
state_shocks=state_shocks)[0].squeeze()
desired = np.r_[1 + 5, np.cumsum(state_shocks - 2)[:-1] + 1 + 5]
assert_allclose(actual, desired)
# Model with time-varying observation intercept
mod = KalmanFilter(k_endog=1, k_states=1, nobs=10)
mod['obs_intercept'] = (np.arange(10)*1.).reshape(1, 10)
mod['design', 0, 0] = 1.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
actual = mod.simulate(
nsimulations, measurement_shocks=measurement_shocks,
state_shocks=state_shocks)[0].squeeze()
desired = np.r_[0, np.cumsum(state_shocks)[:-1] + np.arange(1, 10)]
assert_allclose(actual, desired)
    # Model with time-varying observation intercept; check that an error is
    # raised if more simulations are requested than nobs allows.
mod = KalmanFilter(k_endog=1, k_states=1, nobs=10)
mod['obs_intercept'] = (np.arange(10)*1.).reshape(1, 10)
mod['design', 0, 0] = 1.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
assert_raises(ValueError, mod.simulate, nsimulations+1, measurement_shocks,
state_shocks)
# ARMA(1, 1): phi = [0.1], theta = [0.5], sigma^2 = 2
phi = 0.1
theta = 0.5
mod = sarimax.SARIMAX([0], order=(1, 0, 1))
mod.update(np.r_[phi, theta, sigma2])
actual = mod.ssm.simulate(
nsimulations, measurement_shocks=measurement_shocks,
state_shocks=state_shocks,
initial_state=np.zeros(mod.k_states))[0].squeeze()
desired = lfilter([1, theta], [1, -phi], np.r_[0, state_shocks[:-1]])
assert_allclose(actual, desired)
# SARIMAX(1, 0, 1)x(1, 0, 1, 4), this time using the results object call
mod = sarimax.SARIMAX([0.1, 0.5, -0.2], order=(1, 0, 1),
seasonal_order=(1, 0, 1, 4))
res = mod.filter([0.1, 0.5, 0.2, -0.3, 1])
actual = res.simulate(
nsimulations, measurement_shocks=measurement_shocks,
state_shocks=state_shocks, initial_state=np.zeros(mod.k_states))
desired = lfilter(
res.polynomial_reduced_ma, res.polynomial_reduced_ar,
np.r_[0, state_shocks[:-1]])
assert_allclose(actual, desired)
def test_impulse_responses():
# Test for impulse response functions
# Random walk: 1-unit impulse response (i.e. non-orthogonalized irf) is 1
# for all periods
mod = KalmanFilter(k_endog=1, k_states=1)
mod['design', 0, 0] = 1.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
mod['state_cov', 0, 0] = 2.
actual = mod.impulse_responses(steps=10)
desired = np.ones((11, 1))
assert_allclose(actual, desired)
# Random walk: 2-unit impulse response (i.e. non-orthogonalized irf) is 2
# for all periods
mod = KalmanFilter(k_endog=1, k_states=1)
mod['design', 0, 0] = 1.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
mod['state_cov', 0, 0] = 2.
actual = mod.impulse_responses(steps=10, impulse=[2])
desired = np.ones((11, 1)) * 2
assert_allclose(actual, desired)
# Random walk: 1-standard-deviation response (i.e. orthogonalized irf) is
# sigma for all periods (here sigma^2 = 2)
mod = KalmanFilter(k_endog=1, k_states=1)
mod['design', 0, 0] = 1.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
mod['state_cov', 0, 0] = 2.
actual = mod.impulse_responses(steps=10, orthogonalized=True)
desired = np.ones((11, 1)) * 2**0.5
assert_allclose(actual, desired)
# Random walk: 1-standard-deviation cumulative response (i.e. cumulative
# orthogonalized irf)
mod = KalmanFilter(k_endog=1, k_states=1)
mod['design', 0, 0] = 1.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
mod['state_cov', 0, 0] = 2.
    actual = mod.impulse_responses(steps=10, orthogonalized=True,
                                   cumulative=True)
    desired = np.cumsum(np.ones((11, 1)) * 2**0.5)[:, np.newaxis]
    assert_allclose(actual, desired)
    actual = mod.impulse_responses(steps=10, impulse=[1], orthogonalized=True,
                                   cumulative=True)
    assert_allclose(actual, desired)
# Random walk: 1-unit impulse response (i.e. non-orthogonalized irf) is 1
# for all periods, even when intercepts are present
mod = KalmanFilter(k_endog=1, k_states=1)
mod['state_intercept', 0] = 100.
mod['design', 0, 0] = 1.
mod['obs_intercept', 0] = -1000.
mod['transition', 0, 0] = 1.
mod['selection', 0, 0] = 1.
mod['state_cov', 0, 0] = 2.
actual = mod.impulse_responses(steps=10)
desired = np.ones((11, 1))
assert_allclose(actual, desired)
# Univariate model (random walk): test that an error is thrown when
# a multivariate or empty "impulse" is sent
mod = KalmanFilter(k_endog=1, k_states=1)
assert_raises(ValueError, mod.impulse_responses, impulse=1)
assert_raises(ValueError, mod.impulse_responses, impulse=[1, 1])
assert_raises(ValueError, mod.impulse_responses, impulse=[])
# Univariate model with two uncorrelated shocks
mod = KalmanFilter(k_endog=1, k_states=2)
mod['design', 0, 0:2] = 1.
mod['transition', :, :] = np.eye(2)
mod['selection', :, :] = np.eye(2)
mod['state_cov', :, :] = np.eye(2)
desired = np.ones((11, 1))
actual = mod.impulse_responses(steps=10, impulse=0)
assert_allclose(actual, desired)
actual = mod.impulse_responses(steps=10, impulse=[1, 0])
assert_allclose(actual, desired)
actual = mod.impulse_responses(steps=10, impulse=1)
assert_allclose(actual, desired)
actual = mod.impulse_responses(steps=10, impulse=[0, 1])
assert_allclose(actual, desired)
# In this case (with sigma=sigma^2=1), orthogonalized is the same as not
actual = mod.impulse_responses(steps=10, impulse=0, orthogonalized=True)
assert_allclose(actual, desired)
actual = mod.impulse_responses(
steps=10, impulse=[1, 0], orthogonalized=True)
assert_allclose(actual, desired)
actual = mod.impulse_responses(
steps=10, impulse=[0, 1], orthogonalized=True)
assert_allclose(actual, desired)
# Univariate model with two correlated shocks
mod = KalmanFilter(k_endog=1, k_states=2)
mod['design', 0, 0:2] = 1.
mod['transition', :, :] = np.eye(2)
mod['selection', :, :] = np.eye(2)
mod['state_cov', :, :] = np.array([[1, 0.5], [0.5, 1.25]])
desired = np.ones((11, 1))
# Non-orthogonalized (i.e. 1-unit) impulses still just generate 1's
actual = mod.impulse_responses(steps=10, impulse=0)
assert_allclose(actual, desired)
actual = mod.impulse_responses(steps=10, impulse=1)
assert_allclose(actual, desired)
# Orthogonalized (i.e. 1-std-dev) impulses now generate different responses
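    # (A sketch of why: the Cholesky factor of [[1, 0.5], [0.5, 1.25]] is
    # [[1, 0], [0.5, 1]], so a one-std-dev impulse to the first shock moves
    # both states by (1, 0.5), while an impulse to the second moves only the
    # second state by 1.)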
actual = mod.impulse_responses(steps=10, impulse=0, orthogonalized=True)
assert_allclose(actual, desired + desired * 0.5)
actual = mod.impulse_responses(steps=10, impulse=1, orthogonalized=True)
assert_allclose(actual, desired)
# Multivariate model with two correlated shocks
mod = KalmanFilter(k_endog=2, k_states=2)
mod['design', :, :] = np.eye(2)
mod['transition', :, :] = np.eye(2)
mod['selection', :, :] = np.eye(2)
mod['state_cov', :, :] = np.array([[1, 0.5], [0.5, 1.25]])
ones = np.ones((11, 1))
zeros = np.zeros((11, 1))
# Non-orthogonalized (i.e. 1-unit) impulses still just generate 1's, but
# only for the appropriate series
actual = mod.impulse_responses(steps=10, impulse=0)
assert_allclose(actual, np.c_[ones, zeros])
actual = mod.impulse_responses(steps=10, impulse=1)
assert_allclose(actual, np.c_[zeros, ones])
# Orthogonalized (i.e. 1-std-dev) impulses now generate different
# responses, and only for the appropriate series
actual = mod.impulse_responses(steps=10, impulse=0, orthogonalized=True)
assert_allclose(actual, np.c_[ones, ones * 0.5])
actual = mod.impulse_responses(steps=10, impulse=1, orthogonalized=True)
assert_allclose(actual, np.c_[zeros, ones])
# AR(1) model generates a geometrically declining series
mod = sarimax.SARIMAX([0.1, 0.5, -0.2], order=(1, 0, 0))
phi = 0.5
mod.update([phi, 1])
desired = np.cumprod(np.r_[1, [phi]*10])
# Test going through the model directly
actual = mod.ssm.impulse_responses(steps=10)
assert_allclose(actual[:, 0], desired)
# Test going through the results object
res = mod.filter([phi, 1.])
actual = res.impulse_responses(steps=10)
assert_allclose(actual, desired)
def test_missing():
# Datasets
endog = np.arange(10).reshape(10, 1)
endog_pre_na = np.ascontiguousarray(np.c_[
endog.copy() * np.nan, endog.copy() * np.nan, endog, endog])
endog_post_na = np.ascontiguousarray(np.c_[
endog, endog, endog.copy() * np.nan, endog.copy() * np.nan])
endog_inject_na = np.ascontiguousarray(np.c_[
endog, endog.copy() * np.nan, endog, endog.copy() * np.nan])
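    # Since fully-missing observations are skipped in the filtering
    # recursions, padding the dataset with all-NaN series should leave the
    # per-observation log-likelihoods of the base model unchanged.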
# Base model
mod = KalmanFilter(np.ascontiguousarray(np.c_[endog, endog]), k_states=1,
initialization='approximate_diffuse')
mod['design', :, :] = 1
mod['obs_cov', :, :] = np.eye(mod.k_endog)*0.5
mod['transition', :, :] = 0.5
mod['selection', :, :] = 1
mod['state_cov', :, :] = 0.5
llf = mod.loglikeobs()
# Model with prepended nans
mod = KalmanFilter(endog_pre_na, k_states=1,
initialization='approximate_diffuse')
mod['design', :, :] = 1
mod['obs_cov', :, :] = np.eye(mod.k_endog)*0.5
mod['transition', :, :] = 0.5
mod['selection', :, :] = 1
mod['state_cov', :, :] = 0.5
llf_pre_na = mod.loglikeobs()
assert_allclose(llf_pre_na, llf)
# Model with appended nans
mod = KalmanFilter(endog_post_na, k_states=1,
initialization='approximate_diffuse')
mod['design', :, :] = 1
mod['obs_cov', :, :] = np.eye(mod.k_endog)*0.5
mod['transition', :, :] = 0.5
mod['selection', :, :] = 1
mod['state_cov', :, :] = 0.5
llf_post_na = mod.loglikeobs()
assert_allclose(llf_post_na, llf)
# Model with injected nans
mod = KalmanFilter(endog_inject_na, k_states=1,
initialization='approximate_diffuse')
mod['design', :, :] = 1
mod['obs_cov', :, :] = np.eye(mod.k_endog)*0.5
mod['transition', :, :] = 0.5
mod['selection', :, :] = 1
mod['state_cov', :, :] = 0.5
llf_inject_na = mod.loglikeobs()
assert_allclose(llf_inject_na, llf)