Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / tests / test_mlemodel.py

"""
Tests for the generic MLEModel

Author: Chad Fulton
License: Simplified-BSD
"""

import os
import re
import warnings

import numpy as np
import pandas as pd
import pytest

from statsmodels.tsa.statespace import (sarimax, varmax, kalman_filter,
                                        kalman_smoother)
from statsmodels.tsa.statespace.mlemodel import MLEModel, MLEResultsWrapper
from statsmodels.datasets import nile
from numpy.testing import (
    assert_almost_equal, assert_equal, assert_allclose, assert_raises)
from statsmodels.tsa.statespace.tests.results import (
    results_sarimax, results_var_misc)

current_path = os.path.dirname(os.path.abspath(__file__))

# Basic kwargs
kwargs = {
    'k_states': 1, 'design': [[1]], 'transition': [[1]],
    'selection': [[1]], 'state_cov': [[1]],
    'initialization': 'approximate_diffuse'
}


def get_dummy_mod(fit=True, pandas=False):
    # This tests time-varying parameters regression when in fact the parameters
    # are not time-varying, and in fact the regression fit is perfect
    endog = np.arange(100)*1.0
    exog = 2*endog

    if pandas:
        index = pd.date_range('1960-01-01', periods=100, freq='MS')
        endog = pd.Series(endog, index=index)
        exog = pd.Series(exog, index=index)

    mod = sarimax.SARIMAX(
        endog, exog=exog, order=(0, 0, 0),
        time_varying_regression=True, mle_regression=False,
        use_exact_diffuse=True)

    if fit:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            res = mod.fit(disp=-1)
    else:
        res = None

    return mod, res


def test_init_matrices_time_invariant():
    # Test setting state space system matrices in __init__, with time-invariant
    # matrices
    k_endog = 2
    k_states = 3
    k_posdef = 1

    endog = np.zeros((10, 2))
    obs_intercept = np.arange(k_endog) * 1.0
    design = np.reshape(
        np.arange(k_endog * k_states) * 1.0, (k_endog, k_states))
    obs_cov = np.reshape(np.arange(k_endog**2) * 1.0, (k_endog, k_endog))
    state_intercept = np.arange(k_states) * 1.0
    transition = np.reshape(np.arange(k_states**2) * 1.0, (k_states, k_states))
    selection = np.reshape(
        np.arange(k_states * k_posdef) * 1.0, (k_states, k_posdef))
    state_cov = np.reshape(np.arange(k_posdef**2) * 1.0, (k_posdef, k_posdef))

    mod = MLEModel(endog, k_states=k_states, k_posdef=k_posdef,
                   obs_intercept=obs_intercept, design=design,
                   obs_cov=obs_cov, state_intercept=state_intercept,
                   transition=transition, selection=selection,
                   state_cov=state_cov)

    assert_allclose(mod['obs_intercept'], obs_intercept)
    assert_allclose(mod['design'], design)
    assert_allclose(mod['obs_cov'], obs_cov)
    assert_allclose(mod['state_intercept'], state_intercept)
    assert_allclose(mod['transition'], transition)
    assert_allclose(mod['selection'], selection)
    assert_allclose(mod['state_cov'], state_cov)


def test_init_matrices_time_varying():
    # Test setting state space system matrices in __init__, with time-varying
    # matrices
    nobs = 10
    k_endog = 2
    k_states = 3
    k_posdef = 1

    endog = np.zeros((10, 2))
    obs_intercept = np.reshape(np.arange(k_endog * nobs) * 1.0,
                               (k_endog, nobs))
    design = np.reshape(
        np.arange(k_endog * k_states * nobs) * 1.0, (k_endog, k_states, nobs))
    obs_cov = np.reshape(
        np.arange(k_endog**2 * nobs) * 1.0, (k_endog, k_endog, nobs))
    state_intercept = np.reshape(
        np.arange(k_states * nobs) * 1.0, (k_states, nobs))
    transition = np.reshape(
        np.arange(k_states**2 * nobs) * 1.0, (k_states, k_states, nobs))
    selection = np.reshape(
        np.arange(k_states * k_posdef * nobs) * 1.0,
        (k_states, k_posdef, nobs))
    state_cov = np.reshape(
        np.arange(k_posdef**2 * nobs) * 1.0, (k_posdef, k_posdef, nobs))

    mod = MLEModel(endog, k_states=k_states, k_posdef=k_posdef,
                   obs_intercept=obs_intercept, design=design,
                   obs_cov=obs_cov, state_intercept=state_intercept,
                   transition=transition, selection=selection,
                   state_cov=state_cov)

    assert_allclose(mod['obs_intercept'], obs_intercept)
    assert_allclose(mod['design'], design)
    assert_allclose(mod['obs_cov'], obs_cov)
    assert_allclose(mod['state_intercept'], state_intercept)
    assert_allclose(mod['transition'], transition)
    assert_allclose(mod['selection'], selection)
    assert_allclose(mod['state_cov'], state_cov)


