Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / tests / test_representation.py

"""
Tests for python wrapper of state space representation and filtering

Author: Chad Fulton
License: Simplified-BSD

References
----------

Kim, Chang-Jin, and Charles R. Nelson. 1999.
"State-Space Models with Regime Switching:
Classical and Gibbs-Sampling Approaches with Applications".
MIT Press Books. The MIT Press.
"""

import os
import warnings

import numpy as np
import pandas as pd
import pytest

from statsmodels.tsa.statespace.representation import Representation
from statsmodels.tsa.statespace.kalman_filter import (
    KalmanFilter, FilterResults, PredictionResults)
from statsmodels.tsa.statespace.simulation_smoother import SimulationSmoother
from statsmodels.tsa.statespace import tools, sarimax
from .results import results_kalman_filter
from numpy.testing import (
    assert_equal, assert_almost_equal, assert_raises, assert_allclose)

current_path = os.path.dirname(os.path.abspath(__file__))

clark1989_path = os.path.join('results', 'results_clark1989_R.csv')
clark1989_results = pd.read_csv(os.path.join(current_path, clark1989_path))


class Clark1987(object):
    """
    Clark's (1987) univariate unobserved components model of real GDP (as
    presented in Kim and Nelson, 1999)

    Test data produced using GAUSS code described in Kim and Nelson (1999) and
    found at http://econ.korea.ac.kr/~cjkim/SSMARKOV.htm

    See `results.results_kalman_filter` for more information.
    """
    @classmethod
    def setup_class(cls, dtype=float, **kwargs):
        cls.true = results_kalman_filter.uc_uni
        cls.true_states = pd.DataFrame(cls.true['states'])

        # GDP, Quarterly, 1947.1 - 1995.3
        data = pd.DataFrame(
            cls.true['data'],
            index=pd.date_range('1947-01-01', '1995-07-01', freq='QS'),
            columns=['GDP']
        )
        data['lgdp'] = np.log(data['GDP'])

        # Construct the statespace representation
        k_states = 4
        cls.model = KalmanFilter(k_endog=1, k_states=k_states, **kwargs)
        cls.model.bind(data['lgdp'].values)

        cls.model.design[:, :, 0] = [1, 1, 0, 0]
        cls.model.transition[([0, 0, 1, 1, 2, 3],
                              [0, 3, 1, 2, 1, 3],
                              [0, 0, 0, 0, 0, 0])] = [1, 1, 0, 0, 1, 1]
        cls.model.selection = np.eye(cls.model.k_states)

        # Update matrices with given parameters
        (sigma_v, sigma_e, sigma_w, phi_1, phi_2) = np.array(
            cls.true['parameters']
        )
        cls.model.transition[([1, 1], [1, 2], [0, 0])] = [phi_1, phi_2]
        cls.model.state_cov[
            np.diag_indices(k_states)+(np.zeros(k_states, dtype=int),)] = [
            sigma_v**2, sigma_e**2, 0, sigma_w**2
        ]

        # Initialization
        initial_state = np.zeros((k_states,))
        initial_state_cov = np.eye(k_states)*100

        # Initialization: modification
        initial_state_cov = np.dot(
            np.dot(cls.model.transition[:, :, 0], initial_state_cov),
            cls.model.transition[:, :, 0].T
        )
        cls.model.initialize_known(initial_state, initial_state_cov)

    @classmethod
    def run_filter(cls):
        # Filter the data
        return cls.model.filter()

    def test_loglike(self):
        assert_almost_equal(
            self.results.llf_obs[self.true['start']:].sum(),
            self.true['loglike'], 5
        )

    def test_filtered_state(self):
        assert_almost_equal(
            self.results.filtered_state[0][self.true['start']:],
            self.true_states.iloc[:, 0], 4
        )
        assert_almost_equal(
            self.results.filtered_state[1][self.true['start']:],
            self.true_states.iloc[:, 1], 4
        )
        assert_almost_equal(
            self.results.filtered_state[3][self.true['start']:],
            self.true_states.iloc[:, 2], 4
        )


class TestClark1987Single(Clark1987):
    """
    Basic single precision test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls):
        pytest.skip('Not implemented')
        super(TestClark1987Single, cls).setup_class(
            dtype=np.float32, conserve_memory=0
        )
        cls.results = cls.run_filter()


class TestClark1987Double(Clark1987):
    """
    Basic double precision test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987Double, cls).setup_class(
            dtype=float, conserve_memory=0
        )
        cls.results = cls.run_filter()


@pytest.mark.skip('Not implemented')
class TestClark1987SingleComplex(Clark1987):
    """
    Basic single precision complex test for the loglikelihood and filtered
    states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987SingleComplex, cls).setup_class(
            dtype=np.complex64, conserve_memory=0
        )
        cls.results = cls.run_filter()


class TestClark1987DoubleComplex(Clark1987):
    """
    Basic double precision complex test for the loglikelihood and filtered
    states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987DoubleComplex, cls).setup_class(
            dtype=complex, conserve_memory=0
        )
        cls.results = cls.run_filter()


