Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / tests / test_tools.py

"""
Tests for tools

Author: Chad Fulton
License: Simplified-BSD
"""

import pytest
import numpy as np
from numpy.testing import (assert_allclose, assert_equal, assert_array_less,
                           assert_array_equal, assert_almost_equal)
import pandas as pd
from scipy.linalg import solve_discrete_lyapunov

from statsmodels.tsa.statespace import tools
from statsmodels.tsa.api import acovf


class TestCompanionMatrix(object):

    cases = [
        (2, np.array([[0, 1], [0, 0]])),
        ([1, -1, -2], np.array([[1, 1],
                                [2, 0]])),
        ([1, -1, -2, -3], np.array([[1, 1, 0],
                                    [2, 0, 1],
                                    [3, 0, 0]])),
        ([1, -np.array([[1, 2], [3, 4]]), -np.array([[5, 6], [7, 8]])],
         np.array([[1, 2, 5, 6],
                   [3, 4, 7, 8],
                   [1, 0, 0, 0],
                   [0, 1, 0, 0]]).T),
        # GH 5570
        (np.int64(2), np.array([[0, 1], [0, 0]]))
    ]

    def test_cases(self):
        for polynomial, result in self.cases:
            assert_equal(tools.companion_matrix(polynomial), result)


class TestDiff(object):

    x = np.arange(10)
    cases = [
        # diff = 1
        ([1, 2, 3], 1, None, 1, [1, 1]),
        # diff = 2
        (x, 2, None, 1, [0]*8),
        # diff = 1, seasonal_diff=1, seasonal_periods=4
        (x, 1, 1, 4, [0]*5),
        (x**2, 1, 1, 4, [8]*5),
        (x**3, 1, 1, 4, [60, 84, 108, 132, 156]),
        # diff = 1, seasonal_diff=2, seasonal_periods=2
        (x, 1, 2, 2, [0]*5),
        (x**2, 1, 2, 2, [0]*5),
        (x**3, 1, 2, 2, [24]*5),
        (x**4, 1, 2, 2, [240, 336, 432, 528, 624]),
    ]

    # TODO: use pytest.mark.parametrize?
    def test_cases(self):
        # Basic cases
        for series, diff, seas_diff, seasonal_periods, result in self.cases:
            seasonal_diff = seas_diff

            # Test numpy array
            x = tools.diff(series, diff, seasonal_diff, seasonal_periods)
            assert_almost_equal(x, result)

            # Test as Pandas Series
            series = pd.Series(series)

            # Rewrite to test as n-dimensional array
            series = np.c_[series, series]
            result = np.c_[result, result]

            # Test Numpy array
            x = tools.diff(series, diff, seasonal_diff, seasonal_periods)
            assert_almost_equal(x, result)

            # Test as Pandas DataFrame
            series = pd.DataFrame(series)
            x = tools.diff(series, diff, seasonal_diff, seasonal_periods)
            assert_almost_equal(x, result)


class TestSolveDiscreteLyapunov(object):

    def solve_dicrete_lyapunov_direct(self, a, q, complex_step=False):
        # This is the discrete Lyapunov solver as "real function of real
        # variables":  the difference between this and the usual, complex,
        # version is that in the Kronecker product the second argument is
        # *not* conjugated here.
        if not complex_step:
            lhs = np.kron(a, a.conj())
            lhs = np.eye(lhs.shape[0]) - lhs
            x = np.linalg.solve(lhs, q.flatten())
        else:
            lhs = np.kron(a, a)
            lhs = np.eye(lhs.shape[0]) - lhs
            x = np.linalg.solve(lhs, q.flatten())

        return np.reshape(x, q.shape)

    def test_univariate(self):
        # Real case
        a = np.array([[0.5]])
        q = np.array([[10.]])
        actual = tools.solve_discrete_lyapunov(a, q)
        desired = solve_discrete_lyapunov(a, q)
        assert_allclose(actual, desired)

        # Complex case (where the Lyapunov equation is taken as a complex
        # function)
        a = np.array([[0.5+1j]])
        q = np.array([[10.]])
        actual = tools.solve_discrete_lyapunov(a, q)
        desired = solve_discrete_lyapunov(a, q)
        assert_allclose(actual, desired)

        # Complex case (where the Lyapunov equation is taken as a real
        # function)
        a = np.array([[0.5+1j]])
        q = np.array([[10.]])
        actual = tools.solve_discrete_lyapunov(a, q, complex_step=True)
        desired = self.solve_dicrete_lyapunov_direct(a, q, complex_step=True)
        assert_allclose(actual, desired)

    def test_multivariate(self):
        # Real case
        a = tools.companion_matrix([1, -0.4, 0.5])
        q = np.diag([10., 5.])
        actual = tools.solve_discrete_lyapunov(a, q)
        desired = solve_discrete_lyapunov(a, q)
        assert_allclose(actual, desired)

        # Complex case (where the Lyapunov equation is taken as a complex
        # function)
        a = tools.companion_matrix([1, -0.4+0.1j, 0.5])
        q = np.diag([10., 5.])
        actual = tools.solve_discrete_lyapunov(a, q, complex_step=False)
        desired = self.solve_dicrete_lyapunov_direct(a, q, complex_step=False)
        assert_allclose(actual, desired)

        # Complex case (where the Lyapunov equation is taken as a real
        # function)
        a = tools.companion_matrix([1, -0.4+0.1j, 0.5])
        q = np.diag([10., 5.])
        actual = tools.solve_discrete_lyapunov(a, q, complex_step=True)
        desired = self.solve_dicrete_lyapunov_direct(a, q, complex_step=True)
        assert_allclose(actual, desired)


