Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / tests / test_dynamic_factor.py

"""
Tests for VARMAX models

Author: Chad Fulton
License: Simplified-BSD
"""
import os
import re
import warnings

import numpy as np
from numpy.testing import assert_equal, assert_raises, assert_allclose
import pandas as pd
import pytest

from statsmodels.tsa.statespace import dynamic_factor
from .results import results_varmax, results_dynamic_factor
from statsmodels.iolib.summary import forg

current_path = os.path.dirname(os.path.abspath(__file__))

output_path = os.path.join('results', 'results_dynamic_factor_stata.csv')
output_results = pd.read_csv(os.path.join(current_path, output_path))


class CheckDynamicFactor(object):
    @classmethod
    def setup_class(cls, true, k_factors, factor_order, cov_type='approx',
                    included_vars=['dln_inv', 'dln_inc', 'dln_consump'],
                    demean=False, filter=True, **kwargs):
        cls.true = true
        # 1960:Q1 - 1982:Q4
        dta = pd.DataFrame(
            results_varmax.lutkepohl_data, columns=['inv', 'inc', 'consump'],
            index=pd.date_range('1960-01-01', '1982-10-01', freq='QS'))

        dta['dln_inv'] = np.log(dta['inv']).diff()
        dta['dln_inc'] = np.log(dta['inc']).diff()
        dta['dln_consump'] = np.log(dta['consump']).diff()

        endog = dta.loc['1960-04-01':'1978-10-01', included_vars]

        if demean:
            endog -= dta.iloc[1:][included_vars].mean()

        cls.model = dynamic_factor.DynamicFactor(endog, k_factors=k_factors,
                                                 factor_order=factor_order,
                                                 **kwargs)

        if filter:
            cls.results = cls.model.smooth(true['params'], cov_type=cov_type)

    def test_params(self):
        # Smoke test to make sure the start_params are well-defined and
        # lead to a well-defined model
        self.model.filter(self.model.start_params)
        # Similarly a smoke test for param_names
        assert_equal(len(self.model.start_params), len(self.model.param_names))
        # Finally make sure the transform and untransform do their job
        actual = self.model.transform_params(
            self.model.untransform_params(self.model.start_params))
        assert_allclose(actual, self.model.start_params)
        # Also in the case of enforce stationarity = False
        self.model.enforce_stationarity = False
        actual = self.model.transform_params(
            self.model.untransform_params(self.model.start_params))
        self.model.enforce_stationarity = True
        assert_allclose(actual, self.model.start_params)

    def test_results(self, close_figures):
        # Smoke test for creating the summary
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            self.results.summary()

        # Test cofficient matrix creation
        #  (via a different, more direct, method)
        if self.model.factor_order > 0:
            model = self.model
            k_factors = model.k_factors
            pft_params = self.results.params[model._params_factor_transition]
            coefficients = np.array(pft_params).reshape(
                k_factors, k_factors * model.factor_order)
            coefficient_matrices = np.array([
                coefficients[:self.model.k_factors,
                             i*self.model.k_factors:(i+1)*self.model.k_factors]
                for i in range(self.model.factor_order)
            ])
            assert_equal(
                self.results.coefficient_matrices_var,
                coefficient_matrices)
        else:
            assert_equal(self.results.coefficient_matrices_var, None)

    @pytest.mark.matplotlib
    def test_plot_coefficients_of_determination(self, close_figures):
        # Smoke test for plot_coefficients_of_determination
        self.results.plot_coefficients_of_determination()

    def test_no_enforce(self):
        return
        # Test that nothing goes wrong when we do not enforce stationarity
        params = self.model.untransform_params(self.true['params'])
        params[self.model._params_transition] = (
            self.true['params'][self.model._params_transition])
        self.model.enforce_stationarity = False
        results = self.model.filter(params, transformed=False)
        self.model.enforce_stationarity = True
        assert_allclose(results.llf, self.results.llf, rtol=1e-5)

    def test_mle(self, init_powell=True):
        with warnings.catch_warnings(record=True):
            warnings.simplefilter('always')
            start_params = self.model.start_params
            if init_powell:
                results = self.model.fit(method='powell',
                                         maxiter=100, disp=False)
                start_params = results.params
            results = self.model.fit(start_params, maxiter=1000, disp=False)
            results = self.model.fit(results.params, method='nm', maxiter=1000,
                                     disp=False)
            if not results.llf > self.results.llf:
                assert_allclose(results.llf, self.results.llf, rtol=1e-5)

    def test_loglike(self):
        assert_allclose(self.results.llf, self.true['loglike'], rtol=1e-6)

    def test_aic(self):
        # We only get 3 digits from Stata
        assert_allclose(self.results.aic, self.true['aic'], atol=3)

    def test_bic(self):
        # We only get 3 digits from Stata
        assert_allclose(self.results.bic, self.true['bic'], atol=3)

    def test_predict(self, **kwargs):
        # Tests predict + forecast
        assert_allclose(
            self.results.predict(end='1982-10-01', **kwargs),
            self.true['predict'],
            atol=1e-6)

    def test_dynamic_predict(self, **kwargs):
        # Tests predict + dynamic predict + forecast
        assert_allclose(
            self.results.predict(end='1982-10-01', dynamic='1961-01-01',
                                 **kwargs),
            self.true['dynamic_predict'],
            atol=1e-6)


