Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / tests / test_kalman.py

"""
Tests for _representation and _kalman_filter modules

Author: Chad Fulton
License: Simplified-BSD

References
----------

Kim, Chang-Jin, and Charles R. Nelson. 1999.
"State-Space Models with Regime Switching:
Classical and Gibbs-Sampling Approaches with Applications".
MIT Press Books. The MIT Press.

Hamilton, James D. 1994.
Time Series Analysis.
Princeton, N.J.: Princeton University Press.
"""
import copy
import pickle

import numpy as np
import pandas as pd
import os
import pytest

from scipy.linalg.blas import find_best_blas_type
from scipy.linalg import solve_discrete_lyapunov
from statsmodels.tsa.statespace.kalman_filter import (
    MEMORY_NO_FORECAST, MEMORY_NO_PREDICTED, MEMORY_CONSERVE)
from statsmodels.tsa.statespace.mlemodel import MLEModel
from statsmodels.tsa.statespace import _representation, _kalman_filter
from .results import results_kalman_filter
from numpy.testing import assert_almost_equal, assert_allclose

prefix_statespace_map = {
    's': _representation.sStatespace, 'd': _representation.dStatespace,
    'c': _representation.cStatespace, 'z': _representation.zStatespace
}
prefix_kalman_filter_map = {
    's': _kalman_filter.sKalmanFilter, 'd': _kalman_filter.dKalmanFilter,
    'c': _kalman_filter.cKalmanFilter, 'z': _kalman_filter.zKalmanFilter
}

current_path = os.path.dirname(os.path.abspath(__file__))


class Clark1987(object):
    """
    Clark's (1987) univariate unobserved components model of real GDP (as
    presented in Kim and Nelson, 1999)

    Test data produced using GAUSS code described in Kim and Nelson (1999) and
    found at http://econ.korea.ac.kr/~cjkim/SSMARKOV.htm

    See `results.results_kalman_filter` for more information.
    """
    @classmethod
    def setup_class(cls, dtype=float, conserve_memory=0, loglikelihood_burn=0):
        cls.true = results_kalman_filter.uc_uni
        cls.true_states = pd.DataFrame(cls.true['states'])

        # GDP, Quarterly, 1947.1 - 1995.3
        data = pd.DataFrame(
            cls.true['data'],
            index=pd.date_range('1947-01-01', '1995-07-01', freq='QS'),
            columns=['GDP']
        )
        data['lgdp'] = np.log(data['GDP'])

        # Parameters
        cls.conserve_memory = conserve_memory
        cls.loglikelihood_burn = loglikelihood_burn

        # Observed data
        cls.obs = np.array(data['lgdp'], ndmin=2, dtype=dtype, order="F")

        # Measurement equation
        cls.k_endog = k_endog = 1  # dimension of observed data
        # design matrix
        cls.design = np.zeros((k_endog, 4, 1), dtype=dtype, order="F")
        cls.design[:, :, 0] = [1, 1, 0, 0]
        # observation intercept
        cls.obs_intercept = np.zeros((k_endog, 1), dtype=dtype, order="F")
        # observation covariance matrix
        cls.obs_cov = np.zeros((k_endog, k_endog, 1), dtype=dtype, order="F")

        # Transition equation
        cls.k_states = k_states = 4  # dimension of state space
        # transition matrix
        cls.transition = np.zeros((k_states, k_states, 1),
                                  dtype=dtype, order="F")
        cls.transition[([0, 0, 1, 1, 2, 3],
                        [0, 3, 1, 2, 1, 3],
                        [0, 0, 0, 0, 0, 0])] = [1, 1, 0, 0, 1, 1]
        # state intercept
        cls.state_intercept = np.zeros((k_states, 1), dtype=dtype, order="F")
        # selection matrix
        cls.selection = np.asfortranarray(np.eye(k_states)[:, :, None],
                                          dtype=dtype)
        # state covariance matrix
        cls.state_cov = np.zeros((k_states, k_states, 1),
                                 dtype=dtype, order="F")

        # Initialization: Diffuse priors
        cls.initial_state = np.zeros((k_states,), dtype=dtype, order="F")
        cls.initial_state_cov = np.asfortranarray(np.eye(k_states)*100,
                                                  dtype=dtype)

        # Update matrices with given parameters
        (sigma_v, sigma_e, sigma_w, phi_1, phi_2) = np.array(
            cls.true['parameters'], dtype=dtype
        )
        cls.transition[([1, 1], [1, 2], [0, 0])] = [phi_1, phi_2]
        cls.state_cov[
            np.diag_indices(k_states)+(np.zeros(k_states, dtype=int),)] = [
            sigma_v**2, sigma_e**2, 0, sigma_w**2
        ]

        # Initialization: modification
        # Due to the difference in the way Kim and Nelson (1999) and Durbin
        # and Koopman (2012) define the order of the Kalman filter routines,
        # we need to modify the initial state covariance matrix to match
        # Kim and Nelson's results, since the *Statespace models follow Durbin
        # and Koopman.
        cls.initial_state_cov = np.asfortranarray(
            np.dot(
                np.dot(cls.transition[:, :, 0], cls.initial_state_cov),
                cls.transition[:, :, 0].T
            )
        )

    @classmethod
    def init_filter(cls):
        # Use the appropriate Statespace model
        prefix = find_best_blas_type((cls.obs,))
        klass = prefix_statespace_map[prefix[0]]

        # Instantiate the statespace model
        model = klass(
            cls.obs, cls.design, cls.obs_intercept, cls.obs_cov,
            cls.transition, cls.state_intercept, cls.selection,
            cls.state_cov
        )
        model.initialize_known(cls.initial_state, cls.initial_state_cov)

        # Initialize the appropriate Kalman filter
        klass = prefix_kalman_filter_map[prefix[0]]
        kfilter = klass(model, conserve_memory=cls.conserve_memory,
                        loglikelihood_burn=cls.loglikelihood_burn)

        return model, kfilter

    @classmethod
    def run_filter(cls):
        # Filter the data
        cls.filter()

        # Get results
        return {
            'loglike': lambda burn: np.sum(cls.filter.loglikelihood[burn:]),
            'state': np.array(cls.filter.filtered_state),
        }

    def test_loglike(self):
        assert_almost_equal(
            self.result['loglike'](self.true['start']), self.true['loglike'], 5
        )

    def test_filtered_state(self):
        assert_almost_equal(
            self.result['state'][0][self.true['start']:],
            self.true_states.iloc[:, 0], 4
        )
        assert_almost_equal(
            self.result['state'][1][self.true['start']:],
            self.true_states.iloc[:, 1], 4
        )
        assert_almost_equal(
            self.result['state'][3][self.true['start']:],
            self.true_states.iloc[:, 2], 4
        )

    def test_pickled_filter(self):
        pickled = pickle.loads(pickle.dumps(self.filter))
        #  Run the filters
        self.filter()
        pickled()

        assert id(filter) != id(pickled)
        assert_allclose(np.array(self.filter.filtered_state),
                        np.array(pickled.filtered_state))
        assert_allclose(np.array(self.filter.loglikelihood),
                        np.array(pickled.loglikelihood))

    def test_copied_filter(self):
        copied = copy.deepcopy(self.filter)
        #  Run the filters
        self.filter()
        copied()

        assert id(filter) != id(copied)
        assert_allclose(np.array(self.filter.filtered_state),
                        np.array(copied.filtered_state))

        assert_allclose(np.array(self.filter.loglikelihood),
                        np.array(copied.loglikelihood))