class TestConcat(object):

    x = np.arange(10)

    valid = [
        (((1, 2, 3), (4,)), (1, 2, 3, 4)),
        (((1, 2, 3), [4]), (1, 2, 3, 4)),
        (([1, 2, 3], np.r_[4]), (1, 2, 3, 4)),
        ((np.r_[1, 2, 3], pd.Series([4])), 0, True, (1, 2, 3, 4)),
        ((pd.Series([1, 2, 3]), pd.Series([4])), 0, True, (1, 2, 3, 4)),
        ((np.c_[x[:2], x[:2]], np.c_[x[2:3], x[2:3]]), np.c_[x[:3], x[:3]]),
        ((np.c_[x[:2], x[:2]].T, np.c_[x[2:3], x[2:3]].T),
         1, np.c_[x[:3], x[:3]].T),
        ((pd.DataFrame(np.c_[x[:2], x[:2]]), np.c_[x[2:3], x[2:3]]),
         0, True, np.c_[x[:3], x[:3]]),
    ]

    invalid = [
        (((1, 2, 3), pd.Series([4])), ValueError),
        (((1, 2, 3), np.array([[1, 2]])), ValueError)
    ]

    def test_valid(self):
        for args in self.valid:
            assert_array_equal(tools.concat(*args[:-1]), args[-1])

    def test_invalid(self):
        for args in self.invalid:
            with pytest.raises(args[-1]):
                tools.concat(*args[:-1])


class TestIsInvertible(object):

    cases = [
        ([1, -0.5], True),
        ([1, 1-1e-9], True),
        ([1, 1], False),
        ([1, 0.9, 0.1], True),
        (np.array([1, 0.9, 0.1]), True),
        (pd.Series([1, 0.9, 0.1]), True)
    ]

    def test_cases(self):
        for polynomial, invertible in self.cases:
            assert_equal(tools.is_invertible(polynomial), invertible)


class TestConstrainStationaryUnivariate(object):

    cases = [
        (np.array([2.]), -2./((1+2.**2)**0.5))
    ]

    def test_cases(self):
        for unconstrained, constrained in self.cases:
            result = tools.constrain_stationary_univariate(unconstrained)
            assert_equal(result, constrained)


class TestUnconstrainStationaryUnivariate(object):

    cases = [
        (np.array([-2./((1+2.**2)**0.5)]), np.array([2.]))
    ]

    def test_cases(self):
        for constrained, unconstrained in self.cases:
            result = tools.unconstrain_stationary_univariate(constrained)
            assert_allclose(result, unconstrained)


class TestStationaryUnivariate(object):
    # Test that the constraint and unconstrained functions are inverses

    constrained_cases = [
        np.array([0]), np.array([0.1]), np.array([-0.5]), np.array([0.999])]
    unconstrained_cases = [
        np.array([10.]), np.array([-40.42]), np.array([0.123])]

    def test_cases(self):
        for constrained in self.constrained_cases:
            unconstrained = tools.unconstrain_stationary_univariate(constrained)  # noqa:E501
            reconstrained = tools.constrain_stationary_univariate(unconstrained)  # noqa:E501
            assert_allclose(reconstrained, constrained)

        for unconstrained in self.unconstrained_cases:
            constrained = tools.constrain_stationary_univariate(unconstrained)
            reunconstrained = tools.unconstrain_stationary_univariate(constrained)  # noqa:E501
            assert_allclose(reunconstrained, unconstrained)