class TestClark1987Conserve(Clark1987):
    """
    Memory conservation test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987Conserve, cls).setup_class(
            dtype=float, conserve_memory=0x01 | 0x02
        )
        cls.results = cls.run_filter()


class Clark1987Forecast(Clark1987):
    """
    Forecasting test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls, dtype=float, nforecast=100, conserve_memory=0):
        super(Clark1987Forecast, cls).setup_class(
            dtype=dtype, conserve_memory=conserve_memory
        )
        cls.nforecast = nforecast

        # Add missing observations to the end (to forecast)
        cls.model.endog = np.array(
            np.r_[cls.model.endog[0, :], [np.nan]*nforecast],
            ndmin=2, dtype=dtype, order="F"
        )
        cls.model.nobs = cls.model.endog.shape[1]

    def test_filtered_state(self):
        assert_almost_equal(
            self.results.filtered_state[0][self.true['start']:-self.nforecast],
            self.true_states.iloc[:, 0], 4
        )
        assert_almost_equal(
            self.results.filtered_state[1][self.true['start']:-self.nforecast],
            self.true_states.iloc[:, 1], 4
        )
        assert_almost_equal(
            self.results.filtered_state[3][self.true['start']:-self.nforecast],
            self.true_states.iloc[:, 2], 4
        )


class TestClark1987ForecastDouble(Clark1987Forecast):
    """
    Basic double forecasting test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987ForecastDouble, cls).setup_class()
        cls.results = cls.run_filter()


class TestClark1987ForecastDoubleComplex(Clark1987Forecast):
    """
    Basic double complex forecasting test for the loglikelihood and filtered
    states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987ForecastDoubleComplex, cls).setup_class(
            dtype=complex
        )
        cls.results = cls.run_filter()


class TestClark1987ForecastConserve(Clark1987Forecast):
    """
    Memory conservation forecasting test for the loglikelihood and filtered
    states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987ForecastConserve, cls).setup_class(
            dtype=float, conserve_memory=0x01 | 0x02
        )
        cls.results = cls.run_filter()


class TestClark1987ConserveAll(Clark1987):
    """
    Memory conservation forecasting test for the loglikelihood and filtered
    states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987ConserveAll, cls).setup_class(
            dtype=float, conserve_memory=0x01 | 0x02 | 0x04 | 0x08
        )
        cls.model.loglikelihood_burn = cls.true['start']
        cls.results = cls.run_filter()

    def test_loglike(self):
        assert_almost_equal(
            self.results.llf, self.true['loglike'], 5
        )

    def test_filtered_state(self):
        end = self.true_states.shape[0]
        assert_almost_equal(
            self.results.filtered_state[0][-1],
            self.true_states.iloc[end-1, 0], 4
        )
        assert_almost_equal(
            self.results.filtered_state[1][-1],
            self.true_states.iloc[end-1, 1], 4
        )


class Clark1989(object):
    """
    Clark's (1989) bivariate unobserved components model of real GDP (as
    presented in Kim and Nelson, 1999)

    Tests two-dimensional observation data.

    Test data produced using GAUSS code described in Kim and Nelson (1999) and
    found at http://econ.korea.ac.kr/~cjkim/SSMARKOV.htm

    See `results.results_kalman_filter` for more information.
    """
    @classmethod
    def setup_class(cls, dtype=float, **kwargs):
        cls.true = results_kalman_filter.uc_bi
        cls.true_states = pd.DataFrame(cls.true['states'])

        # GDP and Unemployment, Quarterly, 1948.1 - 1995.3
        data = pd.DataFrame(
            cls.true['data'],
            index=pd.date_range('1947-01-01', '1995-07-01', freq='QS'),
            columns=['GDP', 'UNEMP']
        )[4:]
        data['GDP'] = np.log(data['GDP'])
        data['UNEMP'] = (data['UNEMP']/100)

        k_states = 6
        cls.model = KalmanFilter(k_endog=2, k_states=k_states, **kwargs)
        cls.model.bind(np.ascontiguousarray(data.values))

        # Statespace representation
        cls.model.design[:, :, 0] = [[1, 1, 0, 0, 0, 0], [0, 0, 0, 0, 0, 1]]
        cls.model.transition[
            ([0, 0, 1, 1, 2, 3, 4, 5],
             [0, 4, 1, 2, 1, 2, 4, 5],
             [0, 0, 0, 0, 0, 0, 0, 0])
        ] = [1, 1, 0, 0, 1, 1, 1, 1]
        cls.model.selection = np.eye(cls.model.k_states)

        # Update matrices with given parameters
        (sigma_v, sigma_e, sigma_w, sigma_vl, sigma_ec,
         phi_1, phi_2, alpha_1, alpha_2, alpha_3) = np.array(
            cls.true['parameters'],
        )
        cls.model.design[([1, 1, 1], [1, 2, 3], [0, 0, 0])] = [
            alpha_1, alpha_2, alpha_3
        ]
        cls.model.transition[([1, 1], [1, 2], [0, 0])] = [phi_1, phi_2]
        cls.model.obs_cov[1, 1, 0] = sigma_ec**2
        cls.model.state_cov[
            np.diag_indices(k_states)+(np.zeros(k_states, dtype=int),)] = [
            sigma_v**2, sigma_e**2, 0, 0, sigma_w**2, sigma_vl**2
        ]

        # Initialization
        initial_state = np.zeros((k_states,))
        initial_state_cov = np.eye(k_states)*100

        # Initialization: cls.modelification
        initial_state_cov = np.dot(
            np.dot(cls.model.transition[:, :, 0], initial_state_cov),
            cls.model.transition[:, :, 0].T
        )
        cls.model.initialize_known(initial_state, initial_state_cov)
Loading ...