Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / statespace / representation.py

State Space Representation

Author: Chad Fulton
License: Simplified-BSD

import numpy as np

from .tools import (
    find_best_blas_type, validate_matrix_shape, validate_vector_shape
)
from .initialization import Initialization
from . import tools

class OptionWrapper(object):
    """
    Descriptor exposing part of an integer bitmask as a boolean attribute.

    Parameters
    ----------
    mask_attribute : str
        Name of the attribute (looked up on the owning object) that holds the
        integer bitmask.
    mask_value : int
        The bit(s) within the bitmask that this option controls.

    Notes
    -----
    Reading the option returns True when any bit of `mask_value` is set in the
    bitmask. Assigning a truthy value sets those bits; assigning a falsy value
    clears them. All other bits of the bitmask are left untouched.
    """
    def __init__(self, mask_attribute, mask_value):
        # Name of the class-level bitmask attribute
        self.mask_attribute = mask_attribute
        # Value of this option
        self.mask_value = mask_value

    def __get__(self, obj, objtype=None):
        # Return True / False based on whether the bit is set in the bitmask.
        # A missing bitmask attribute is treated as 0 (no options set).
        return bool(getattr(obj, self.mask_attribute, 0) & self.mask_value)

    def __set__(self, obj, value):
        mask_attribute_value = getattr(obj, self.mask_attribute, 0)
        if bool(value):
            # Turn the option on: OR the bit(s) into the current bitmask
            value = mask_attribute_value | self.mask_value
        else:
            # Turn the option off: AND with the complement to clear the bit(s)
            value = mask_attribute_value & ~self.mask_value
        setattr(obj, self.mask_attribute, value)

class MatrixWrapper(object):
    """
    Descriptor that stores a representation matrix or vector on its owner.

    Parameters
    ----------
    name : str
        Human-readable name of the matrix; it is passed to the shape
        validators (presumably for use in their error messages).
    attribute : str
        Key of this matrix in the owning object's `shapes` dictionary. The
        array itself is stored on the owner under `'_' + attribute`.

    Notes
    -----
    The owning object must provide a `shapes` dictionary and a `nobs`
    attribute; both are read when a value is assigned.
    """
    def __init__(self, name, attribute):
        self.name = name
        self.attribute = attribute
        self._attribute = '_' + attribute

    def __get__(self, obj, objtype):
        # Returns None if nothing has been stored on the owner yet
        matrix = getattr(obj, self._attribute, None)
        # # Remove last dimension if the array is not actually time-varying
        # if matrix is not None and matrix.shape[-1] == 1:
        #     return np.squeeze(matrix, -1)
        return matrix

    def __set__(self, obj, value):
        value = np.asarray(value, order="F")
        shape = obj.shapes[self.attribute]

        # A 3-element recorded shape marks a matrix (rows, cols, time);
        # anything else is handled as a vector (rows, time).
        if len(shape) == 3:
            value = self._set_matrix(obj, value, shape)
        else:
            value = self._set_vector(obj, value, shape)

        setattr(obj, self._attribute, value)
        # Record the shape actually stored (the last dimension may differ
        # from the previously recorded one)
        obj.shapes[self.attribute] = value.shape

    def _set_matrix(self, obj, value, shape):
        """
        Validate a matrix value and return it with a trailing time dimension.

        Parameters
        ----------
        obj : object
            Owning object; its `nobs` attribute is passed to the validator.
        value : ndarray
            Candidate value (1-, 2- or 3-dimensional).
        shape : tuple
            Recorded shape of the matrix; `shape[0]` and `shape[1]` are the
            expected number of rows and columns.

        Returns
        -------
        ndarray
            `value`, with 2-dimensional input expanded to 3 dimensions by
            appending an axis of length 1.
        """
        # Expand 1-dimensional array if possible
        if (value.ndim == 1 and shape[0] == 1 and
                value.shape[0] == shape[1]):
            value = value[None, :]

        # Enforce that the matrix is appropriate size
        # NOTE(review): `validate_matrix_shape` comes from `.tools` and is
        # expected to raise on a mismatch - confirm against its definition.
        validate_matrix_shape(
            self.name, value.shape, shape[0], shape[1], obj.nobs
        )

        # Expand time-invariant matrix
        if value.ndim == 2:
            value = np.array(value[:, :, None], order="F")

        return value

    def _set_vector(self, obj, value, shape):
        """
        Validate a vector value and return it with a trailing time dimension.

        Parameters
        ----------
        obj : object
            Owning object; its `nobs` attribute is passed to the validator.
        value : ndarray
            Candidate value (1- or 2-dimensional).
        shape : tuple
            Recorded shape of the vector; `shape[0]` is the expected length.

        Returns
        -------
        ndarray
            `value`, with 1-dimensional input expanded to 2 dimensions by
            appending an axis of length 1.
        """
        # Enforce that the vector has appropriate length
        # NOTE(review): `validate_vector_shape` comes from `.tools` and is
        # expected to raise on a mismatch - confirm against its definition.
        validate_vector_shape(
            self.name, value.shape, shape[0], obj.nobs
        )

        # Expand the time-invariant vector
        if value.ndim == 1:
            value = np.array(value[:, None], order="F")

        return value