def test_wrapping():
    # Test the wrapping of various Representation / KalmanFilter /
    # KalmanSmoother methods / attributes
    mod, _ = get_dummy_mod(fit=False)

    # Test that we can get the design matrix
    assert_equal(mod['design', 0, 0], 2.0 * np.arange(100))

    # Test that we can set individual elements of the design matrix
    mod['design', 0, 0, :] = 2
    assert_equal(mod.ssm['design', 0, 0, :], 2)
    assert_equal(mod.ssm['design'].shape, (1, 1, 100))

    # Test that we can set the entire design matrix
    mod['design'] = [[3.]]
    assert_equal(mod.ssm['design', 0, 0], 3.)
    # (Now it's no longer time-varying, so only 2-dim)
    assert_equal(mod.ssm['design'].shape, (1, 1))

    # Test that we can change the following properties: loglikelihood_burn,
    # initial_variance, tolerance
    assert_equal(mod.loglikelihood_burn, 0)
    mod.loglikelihood_burn = 1
    assert_equal(mod.ssm.loglikelihood_burn, 1)

    assert_equal(mod.tolerance, mod.ssm.tolerance)
    mod.tolerance = 0.123
    assert_equal(mod.ssm.tolerance, 0.123)

    assert_equal(mod.initial_variance, 1e10)
    mod.initial_variance = 1e12
    assert_equal(mod.ssm.initial_variance, 1e12)

    # Test that we can use the following wrappers: initialization,
    # initialize_known, initialize_stationary, initialize_approximate_diffuse

    # Initialization starts off as none
    assert_equal(isinstance(mod.initialization, object), True)

    # Since the SARIMAX model may be fully stationary or may have diffuse
    # elements, it uses a custom initialization by default, but it can be
    # overridden by users
    mod.initialize_default()  # no-op here

    mod.initialize_approximate_diffuse(1e5)
    assert_equal(mod.initialization.initialization_type, 'approximate_diffuse')
    assert_equal(mod.initialization.approximate_diffuse_variance, 1e5)

    mod.initialize_known([5.], [[40]])
    assert_equal(mod.initialization.initialization_type, 'known')
    assert_equal(mod.initialization.constant, [5.])
    assert_equal(mod.initialization.stationary_cov, [[40]])

    mod.initialize_stationary()
    assert_equal(mod.initialization.initialization_type, 'stationary')

    # Test that we can use the following wrapper methods: set_filter_method,
    # set_stability_method, set_conserve_memory, set_smoother_output

    # The defaults are as follows:
    assert_equal(mod.ssm.filter_method, kalman_filter.FILTER_CONVENTIONAL)
    assert_equal(
        mod.ssm.stability_method,
        kalman_filter.STABILITY_FORCE_SYMMETRY)
    assert_equal(mod.ssm.conserve_memory, kalman_filter.MEMORY_STORE_ALL)
    assert_equal(mod.ssm.smoother_output, kalman_smoother.SMOOTHER_ALL)

    # Now, create the Cython filter object and assert that they have
    # transferred correctly
    mod.ssm._initialize_filter()
    kf = mod.ssm._kalman_filter
    assert_equal(kf.filter_method, kalman_filter.FILTER_CONVENTIONAL)
    assert_equal(kf.stability_method, kalman_filter.STABILITY_FORCE_SYMMETRY)
    assert_equal(kf.conserve_memory, kalman_filter.MEMORY_STORE_ALL)
    # (the smoother object is so far not in Cython, so there is no
    # transferring)

    # Change the attributes in the model class
    mod.set_filter_method(100)
    mod.set_stability_method(101)
    mod.set_conserve_memory(102)
    mod.set_smoother_output(103)

    # Assert that the changes have occurred in the ssm class
    assert_equal(mod.ssm.filter_method, 100)
    assert_equal(mod.ssm.stability_method, 101)
    assert_equal(mod.ssm.conserve_memory, 102)
    assert_equal(mod.ssm.smoother_output, 103)

    # Assert that the changes have *not yet* occurred in the filter object
    assert_equal(kf.filter_method, kalman_filter.FILTER_CONVENTIONAL)
    assert_equal(kf.stability_method, kalman_filter.STABILITY_FORCE_SYMMETRY)
    assert_equal(kf.conserve_memory, kalman_filter.MEMORY_STORE_ALL)

    # Now, test the setting of the other two methods by resetting the
    # filter method to a valid value
    mod.set_filter_method(1)
    mod.ssm._initialize_filter()
    # Retrieve the new kalman filter object (a new object had to be created
    # due to the changing filter method)
    kf = mod.ssm._kalman_filter

    assert_equal(kf.filter_method, 1)
    assert_equal(kf.stability_method, 101)
    assert_equal(kf.conserve_memory, 102)