class TestClark1987Single(Clark1987):
    """
    Basic single precision test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls):
        # TODO: Can we be more specific?  How can a contributor help?
        pytest.skip('Not implemented')
        super(TestClark1987Single, cls).setup_class(
            dtype=np.float32, conserve_memory=0
        )
        cls.model, cls.filter = cls.init_filter()
        cls.result = cls.run_filter()

    def test_loglike(self):
        assert_allclose(
            self.result['loglike'](self.true['start']), self.true['loglike'],
            rtol=1e-3
        )

    def test_filtered_state(self):
        assert_allclose(
            self.result['state'][0][self.true['start']:],
            self.true_states.iloc[:, 0],
            atol=1e-2
        )
        assert_allclose(
            self.result['state'][1][self.true['start']:],
            self.true_states.iloc[:, 1],
            atol=1e-2
        )
        assert_allclose(
            self.result['state'][3][self.true['start']:],
            self.true_states.iloc[:, 2],
            atol=1e-2
        )


class TestClark1987Double(Clark1987):
    """
    Basic double precision test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987Double, cls).setup_class(
            dtype=float, conserve_memory=0
        )
        cls.model, cls.filter = cls.init_filter()
        cls.result = cls.run_filter()


class TestClark1987SingleComplex(Clark1987):
    """
    Basic single precision complex test for the loglikelihood and filtered
    states.
    """
    @classmethod
    def setup_class(cls):
        # TODO: Can we be more specific?  How can a contributor help?
        pytest.skip('Not implemented')
        super(TestClark1987SingleComplex, cls).setup_class(
            dtype=np.complex64, conserve_memory=0
        )
        cls.model, cls.filter = cls.init_filter()
        cls.result = cls.run_filter()

    def test_loglike(self):
        assert_allclose(
            self.result['loglike'](self.true['start']), self.true['loglike'],
            rtol=1e-3
        )

    def test_filtered_state(self):
        assert_allclose(
            self.result['state'][0][self.true['start']:],
            self.true_states.iloc[:, 0],
            atol=1e-2
        )
        assert_allclose(
            self.result['state'][1][self.true['start']:],
            self.true_states.iloc[:, 1],
            atol=1e-2
        )
        assert_allclose(
            self.result['state'][3][self.true['start']:],
            self.true_states.iloc[:, 2],
            atol=1e-2
        )


class TestClark1987DoubleComplex(Clark1987):
    """
    Basic double precision complex test for the loglikelihood and filtered
    states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987DoubleComplex, cls).setup_class(
            dtype=complex, conserve_memory=0
        )
        cls.model, cls.filter = cls.init_filter()
        cls.result = cls.run_filter()


class TestClark1987Conserve(Clark1987):
    """
    Memory conservation test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls):
        super(TestClark1987Conserve, cls).setup_class(
            dtype=float,
            conserve_memory=MEMORY_NO_FORECAST | MEMORY_NO_PREDICTED
        )
        cls.model, cls.filter = cls.init_filter()
        cls.result = cls.run_filter()


class Clark1987Forecast(Clark1987):
    """
    Forecasting test for the loglikelihood and filtered states.
    """
    @classmethod
    def setup_class(cls, dtype=float, nforecast=100, conserve_memory=0):
        super(Clark1987Forecast, cls).setup_class(
            dtype, conserve_memory
        )
        cls.nforecast = nforecast

        # Add missing observations to the end (to forecast)
        cls._obs = cls.obs
        cls.obs = np.array(np.r_[cls.obs[0, :], [np.nan]*nforecast],
                           ndmin=2, dtype=dtype, order="F")

    def test_filtered_state(self):
        assert_almost_equal(
Loading ...