Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / tests / test_models.py

"""
Tests for miscellaneous models

Author: Chad Fulton
License: Simplified-BSD
"""

import numpy as np
import pandas as pd
import os
import pytest

from statsmodels.tsa.statespace import mlemodel, sarimax
from statsmodels import datasets

from numpy.testing import assert_equal, assert_allclose, assert_raises

current_path = os.path.dirname(os.path.abspath(__file__))


class Intercepts(mlemodel.MLEModel):
    """
    Test class for observation and state intercepts (which usually do not
    get tested in other models).
    """
    def __init__(self, endog, **kwargs):
        k_states = 3
        k_posdef = 3
        super(Intercepts, self).__init__(
            endog, k_states=k_states, k_posdef=k_posdef, **kwargs)
        self['design'] = np.eye(3)
        self['obs_cov'] = np.eye(3)
        self['transition'] = np.eye(3)
        self['selection'] = np.eye(3)
        self['state_cov'] = np.eye(3)
        self.initialize_approximate_diffuse()

    @property
    def param_names(self):
        return ['d.1', 'd.2', 'd.3', 'c.1', 'c.2', 'c.3']

    @property
    def start_params(self):
        return np.arange(6)

    def update(self, params, **kwargs):
        params = super(Intercepts, self).update(params, **kwargs)

        self['obs_intercept'] = params[:3]
        self['state_intercept'] = params[3:]


class TestIntercepts(object):
    @classmethod
    def setup_class(cls, which='mixed', **kwargs):
        # Results
        path = current_path + os.sep + 'results/results_intercepts_R.csv'
        cls.desired = pd.read_csv(path)

        # Data
        dta = datasets.macrodata.load_pandas().data
        dta.index = pd.date_range(start='1959-01-01', end='2009-7-01',
                                  freq='QS')
        obs = dta[['realgdp', 'realcons', 'realinv']].copy()
        obs = obs / obs.std()

        if which == 'all':
            obs.iloc[:50, :] = np.nan
            obs.iloc[119:130, :] = np.nan
        elif which == 'partial':
            obs.iloc[0:50, 0] = np.nan
            obs.iloc[119:130, 0] = np.nan
        elif which == 'mixed':
            obs.iloc[0:50, 0] = np.nan
            obs.iloc[19:70, 1] = np.nan
            obs.iloc[39:90, 2] = np.nan
            obs.iloc[119:130, 0] = np.nan
            obs.iloc[119:130, 2] = np.nan

        mod = Intercepts(obs, **kwargs)

        cls.params = np.arange(6) + 1
        cls.model = mod

        cls.results = mod.smooth(cls.params, return_ssm=True)

        # Calculate the determinant of the covariance matrices (for easy
        # comparison to other languages without having to store 2-dim arrays)
        cls.results.det_scaled_smoothed_estimator_cov = (
            np.zeros((1, cls.model.nobs)))
        cls.results.det_predicted_state_cov = np.zeros((1, cls.model.nobs))
        cls.results.det_smoothed_state_cov = np.zeros((1, cls.model.nobs))
        cls.results.det_smoothed_state_disturbance_cov = (
            np.zeros((1, cls.model.nobs)))

        for i in range(cls.model.nobs):
            cls.results.det_scaled_smoothed_estimator_cov[0, i] = (
                np.linalg.det(
                    cls.results.scaled_smoothed_estimator_cov[:, :, i]))
            cls.results.det_predicted_state_cov[0, i] = np.linalg.det(
                cls.results.predicted_state_cov[:, :, i+1])
            cls.results.det_smoothed_state_cov[0, i] = np.linalg.det(
                cls.results.smoothed_state_cov[:, :, i])
            cls.results.det_smoothed_state_disturbance_cov[0, i] = (
                np.linalg.det(
                    cls.results.smoothed_state_disturbance_cov[:, :, i]))

    def test_loglike(self):
        assert_allclose(np.sum(self.results.llf_obs), -7924.03893566)

    def test_scaled_smoothed_estimator(self):
        assert_allclose(
            self.results.scaled_smoothed_estimator.T,
            self.desired[['r1', 'r2', 'r3']]
        )

    def test_scaled_smoothed_estimator_cov(self):
        assert_allclose(
            self.results.det_scaled_smoothed_estimator_cov.T,
            self.desired[['detN']]
        )

    def test_forecasts(self):
        assert_allclose(
            self.results.forecasts.T,
            self.desired[['m1', 'm2', 'm3']]
        )

    def test_forecasts_error(self):
        assert_allclose(
            self.results.forecasts_error.T,
            self.desired[['v1', 'v2', 'v3']]
        )

    def test_forecasts_error_cov(self):
        assert_allclose(
            self.results.forecasts_error_cov.diagonal(),
            self.desired[['F1', 'F2', 'F3']]
        )

    def test_predicted_states(self):
        assert_allclose(
            self.results.predicted_state[:, 1:].T,
            self.desired[['a1', 'a2', 'a3']]
        )

    def test_predicted_states_cov(self):
        assert_allclose(
            self.results.det_predicted_state_cov.T,
            self.desired[['detP']]
        )

    def test_smoothed_states(self):
        assert_allclose(
            self.results.smoothed_state.T,
            self.desired[['alphahat1', 'alphahat2', 'alphahat3']]
        )

    def test_smoothed_states_cov(self):
        assert_allclose(
            self.results.det_smoothed_state_cov.T,
            self.desired[['detV']]
        )

    def test_smoothed_forecasts(self):
        assert_allclose(
            self.results.smoothed_forecasts.T,
            self.desired[['muhat1', 'muhat2', 'muhat3']]
        )

    def test_smoothed_state_disturbance(self):
        assert_allclose(
            self.results.smoothed_state_disturbance.T,
            self.desired[['etahat1', 'etahat2', 'etahat3']]
        )

    def test_smoothed_state_disturbance_cov(self):
        assert_allclose(
            self.results.det_smoothed_state_disturbance_cov.T,
            self.desired[['detVeta']]
        )

    def test_smoothed_measurement_disturbance(self):
        assert_allclose(
            self.results.smoothed_measurement_disturbance.T,
            self.desired[['epshat1', 'epshat2', 'epshat3']], atol=1e-9
        )

    def test_smoothed_measurement_disturbance_cov(self):
        assert_allclose(
            self.results.smoothed_measurement_disturbance_cov.diagonal(),
            self.desired[['Veps1', 'Veps2', 'Veps3']]
        )