class TestValidateMatrixShape(object):
    # name, shape, nrows, ncols, nobs
    valid = [
        ('TEST', (5, 2), 5, 2, None),
        ('TEST', (5, 2), 5, 2, 10),
        ('TEST', (5, 2, 10), 5, 2, 10),
    ]
    invalid = [
        ('TEST', (5,), 5, None, None),
        ('TEST', (5, 1, 1, 1), 5, 1, None),
        ('TEST', (5, 2), 10, 2, None),
        ('TEST', (5, 2), 5, 1, None),
        ('TEST', (5, 2, 10), 5, 2, None),
        ('TEST', (5, 2, 10), 5, 2, 5),
    ]

    def test_valid_cases(self):
        for args in self.valid:
            # Just testing that no exception is raised
            tools.validate_matrix_shape(*args)

    def test_invalid_cases(self):
        for args in self.invalid:
            with pytest.raises(ValueError):
                tools.validate_matrix_shape(*args)


class TestValidateVectorShape(object):
    # name, shape, nrows, ncols, nobs
    valid = [
        ('TEST', (5,), 5, None),
        ('TEST', (5,), 5, 10),
        ('TEST', (5, 10), 5, 10),
    ]
    invalid = [
        ('TEST', (5, 2, 10), 5, 10),
        ('TEST', (5,), 10, None),
        ('TEST', (5, 10), 5, None),
        ('TEST', (5, 10), 5, 5),
    ]

    def test_valid_cases(self):
        for args in self.valid:
            # Just testing that no exception is raised
            tools.validate_vector_shape(*args)

    def test_invalid_cases(self):
        for args in self.invalid:
            with pytest.raises(ValueError):
                tools.validate_vector_shape(*args)


def test_multivariate_acovf():
    _acovf = tools._compute_multivariate_acovf_from_coefficients

    # Test for a VAR(1) process. From Lutkepohl (2007), pages 27-28.
    # See (2.1.14) for Phi_1, (2.1.33) for Sigma_u, and (2.1.34) for Gamma_0
    Sigma_u = np.array([[2.25, 0,   0],
                        [0,    1.0, 0.5],
                        [0,    0.5, 0.74]])
    Phi_1 = np.array([[0.5, 0,   0],
                      [0.1, 0.1, 0.3],
                      [0,   0.2, 0.3]])
    Gamma_0 = np.array([[3.0,   0.161, 0.019],
                        [0.161, 1.172, 0.674],
                        [0.019, 0.674, 0.954]])
    assert_allclose(_acovf([Phi_1], Sigma_u)[0], Gamma_0, atol=1e-3)

    # Test for a VAR(2) process. From Lutkepohl (2007), pages 28-29
    # See (2.1.40) for Phi_1, Phi_2, (2.1.14) for Sigma_u, and (2.1.42) for
    # Gamma_0, Gamma_1
    Sigma_u = np.diag([0.09, 0.04])
    Phi_1 = np.array([[0.5, 0.1],
                      [0.4, 0.5]])
    Phi_2 = np.array([[0,    0],
                      [0.25, 0]])
    Gamma_0 = np.array([[0.131, 0.066],
                        [0.066, 0.181]])
    Gamma_1 = np.array([[0.072, 0.051],
                        [0.104, 0.143]])
    Gamma_2 = np.array([[0.046, 0.040],
                        [0.113, 0.108]])
    Gamma_3 = np.array([[0.035, 0.031],
                        [0.093, 0.083]])

    assert_allclose(
        _acovf([Phi_1, Phi_2], Sigma_u, maxlag=0),
        [Gamma_0], atol=1e-3)

    assert_allclose(
        _acovf([Phi_1, Phi_2], Sigma_u, maxlag=1),
        [Gamma_0, Gamma_1], atol=1e-3)

    assert_allclose(
        _acovf([Phi_1, Phi_2], Sigma_u),
        [Gamma_0, Gamma_1], atol=1e-3)

    assert_allclose(
        _acovf([Phi_1, Phi_2], Sigma_u, maxlag=2),
Loading ...