class Representation(object):
    State space representation of a time series process

    k_endog : {array_like, int}
        The observed time-series process :math:`y` if array like or the
        number of variables in the process if an integer.
    k_states : int
        The dimension of the unobserved state process.
    k_posdef : int, optional
        The dimension of a guaranteed positive definite covariance matrix
        describing the shocks in the state (transition) equation. Must be
        less than or equal to `k_states`. Default is `k_states`.
    initial_variance : float, optional
        Initial variance used when approximate diffuse initialization is
        specified. Default is 1e6.
    initialization : Initialization object or str, optional
        Initialization method for the initial state. If a string, must be one
        of {'diffuse', 'approximate_diffuse', 'stationary', 'known'}.
    initial_state : array_like, optional
        If `initialization='known'` is used, the mean of the initial state's
        distribution.
    initial_state_cov : array_like, optional
        If `initialization='known'` is used, the covariance matrix of the
        initial state's distribution.
    nobs : int, optional
        If an endogenous vector is not given (i.e. `k_endog` is an integer),
        the number of observations can optionally be specified. If not
        specified, they will be set to zero until data is bound to the model.
    dtype : np.dtype, optional
        If an endogenous vector is not given (i.e. `k_endog` is an integer),
        the default datatype of the state space matrices can optionally be
        specified. Default is `np.float64`.
    design : array_like, optional
        The design matrix, :math:`Z`. Default is set to zeros.
    obs_intercept : array_like, optional
        The intercept for the observation equation, :math:`d`. Default is set
        to zeros.
    obs_cov : array_like, optional
        The covariance matrix for the observation equation :math:`H`. Default
        is set to zeros.
    transition : array_like, optional
        The transition matrix, :math:`T`. Default is set to zeros.
    state_intercept : array_like, optional
        The intercept for the transition equation, :math:`c`. Default is set to
        zeros.
    selection : array_like, optional
        The selection matrix, :math:`R`. Default is set to zeros.
    state_cov : array_like, optional
        The covariance matrix for the state equation :math:`Q`. Default is set
        to zeros.
    **kwargs
        Additional keyword arguments. Not used directly. It is present to
        improve compatibility with subclasses, so that they can use `**kwargs`
        to specify any default state space matrices (e.g. `design`) without
        having to clean out any other keyword arguments they might have been
        using.

    nobs : int
        The number of observations.
    k_endog : int
        The dimension of the observation series.
    k_states : int
        The dimension of the unobserved state process.
    k_posdef : int
        The dimension of a guaranteed positive
        definite covariance matrix describing
        the shocks in the state (transition) equation.
    shapes : dictionary of name:tuple
        A dictionary recording the initial shapes
        of each of the representation matrices as
        tuples.
    initialization : str
        Kalman filter initialization method. Default is unset.
    initial_variance : float
        Initial variance for approximate diffuse
        initialization. Default is 1e6.

    A general state space model is of the form

    .. math::

        y_t & = Z_t \alpha_t + d_t + \varepsilon_t \\
        \alpha_t & = T_t \alpha_{t-1} + c_t + R_t \eta_t \\

    where :math:`y_t` refers to the observation vector at time :math:`t`,
    :math:`\alpha_t` refers to the (unobserved) state vector at time
    :math:`t`, and where the irregular components are defined as

    .. math::

        \varepsilon_t \sim N(0, H_t) \\
        \eta_t \sim N(0, Q_t) \\

    The remaining variables (:math:`Z_t, d_t, H_t, T_t, c_t, R_t, Q_t`) in the
    equations are matrices describing the process. Their variable names and
    dimensions are as follows

    Z : `design`          :math:`(k\_endog \times k\_states \times nobs)`

    d : `obs_intercept`   :math:`(k\_endog \times nobs)`

    H : `obs_cov`         :math:`(k\_endog \times k\_endog \times nobs)`

    T : `transition`      :math:`(k\_states \times k\_states \times nobs)`

    c : `state_intercept` :math:`(k\_states \times nobs)`

    R : `selection`       :math:`(k\_states \times k\_posdef \times nobs)`

    Q : `state_cov`       :math:`(k\_posdef \times k\_posdef \times nobs)`

    In the case that one of the matrices is time-invariant (so that, for
    example, :math:`Z_t = Z_{t+1} ~ \forall ~ t`), its last dimension may
    be of size :math:`1` rather than size `nobs`.

    .. [*] Durbin, James, and Siem Jan Koopman. 2012.
       Time Series Analysis by State Space Methods: Second Edition.
       Oxford University Press.

    endog = None
    (array) The observation vector, alias for `obs`.
    design = MatrixWrapper('design', 'design')
    (array) Design matrix: :math:`Z~(k\_endog \times k\_states \times nobs)`
    obs_intercept = MatrixWrapper('observation intercept', 'obs_intercept')
    (array) Observation intercept: :math:`d~(k\_endog \times nobs)`
    obs_cov = MatrixWrapper('observation covariance matrix', 'obs_cov')
    (array) Observation covariance matrix:
    :math:`H~(k\_endog \times k\_endog \times nobs)`
    transition = MatrixWrapper('transition', 'transition')
    (array) Transition matrix:
    :math:`T~(k\_states \times k\_states \times nobs)`
    state_intercept = MatrixWrapper('state intercept', 'state_intercept')
    (array) State intercept: :math:`c~(k\_states \times nobs)`
    selection = MatrixWrapper('selection', 'selection')
    (array) Selection matrix:
    :math:`R~(k\_states \times k\_posdef \times nobs)`
    state_cov = MatrixWrapper('state covariance matrix', 'state_cov')
    (array) State covariance matrix:
    :math:`Q~(k\_posdef \times k\_posdef \times nobs)`

    def __init__(self, k_endog, k_states, k_posdef=None,
                 initial_variance=1e6, nobs=0, dtype=np.float64,
                 design=None, obs_intercept=None, obs_cov=None,
                 transition=None, state_intercept=None, selection=None,
                 state_cov=None, statespace_classes=None, **kwargs):
        self.shapes = {}

        # Check if k_endog is actually the endog array
        endog = None
        if isinstance(k_endog, np.ndarray):
            endog = k_endog
            # If so, assume that it is either column-ordered and in wide format
            # or row-ordered and in long format
            if (endog.flags['C_CONTIGUOUS'] and
                    (endog.shape[0] > 1 or nobs == 1)):
                endog = endog.T
            k_endog = endog.shape[0]

        # Endogenous array, dimensions, dtype
        self.k_endog = k_endog
        if k_endog < 1:
            raise ValueError('Number of endogenous variables in statespace'
                             ' model must be a positive number.')
        self.nobs = nobs

        # Get dimensions from transition equation
        if k_states < 1:
            raise ValueError('Number of states in statespace model must be a'
                             ' positive number.')
        self.k_states = k_states
        self.k_posdef = k_posdef if k_posdef is not None else k_states

        # Make sure k_posdef <= k_states
        # TODO: we could technically allow k_posdef > k_states, but the Cython
        # code needs to be more thoroughly checked to avoid seg faults.
        if self.k_posdef > self.k_states:
            raise ValueError('Dimension of state innovation `k_posdef` cannot'
                             ' be larger than the dimension of the state.')

        # Bind endog, if it was given
        if endog is not None:

        # Record the shapes of all of our matrices
        # Note: these are time-invariant shapes; in practice the last dimension
        # may also be `self.nobs` for any or all of these.
        self.shapes = {
            'obs': (self.k_endog, self.nobs),
            'design': (self.k_endog, self.k_states, 1),
            'obs_intercept': (self.k_endog, 1),
            'obs_cov': (self.k_endog, self.k_endog, 1),
            'transition': (self.k_states, self.k_states, 1),
            'state_intercept': (self.k_states, 1),
            'selection': (self.k_states, self.k_posdef, 1),
            'state_cov': (self.k_posdef, self.k_posdef, 1),

        # Representation matrices
        # These matrices are only used in the Python object as containers,
        # which will be copied to the appropriate _statespace object if a
        # filter is called.
        scope = locals()
        for name, shape in self.shapes.items():
            if name == 'obs':
            # Create the initial storage array for each matrix
            setattr(self, '_' + name, np.zeros(shape, dtype=dtype, order="F"))

            # If we were given an initial value for the matrix, set it
            # (notice it is being set via the descriptor)
            if scope[name] is not None:
                setattr(self, name, scope[name])

        # Options
        self.initial_variance = initial_variance
        self.prefix_statespace_map = (statespace_classes
                                      if statespace_classes is not None
                                      else tools.prefix_statespace_map.copy())

        # State-space initialization data
        self.initialization = kwargs.get('initialization', None)
        basic_inits = ['diffuse', 'approximate_diffuse', 'stationary']

        if self.initialization in basic_inits:
        elif self.initialization == 'known':
            if 'constant' in kwargs:
                constant = kwargs['constant']
            elif 'initial_state' in kwargs:
Loading ...