class LargeStateCovAR1(mlemodel.MLEModel):
    """
    Test class for k_posdef > k_states (which usually do not get tested in
    other models).

    This is just an AR(1) model with an extra unused state innovation
    """
    def __init__(self, endog, **kwargs):
        k_states = 1
        k_posdef = 2
        super(LargeStateCovAR1, self).__init__(
            endog, k_states=k_states, k_posdef=k_posdef, **kwargs)
        self['design', 0, 0] = 1
        self['selection', 0, 0] = 1
        self['state_cov', 1, 1] = 1
        self.initialize_stationary()

    @property
    def param_names(self):
        return ['phi', 'sigma2']

    @property
    def start_params(self):
        return [0.5, 1]

    def update(self, params, **kwargs):
        params = super(LargeStateCovAR1, self).update(params, **kwargs)

        self['transition', 0, 0] = params[0]
        self['state_cov', 0, 0] = params[1]


def test_large_kposdef():
    assert_raises(ValueError, LargeStateCovAR1, np.arange(10))


class TestLargeStateCovAR1(object):
    @classmethod
    def setup_class(cls):
        pytest.skip(
            'TODO: This test is skipped since an exception is currently '
            'raised if k_posdef > k_states. However, this test could be '
            'used if models of those types were allowed'
        )

        # Data: just some sample data
        endog = [0.2, -1.5, -.3, -.1, 1.5, 0.2, -0.3, 0.2, 0.5, 0.8]

        # Params
        params = [0.5, 1]

        # Desired model: AR(1)
        mod_desired = sarimax.SARIMAX(endog)
        cls.res_desired = mod_desired.smooth(params)

        # Test class
        mod = LargeStateCovAR1(endog)
        cls.res = mod.smooth(params)

    def test_dimensions(self):
        assert_equal(self.res.filter_results.k_states, 1)
        assert_equal(self.res.filter_results.k_posdef, 2)
        assert_equal(self.res.smoothed_state_disturbance.shape, (2, 10))

        assert_equal(self.res_desired.filter_results.k_states, 1)
        assert_equal(self.res_desired.filter_results.k_posdef, 1)
        assert_equal(self.res_desired.smoothed_state_disturbance.shape,
                     (1, 10))

    def test_loglike(self):
        assert_allclose(self.res.llf_obs, self.res_desired.llf_obs)

    def test_scaled_smoothed_estimator(self):
        assert_allclose(self.res.scaled_smoothed_estimator[0],
                        self.res_desired.scaled_smoothed_estimator[0])

    def test_scaled_smoothed_estimator_cov(self):
        assert_allclose(self.res.scaled_smoothed_estimator_cov[0],
                        self.res_desired.scaled_smoothed_estimator_cov[0])

    def test_forecasts(self):
        assert_allclose(self.res.forecasts, self.res_desired.forecasts)

    def test_forecasts_error(self):
        assert_allclose(self.res.forecasts_error,
                        self.res_desired.forecasts_error)

    def test_forecasts_error_cov(self):
        assert_allclose(self.res.forecasts_error_cov,
                        self.res_desired.forecasts_error_cov)

    def test_predicted_states(self):
        assert_allclose(self.res.predicted_state[0],
                        self.res_desired.predicted_state[0])

    def test_predicted_states_cov(self):
        assert_allclose(self.res.predicted_state_cov[0, 0],
                        self.res_desired.predicted_state_cov[0, 0])

    def test_smoothed_states(self):
        assert_allclose(self.res.smoothed_state[0],
                        self.res_desired.smoothed_state[0])

    def test_smoothed_states_cov(self):
        assert_allclose(self.res.smoothed_state_cov[0, 0],
                        self.res_desired.smoothed_state_cov[0, 0])

    def test_smoothed_state_disturbance(self):
        assert_allclose(self.res.smoothed_state_disturbance[0],
                        self.res_desired.smoothed_state_disturbance[0])
        assert_allclose(self.res.smoothed_state_disturbance[1], 0)

    def test_smoothed_state_disturbance_cov(self):
        assert_allclose(self.res.smoothed_state_disturbance_cov[0, 0],
                        self.res_desired.smoothed_state_disturbance_cov[0, 0])
        assert_allclose(self.res.smoothed_state_disturbance[1, 1], 0)

    def test_smoothed_measurement_disturbance(self):
        assert_allclose(self.res.smoothed_measurement_disturbance,
                        self.res_desired.smoothed_measurement_disturbance)

    def test_smoothed_measurement_disturbance_cov(self):
        assert_allclose(self.res.smoothed_measurement_disturbance_cov,
                        self.res_desired.smoothed_measurement_disturbance_cov)