"""
Tests for python wrapper of state space representation and filtering
Author: Chad Fulton
License: Simplified-BSD
References
----------
Kim, Chang-Jin, and Charles R. Nelson. 1999.
"State-Space Models with Regime Switching:
Classical and Gibbs-Sampling Approaches with Applications".
MIT Press Books. The MIT Press.
"""
import os
import warnings
import numpy as np
import pandas as pd
import pytest
from statsmodels.tsa.statespace.representation import Representation
from statsmodels.tsa.statespace.kalman_filter import (
KalmanFilter, FilterResults, PredictionResults)
from statsmodels.tsa.statespace.simulation_smoother import SimulationSmoother
from statsmodels.tsa.statespace import tools, sarimax
from .results import results_kalman_filter
from numpy.testing import (
assert_equal, assert_almost_equal, assert_raises, assert_allclose)
current_path = os.path.dirname(os.path.abspath(__file__))
clark1989_path = os.path.join('results', 'results_clark1989_R.csv')
clark1989_results = pd.read_csv(os.path.join(current_path, clark1989_path))
class Clark1987(object):
"""
Clark's (1987) univariate unobserved components model of real GDP (as
presented in Kim and Nelson, 1999)
Test data produced using GAUSS code described in Kim and Nelson (1999) and
found at http://econ.korea.ac.kr/~cjkim/SSMARKOV.htm
See `results.results_kalman_filter` for more information.
"""
@classmethod
def setup_class(cls, dtype=float, **kwargs):
cls.true = results_kalman_filter.uc_uni
cls.true_states = pd.DataFrame(cls.true['states'])
# GDP, Quarterly, 1947.1 - 1995.3
data = pd.DataFrame(
cls.true['data'],
index=pd.date_range('1947-01-01', '1995-07-01', freq='QS'),
columns=['GDP']
)
data['lgdp'] = np.log(data['GDP'])
# Construct the statespace representation
k_states = 4
cls.model = KalmanFilter(k_endog=1, k_states=k_states, **kwargs)
cls.model.bind(data['lgdp'].values)
cls.model.design[:, :, 0] = [1, 1, 0, 0]
cls.model.transition[([0, 0, 1, 1, 2, 3],
[0, 3, 1, 2, 1, 3],
[0, 0, 0, 0, 0, 0])] = [1, 1, 0, 0, 1, 1]
cls.model.selection = np.eye(cls.model.k_states)
# Update matrices with given parameters
(sigma_v, sigma_e, sigma_w, phi_1, phi_2) = np.array(
cls.true['parameters']
)
cls.model.transition[([1, 1], [1, 2], [0, 0])] = [phi_1, phi_2]
cls.model.state_cov[
np.diag_indices(k_states)+(np.zeros(k_states, dtype=int),)] = [
sigma_v**2, sigma_e**2, 0, sigma_w**2
]
# Initialization
initial_state = np.zeros((k_states,))
initial_state_cov = np.eye(k_states)*100
# Initialization: modification
initial_state_cov = np.dot(
np.dot(cls.model.transition[:, :, 0], initial_state_cov),
cls.model.transition[:, :, 0].T
)
cls.model.initialize_known(initial_state, initial_state_cov)
@classmethod
def run_filter(cls):
# Filter the data
return cls.model.filter()
def test_loglike(self):
assert_almost_equal(
self.results.llf_obs[self.true['start']:].sum(),
self.true['loglike'], 5
)
def test_filtered_state(self):
assert_almost_equal(
self.results.filtered_state[0][self.true['start']:],
self.true_states.iloc[:, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][self.true['start']:],
self.true_states.iloc[:, 1], 4
)
assert_almost_equal(
self.results.filtered_state[3][self.true['start']:],
self.true_states.iloc[:, 2], 4
)
class TestClark1987Single(Clark1987):
"""
Basic single precision test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
pytest.skip('Not implemented')
super(TestClark1987Single, cls).setup_class(
dtype=np.float32, conserve_memory=0
)
cls.results = cls.run_filter()
class TestClark1987Double(Clark1987):
"""
Basic double precision test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987Double, cls).setup_class(
dtype=float, conserve_memory=0
)
cls.results = cls.run_filter()
@pytest.mark.skip('Not implemented')
class TestClark1987SingleComplex(Clark1987):
"""
Basic single precision complex test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987SingleComplex, cls).setup_class(
dtype=np.complex64, conserve_memory=0
)
cls.results = cls.run_filter()
class TestClark1987DoubleComplex(Clark1987):
"""
Basic double precision complex test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987DoubleComplex, cls).setup_class(
dtype=complex, conserve_memory=0
)
cls.results = cls.run_filter()
class TestClark1987Conserve(Clark1987):
"""
Memory conservation test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987Conserve, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02
)
cls.results = cls.run_filter()
class Clark1987Forecast(Clark1987):
"""
Forecasting test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls, dtype=float, nforecast=100, conserve_memory=0):
super(Clark1987Forecast, cls).setup_class(
dtype=dtype, conserve_memory=conserve_memory
)
cls.nforecast = nforecast
# Add missing observations to the end (to forecast)
cls.model.endog = np.array(
np.r_[cls.model.endog[0, :], [np.nan]*nforecast],
ndmin=2, dtype=dtype, order="F"
)
cls.model.nobs = cls.model.endog.shape[1]
def test_filtered_state(self):
assert_almost_equal(
self.results.filtered_state[0][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 1], 4
)
assert_almost_equal(
self.results.filtered_state[3][self.true['start']:-self.nforecast],
self.true_states.iloc[:, 2], 4
)
class TestClark1987ForecastDouble(Clark1987Forecast):
"""
Basic double forecasting test for the loglikelihood and filtered states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987ForecastDouble, cls).setup_class()
cls.results = cls.run_filter()
class TestClark1987ForecastDoubleComplex(Clark1987Forecast):
"""
Basic double complex forecasting test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987ForecastDoubleComplex, cls).setup_class(
dtype=complex
)
cls.results = cls.run_filter()
class TestClark1987ForecastConserve(Clark1987Forecast):
"""
Memory conservation forecasting test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987ForecastConserve, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02
)
cls.results = cls.run_filter()
class TestClark1987ConserveAll(Clark1987):
"""
Memory conservation forecasting test for the loglikelihood and filtered
states.
"""
@classmethod
def setup_class(cls):
super(TestClark1987ConserveAll, cls).setup_class(
dtype=float, conserve_memory=0x01 | 0x02 | 0x04 | 0x08
)
cls.model.loglikelihood_burn = cls.true['start']
cls.results = cls.run_filter()
def test_loglike(self):
assert_almost_equal(
self.results.llf, self.true['loglike'], 5
)
def test_filtered_state(self):
end = self.true_states.shape[0]
assert_almost_equal(
self.results.filtered_state[0][-1],
self.true_states.iloc[end-1, 0], 4
)
assert_almost_equal(
self.results.filtered_state[1][-1],
self.true_states.iloc[end-1, 1], 4
)
class Clark1989(object):
"""
Clark's (1989) bivariate unobserved components model of real GDP (as
presented in Kim and Nelson, 1999)
Tests two-dimensional observation data.
Test data produced using GAUSS code described in Kim and Nelson (1999) and
found at http://econ.korea.ac.kr/~cjkim/SSMARKOV.htm
See `results.results_kalman_filter` for more information.
"""
@classmethod
def setup_class(cls, dtype=float, **kwargs):
cls.true = results_kalman_filter.uc_bi
cls.true_states = pd.DataFrame(cls.true['states'])
# GDP and Unemployment, Quarterly, 1948.1 - 1995.3
data = pd.DataFrame(
cls.true['data'],
index=pd.date_range('1947-01-01', '1995-07-01', freq='QS'),
columns=['GDP', 'UNEMP']
)[4:]
data['GDP'] = np.log(data['GDP'])
data['UNEMP'] = (data['UNEMP']/100)
k_states = 6
cls.model = KalmanFilter(k_endog=2, k_states=k_states, **kwargs)
cls.model.bind(np.ascontiguousarray(data.values))
# Statespace representation
cls.model.design[:, :, 0] = [[1, 1, 0, 0, 0, 0], [0, 0, 0, 0, 0, 1]]
cls.model.transition[
([0, 0, 1, 1, 2, 3, 4, 5],
[0, 4, 1, 2, 1, 2, 4, 5],
[0, 0, 0, 0, 0, 0, 0, 0])
] = [1, 1, 0, 0, 1, 1, 1, 1]
cls.model.selection = np.eye(cls.model.k_states)
# Update matrices with given parameters
(sigma_v, sigma_e, sigma_w, sigma_vl, sigma_ec,
phi_1, phi_2, alpha_1, alpha_2, alpha_3) = np.array(
cls.true['parameters'],
)
cls.model.design[([1, 1, 1], [1, 2, 3], [0, 0, 0])] = [
alpha_1, alpha_2, alpha_3
]
cls.model.transition[([1, 1], [1, 2], [0, 0])] = [phi_1, phi_2]
cls.model.obs_cov[1, 1, 0] = sigma_ec**2
cls.model.state_cov[
np.diag_indices(k_states)+(np.zeros(k_states, dtype=int),)] = [
sigma_v**2, sigma_e**2, 0, 0, sigma_w**2, sigma_vl**2
]
# Initialization
initial_state = np.zeros((k_states,))
initial_state_cov = np.eye(k_states)*100
# Initialization: cls.modelification
initial_state_cov = np.dot(
np.dot(cls.model.transition[:, :, 0], initial_state_cov),
cls.model.transition[:, :, 0].T
)
cls.model.initialize_known(initial_state, initial_state_cov)
Loading ...