def test_fit_misc():
    true = results_sarimax.wpi1_stationary
    endog = np.diff(true['data'])[1:]

    mod = sarimax.SARIMAX(endog, order=(1, 0, 1), trend='c')

    # Test optim_hessian={'opg','oim','approx'}
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        res1 = mod.fit(method='ncg', disp=0, optim_hessian='opg',
                       optim_complex_step=False)
        res2 = mod.fit(method='ncg', disp=0, optim_hessian='oim',
                       optim_complex_step=False)
    # Check that the Hessians broadly result in the same optimum
    assert_allclose(res1.llf, res2.llf, rtol=1e-2)

    # Test return_params=True
    mod, _ = get_dummy_mod(fit=False)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        res_params = mod.fit(disp=-1, return_params=True)

    # 5 digits necessary to accommodate 32-bit numpy/scipy with OpenBLAS 0.2.18
    assert_almost_equal(res_params, [0, 0], 5)


@pytest.mark.smoke
def test_score_misc():
    mod, res = get_dummy_mod()

    # Test that the score function works
    mod.score(res.params)


def test_from_formula():
    assert_raises(NotImplementedError, lambda: MLEModel.from_formula(1, 2, 3))


def test_score_analytic_ar1():
    # Test the score against the analytic score for an AR(1) model with 2
    # observations
    # Let endog = [1, 0.5], params=[0, 1]
    mod = sarimax.SARIMAX([1, 0.5], order=(1, 0, 0))

    def partial_phi(phi, sigma2):
        return -0.5 * (phi**2 + 2*phi*sigma2 - 1) / (sigma2 * (1 - phi**2))

    def partial_sigma2(phi, sigma2):
        return -0.5 * (2*sigma2 + phi - 1.25) / (sigma2**2)

    params = np.r_[0., 2]

    # Compute the analytic score
    analytic_score = np.r_[
        partial_phi(params[0], params[1]),
        partial_sigma2(params[0], params[1])]

    # Check each of the approximations, transformed parameters
    approx_cs = mod.score(params, transformed=True, approx_complex_step=True)
    assert_allclose(approx_cs, analytic_score)

    approx_fd = mod.score(params, transformed=True, approx_complex_step=False)
    assert_allclose(approx_fd, analytic_score, atol=1e-5)

    approx_fd_centered = (
        mod.score(params, transformed=True, approx_complex_step=False,
                  approx_centered=True))
    assert_allclose(approx_fd, analytic_score, atol=1e-5)

    harvey_cs = mod.score(params, transformed=True, method='harvey',
                          approx_complex_step=True)
    assert_allclose(harvey_cs, analytic_score)
    harvey_fd = mod.score(params, transformed=True, method='harvey',
                          approx_complex_step=False)
    assert_allclose(harvey_fd, analytic_score, atol=1e-5)
    harvey_fd_centered = mod.score(params, transformed=True, method='harvey',
                                   approx_complex_step=False,
                                   approx_centered=True)
    assert_allclose(harvey_fd_centered, analytic_score, atol=1e-5)

    # Check the approximations for untransformed parameters. The analytic
    # check now comes from chain rule with the analytic derivative of the
    # transformation
    # if L* is the likelihood evaluated at untransformed parameters and
    # L is the likelihood evaluated at transformed parameters, then we have:
    # L*(u) = L(t(u))
    # and then
    # L'*(u) = L'(t(u)) * t'(u)
    def partial_transform_phi(phi):
        return -1. / (1 + phi**2)**(3./2)

    def partial_transform_sigma2(sigma2):
        return 2. * sigma2

    uparams = mod.untransform_params(params)

    analytic_score = np.dot(
        np.diag(np.r_[partial_transform_phi(uparams[0]),
                      partial_transform_sigma2(uparams[1])]),
        np.r_[partial_phi(params[0], params[1]),
              partial_sigma2(params[0], params[1])])

    approx_cs = mod.score(uparams, transformed=False, approx_complex_step=True)
    assert_allclose(approx_cs, analytic_score)
Loading ...