"""
Tests for VARMAX models
Author: Chad Fulton
License: Simplified-BSD
"""
import os
import re
import warnings
import numpy as np
from numpy.testing import assert_equal, assert_raises, assert_allclose
import pandas as pd
import pytest
from statsmodels.tsa.statespace import dynamic_factor
from .results import results_varmax, results_dynamic_factor
from statsmodels.iolib.summary import forg
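# Benchmark predictions and dynamic predictions produced by Stata, stored
# alongside the other test results.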
current_path = os.path.dirname(os.path.abspath(__file__))
output_path = os.path.join('results', 'results_dynamic_factor_stata.csv')
output_results = pd.read_csv(os.path.join(current_path, output_path))


class CheckDynamicFactor(object):
@classmethod
def setup_class(cls, true, k_factors, factor_order, cov_type='approx',
included_vars=['dln_inv', 'dln_inc', 'dln_consump'],
demean=False, filter=True, **kwargs):
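        # Quarterly Lutkepohl dataset (investment, income, consumption),
        # 1960Q1 - 1982Q4, transformed to log-differences below.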
cls.true = true
# 1960:Q1 - 1982:Q4
dta = pd.DataFrame(
results_varmax.lutkepohl_data, columns=['inv', 'inc', 'consump'],
index=pd.date_range('1960-01-01', '1982-10-01', freq='QS'))
dta['dln_inv'] = np.log(dta['inv']).diff()
dta['dln_inc'] = np.log(dta['inc']).diff()
dta['dln_consump'] = np.log(dta['consump']).diff()
endog = dta.loc['1960-04-01':'1978-10-01', included_vars]
if demean:
endog -= dta.iloc[1:][included_vars].mean()
cls.model = dynamic_factor.DynamicFactor(endog, k_factors=k_factors,
factor_order=factor_order,
**kwargs)
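        # Compute results at the known benchmark parameters (rather than
        # estimating them) so the tests below can compare directly against
        # the stored output.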
if filter:
cls.results = cls.model.smooth(true['params'], cov_type=cov_type)
def test_params(self):
# Smoke test to make sure the start_params are well-defined and
# lead to a well-defined model
self.model.filter(self.model.start_params)
# Similarly a smoke test for param_names
assert_equal(len(self.model.start_params), len(self.model.param_names))
# Finally make sure the transform and untransform do their job
actual = self.model.transform_params(
self.model.untransform_params(self.model.start_params))
assert_allclose(actual, self.model.start_params)
        # Also check the case of enforce_stationarity=False
self.model.enforce_stationarity = False
actual = self.model.transform_params(
self.model.untransform_params(self.model.start_params))
self.model.enforce_stationarity = True
assert_allclose(actual, self.model.start_params)
def test_results(self, close_figures):
# Smoke test for creating the summary
with warnings.catch_warnings():
warnings.simplefilter("ignore")
self.results.summary()
        # Test coefficient matrix creation
# (via a different, more direct, method)
if self.model.factor_order > 0:
model = self.model
k_factors = model.k_factors
pft_params = self.results.params[model._params_factor_transition]
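            # The factor transition parameters are stored as a flat vector;
            # reshape them into one (k_factors x k_factors) coefficient
            # matrix per lag and compare with the results attribute.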
coefficients = np.array(pft_params).reshape(
k_factors, k_factors * model.factor_order)
coefficient_matrices = np.array([
coefficients[:self.model.k_factors,
i*self.model.k_factors:(i+1)*self.model.k_factors]
for i in range(self.model.factor_order)
])
assert_equal(
self.results.coefficient_matrices_var,
coefficient_matrices)
else:
assert_equal(self.results.coefficient_matrices_var, None)
@pytest.mark.matplotlib
def test_plot_coefficients_of_determination(self, close_figures):
# Smoke test for plot_coefficients_of_determination
self.results.plot_coefficients_of_determination()
    def test_no_enforce(self):
        # Test that nothing goes wrong when we do not enforce stationarity
        # (currently disabled: the early return below skips the check)
        return
params = self.model.untransform_params(self.true['params'])
params[self.model._params_transition] = (
self.true['params'][self.model._params_transition])
self.model.enforce_stationarity = False
results = self.model.filter(params, transformed=False)
self.model.enforce_stationarity = True
assert_allclose(results.llf, self.results.llf, rtol=1e-5)
def test_mle(self, init_powell=True):
with warnings.catch_warnings(record=True):
warnings.simplefilter('always')
start_params = self.model.start_params
if init_powell:
results = self.model.fit(method='powell',
maxiter=100, disp=False)
start_params = results.params
results = self.model.fit(start_params, maxiter=1000, disp=False)
results = self.model.fit(results.params, method='nm', maxiter=1000,
disp=False)
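            # The refitted log-likelihood should be at least as good as the
            # benchmark; if not, it must agree to within a small relative
            # tolerance.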
if not results.llf > self.results.llf:
assert_allclose(results.llf, self.results.llf, rtol=1e-5)
def test_loglike(self):
assert_allclose(self.results.llf, self.true['loglike'], rtol=1e-6)
def test_aic(self):
# We only get 3 digits from Stata
assert_allclose(self.results.aic, self.true['aic'], atol=3)
def test_bic(self):
# We only get 3 digits from Stata
assert_allclose(self.results.bic, self.true['bic'], atol=3)
def test_predict(self, **kwargs):
# Tests predict + forecast
assert_allclose(
self.results.predict(end='1982-10-01', **kwargs),
self.true['predict'],
atol=1e-6)
def test_dynamic_predict(self, **kwargs):
# Tests predict + dynamic predict + forecast
assert_allclose(
self.results.predict(end='1982-10-01', dynamic='1961-01-01',
**kwargs),
self.true['dynamic_predict'],
atol=1e-6)


class TestDynamicFactor(CheckDynamicFactor):
"""
Test for a dynamic factor model with 1 AR(2) factor
"""
@classmethod
def setup_class(cls):
true = results_dynamic_factor.lutkepohl_dfm.copy()
true['predict'] = output_results.iloc[1:][[
'predict_dfm_1', 'predict_dfm_2', 'predict_dfm_3']]
true['dynamic_predict'] = output_results.iloc[1:][[
'dyn_predict_dfm_1', 'dyn_predict_dfm_2', 'dyn_predict_dfm_3']]
super(TestDynamicFactor, cls).setup_class(
true, k_factors=1, factor_order=2)
def test_bse_approx(self):
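        # Standard errors from the numerical approximation to the covariance
        # matrix, compared against Stata's OIM standard errors.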
bse = self.results._cov_params_approx().diagonal()**0.5
assert_allclose(bse, self.true['bse_oim'], atol=1e-5)


class TestDynamicFactor2(CheckDynamicFactor):
"""
Test for a dynamic factor model with two VAR(1) factors
"""
@classmethod
def setup_class(cls):
true = results_dynamic_factor.lutkepohl_dfm2.copy()
true['predict'] = output_results.iloc[1:][[
'predict_dfm2_1', 'predict_dfm2_2', 'predict_dfm2_3']]
true['dynamic_predict'] = output_results.iloc[1:][[
'dyn_predict_dfm2_1', 'dyn_predict_dfm2_2', 'dyn_predict_dfm2_3']]
super(TestDynamicFactor2, cls).setup_class(
true, k_factors=2, factor_order=1)
def test_mle(self):
# Stata's MLE on this model does not converge, so no reason to check
pass
def test_bse(self):
# Stata's MLE on this model does not converge, and four of their
# params do not even have bse (possibly they are still at starting
# values?), so no reason to check this
pass
    def test_aic(self):
        # Stata uses 9 df (i.e. 9 params) here instead of 13 because the
        # model did not converge and 4 of the parameters were not fully
        # estimated (possibly they are still at starting values?), so the
        # AIC is off
        pass
    def test_bic(self):
        # Stata uses 9 df (i.e. 9 params) here instead of 13 because the
        # model did not converge and 4 of the parameters were not fully
        # estimated (possibly they are still at starting values?), so the
        # BIC is off
        pass
def test_summary(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore")
summary = self.results.summary()
tables = [str(table) for table in summary.tables]
params = self.true['params']
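        # Parameter ordering assumed by the offsets below: factor loadings
        # (k_endog * k_factors), then idiosyncratic error variances
        # (k_endog), then the factor VAR transition coefficients.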
# Make sure we have the right number of tables
assert_equal(
len(tables),
2 + self.model.k_endog + self.model.k_factors + 1)
# Check the model overview table
assert re.search(
r'Model:.*DynamicFactor\(factors=2, order=1\)',
tables[0])
# For each endogenous variable, check the output
for i in range(self.model.k_endog):
offset_loading = self.model.k_factors * i
table = tables[i + 2]
# -> Make sure we have the right table / table name
name = self.model.endog_names[i]
assert re.search('Results for equation %s' % name, table)
# -> Make sure it's the right size
assert_equal(len(table.split('\n')), 7)
# -> Check that we have the right coefficients
assert re.search(
'loading.f1 +' + forg(params[offset_loading + 0], prec=4),
table)
assert re.search(
'loading.f2 +' + forg(params[offset_loading + 1], prec=4),
table)
# For each factor, check the output
for i in range(self.model.k_factors):
offset = (self.model.k_endog * (self.model.k_factors + 1) +
i * self.model.k_factors)
table = tables[self.model.k_endog + i + 2]
            # -> Make sure we have the right table / table name
            assert re.search('Results for factor equation f%d' % (i+1), table)
# -> Make sure it's the right size
assert_equal(len(table.split('\n')), 7)
# -> Check that we have the right coefficients
assert re.search('L1.f1 +' + forg(params[offset + 0], prec=4),
table)
assert re.search('L1.f2 +' + forg(params[offset + 1], prec=4),
table)
# Check the Error covariance matrix output
table = tables[2 + self.model.k_endog + self.model.k_factors]
        # -> Make sure we have the right table / table name
        assert re.search('Error covariance matrix', table)
# -> Make sure it's the right size
assert_equal(len(table.split('\n')), 8)
# -> Check that we have the right coefficients
offset = self.model.k_endog * self.model.k_factors
for i in range(self.model.k_endog):
iname = self.model.endog_names[i]
iparam = forg(params[offset + i], prec=4)
assert re.search('sigma2.%s +%s' % (iname, iparam), table)


class TestDynamicFactor_exog1(CheckDynamicFactor):
"""
Test for a dynamic factor model with 1 exogenous regressor: a constant
"""
@classmethod
def setup_class(cls):
true = results_dynamic_factor.lutkepohl_dfm_exog1.copy()
true['predict'] = output_results.iloc[1:][[
'predict_dfm_exog1_1',
'predict_dfm_exog1_2',
'predict_dfm_exog1_3']]
true['dynamic_predict'] = output_results.iloc[1:][[
'dyn_predict_dfm_exog1_1',
'dyn_predict_dfm_exog1_2',
'dyn_predict_dfm_exog1_3']]
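        # Constant regressor over the 75 in-sample observations
        # (1960Q2 - 1978Q4).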
exog = np.ones((75, 1))
super(TestDynamicFactor_exog1, cls).setup_class(
true, k_factors=1, factor_order=1, exog=exog)
def test_predict(self):
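        # The forecast period covers 16 quarters (1979Q1 - 1982Q4).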
exog = np.ones((16, 1))
super(TestDynamicFactor_exog1, self).test_predict(exog=exog)
def test_dynamic_predict(self):
exog = np.ones((16, 1))
super(TestDynamicFactor_exog1, self).test_dynamic_predict(exog=exog)
def test_bse_approx(self):
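        # The benchmark reports variances (var_oim), so compare the squared
        # standard errors.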
bse = self.results._cov_params_approx().diagonal()**0.5
assert_allclose(bse**2, self.true['var_oim'], atol=1e-5)


class TestDynamicFactor_exog2(CheckDynamicFactor):
"""
Test for a dynamic factor model with 2 exogenous regressors: a constant
and a time-trend
"""
@classmethod
def setup_class(cls):
true = results_dynamic_factor.lutkepohl_dfm_exog2.copy()
true['predict'] = output_results.iloc[1:][[
'predict_dfm_exog2_1',
'predict_dfm_exog2_2',
'predict_dfm_exog2_3']]
true['dynamic_predict'] = output_results.iloc[1:][[
'dyn_predict_dfm_exog2_1',
'dyn_predict_dfm_exog2_2',
'dyn_predict_dfm_exog2_3']]
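        # Constant plus linear time trend (values 2 through 76) over the 75
        # in-sample observations.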
exog = np.c_[np.ones((75, 1)), (np.arange(75) + 2)[:, np.newaxis]]
super(TestDynamicFactor_exog2, cls).setup_class(
true, k_factors=1, factor_order=1, exog=exog)
def test_bse_approx(self):
bse = self.results._cov_params_approx().diagonal()**0.5
assert_allclose(bse**2, self.true['var_oim'], atol=1e-5)
def test_predict(self):
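        # Continue the constant and time trend through the 16 forecast
        # quarters (trend values 77 through 92).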
exog = np.c_[np.ones((16, 1)),
(np.arange(75, 75+16) + 2)[:, np.newaxis]]
super(TestDynamicFactor_exog2, self).test_predict(exog=exog)
def test_dynamic_predict(self):
exog = np.c_[np.ones((16, 1)),
(np.arange(75, 75+16) + 2)[:, np.newaxis]]
super(TestDynamicFactor_exog2, self).test_dynamic_predict(exog=exog)