class TestDynamicFactor(CheckDynamicFactor):
    """
    Test for a dynamic factor model with 1 AR(2) factor
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_1', 'predict_dfm_2', 'predict_dfm_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_1', 'dyn_predict_dfm_2', 'dyn_predict_dfm_3']]
        super(TestDynamicFactor, cls).setup_class(
            true, k_factors=1, factor_order=2)

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()**0.5
        assert_allclose(bse, self.true['bse_oim'], atol=1e-5)


class TestDynamicFactor2(CheckDynamicFactor):
    """
    Test for a dynamic factor model with two VAR(1) factors
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm2.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm2_1', 'predict_dfm2_2', 'predict_dfm2_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm2_1', 'dyn_predict_dfm2_2', 'dyn_predict_dfm2_3']]
        super(TestDynamicFactor2, cls).setup_class(
            true, k_factors=2, factor_order=1)

    def test_mle(self):
        # Stata's MLE on this model does not converge, so no reason to check
        pass

    def test_bse(self):
        # Stata's MLE on this model does not converge, and four of their
        # params do not even have bse (possibly they are still at starting
        # values?), so no reason to check this
        pass

    def test_aic(self):
        # Stata uses 9 df (i.e. 9 params) here instead of 13, because since the
        # model did not coverge, 4 of the parameters are not fully estimated
        # (possibly they are still at starting values?) so the AIC is off
        pass

    def test_bic(self):
        # Stata uses 9 df (i.e. 9 params) here instead of 13, because since the
        # model did not coverge, 4 of the parameters are not fully estimated
        # (possibly they are still at starting values?) so the BIC is off
        pass

    def test_summary(self):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            summary = self.results.summary()
        tables = [str(table) for table in summary.tables]
        params = self.true['params']

        # Make sure we have the right number of tables
        assert_equal(
            len(tables),
            2 + self.model.k_endog + self.model.k_factors + 1)

        # Check the model overview table
        assert re.search(
            r'Model:.*DynamicFactor\(factors=2, order=1\)',
            tables[0])

        # For each endogenous variable, check the output
        for i in range(self.model.k_endog):
            offset_loading = self.model.k_factors * i
            table = tables[i + 2]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search('Results for equation %s' % name, table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 7)

            # -> Check that we have the right coefficients
            assert re.search(
                'loading.f1 +' + forg(params[offset_loading + 0], prec=4),
                table)
            assert re.search(
                'loading.f2 +' + forg(params[offset_loading + 1], prec=4),
                table)

        # For each factor, check the output
        for i in range(self.model.k_factors):
            offset = (self.model.k_endog * (self.model.k_factors + 1) +
                      i * self.model.k_factors)
            table = tables[self.model.k_endog + i + 2]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search('Results for factor equation f%d' % (i+1), table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 7)

            # -> Check that we have the right coefficients
            assert re.search('L1.f1 +' + forg(params[offset + 0], prec=4),
                             table)
            assert re.search('L1.f2 +' + forg(params[offset + 1], prec=4),
                             table)

        # Check the Error covariance matrix output
        table = tables[2 + self.model.k_endog + self.model.k_factors]

        # -> Make sure we have the right table / table name
        name = self.model.endog_names[i]
        assert re.search('Error covariance matrix', table)

        # -> Make sure it's the right size
        assert_equal(len(table.split('\n')), 8)

        # -> Check that we have the right coefficients
        offset = self.model.k_endog * self.model.k_factors
        for i in range(self.model.k_endog):
            iname = self.model.endog_names[i]
            iparam = forg(params[offset + i], prec=4)
            assert re.search('sigma2.%s +%s' % (iname, iparam), table)


class TestDynamicFactor_exog1(CheckDynamicFactor):
    """
    Test for a dynamic factor model with 1 exogenous regressor: a constant
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm_exog1.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_exog1_1',
            'predict_dfm_exog1_2',
            'predict_dfm_exog1_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_exog1_1',
            'dyn_predict_dfm_exog1_2',
            'dyn_predict_dfm_exog1_3']]
        exog = np.ones((75, 1))
        super(TestDynamicFactor_exog1, cls).setup_class(
            true, k_factors=1, factor_order=1, exog=exog)

    def test_predict(self):
        exog = np.ones((16, 1))
        super(TestDynamicFactor_exog1, self).test_predict(exog=exog)

    def test_dynamic_predict(self):
        exog = np.ones((16, 1))
        super(TestDynamicFactor_exog1, self).test_dynamic_predict(exog=exog)

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()**0.5
        assert_allclose(bse**2, self.true['var_oim'], atol=1e-5)


class TestDynamicFactor_exog2(CheckDynamicFactor):
    """
    Test for a dynamic factor model with 2 exogenous regressors: a constant
    and a time-trend
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm_exog2.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_exog2_1',
            'predict_dfm_exog2_2',
            'predict_dfm_exog2_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_exog2_1',
            'dyn_predict_dfm_exog2_2',
            'dyn_predict_dfm_exog2_3']]
        exog = np.c_[np.ones((75, 1)), (np.arange(75) + 2)[:, np.newaxis]]
        super(TestDynamicFactor_exog2, cls).setup_class(
            true, k_factors=1, factor_order=1, exog=exog)

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()**0.5
        assert_allclose(bse**2, self.true['var_oim'], atol=1e-5)

    def test_predict(self):
        exog = np.c_[np.ones((16, 1)),
                     (np.arange(75, 75+16) + 2)[:, np.newaxis]]
        super(TestDynamicFactor_exog2, self).test_predict(exog=exog)

    def test_dynamic_predict(self):
        exog = np.c_[np.ones((16, 1)),
                     (np.arange(75, 75+16) + 2)[:, np.newaxis]]
        super(TestDynamicFactor_exog2, self).test_dynamic_predict(exog=exog)
Loading ...