Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / arima_process.py

"""ARMA process and estimation with scipy.signal.lfilter

Notes
-----
* written without textbook, works but not sure about everything
  briefly checked and it looks to be standard least squares, see below

* theoretical autocorrelation function of general ARMA
  Done, relatively easy to guess solution, time consuming to get
  theoretical test cases, example file contains explicit formulas for
  acovf of MA(1), MA(2) and ARMA(1,1)

Properties:
Judge, ... (1985): The Theory and Practise of Econometrics

Author: josefpktd
License: BSD
"""
from statsmodels.compat.pandas import deprecate_kwarg

import numpy as np
from scipy import signal, optimize, linalg

from statsmodels.compat.pandas import Appender
from statsmodels.tools.docstring import remove_parameters, Docstring
from statsmodels.tools.validation import array_like

__all__ = ['arma_acf', 'arma_acovf', 'arma_generate_sample',
           'arma_impulse_response', 'arma2ar', 'arma2ma', 'deconvolve',
           'lpol2index', 'index2lpol']


# Remove after 0.11
@deprecate_kwarg('sigma', 'scale')
def arma_generate_sample(ar, ma, nsample, scale=1, distrvs=None,
                         axis=0, burnin=0):
    """
    Simulate data from an ARMA.

    Parameters
    ----------
    ar : array_like
        The coefficient for autoregressive lag polynomial, including zero lag.
    ma : array_like
        The coefficient for moving-average lag polynomial, including zero lag.
    nsample : int or tuple of ints
        If nsample is an integer, then this creates a 1d timeseries of
        length size. If nsample is a tuple, creates a len(nsample)
        dimensional time series where time is indexed along the input
        variable ``axis``. All series are unless ``distrvs`` generates
        dependent data.
    scale : float
        The standard deviation of noise.
    distrvs : function, random number generator
        A function that generates the random numbers, and takes sample size
        as argument. The default is np.random.randn.
    axis : int
        See nsample for details.
    burnin : int
        Number of observation at the beginning of the sample to drop.
        Used to reduce dependence on initial values.

    Returns
    -------
    ndarray
        Random sample(s) from an ARMA process.

    Notes
    -----
    As mentioned above, both the AR and MA components should include the
    coefficient on the zero-lag. This is typically 1. Further, due to the
    conventions used in signal processing used in signal.lfilter vs.
    conventions in statistics for ARMA processes, the AR parameters should
    have the opposite sign of what you might expect. See the examples below.

    Examples
    --------
    >>> import numpy as np
    >>> np.random.seed(12345)
    >>> arparams = np.array([.75, -.25])
    >>> maparams = np.array([.65, .35])
    >>> ar = np.r_[1, -arparams] # add zero-lag and negate
    >>> ma = np.r_[1, maparams] # add zero-lag
    >>> y = sm.tsa.arma_generate_sample(ar, ma, 250)
    >>> model = sm.tsa.ARMA(y, (2, 2)).fit(trend='nc', disp=0)
    >>> model.params
    array([ 0.79044189, -0.23140636,  0.70072904,  0.40608028])
    """
    distrvs = np.random.normal if distrvs is None else distrvs
    if np.ndim(nsample) == 0:
        nsample = [nsample]
    if burnin:
        # handle burin time for nd arrays
        # maybe there is a better trick in scipy.fft code
        newsize = list(nsample)
        newsize[axis] += burnin
        newsize = tuple(newsize)
        fslice = [slice(None)] * len(newsize)
        fslice[axis] = slice(burnin, None, None)
        fslice = tuple(fslice)
    else:
        newsize = tuple(nsample)
        fslice = tuple([slice(None)] * np.ndim(newsize))
    eta = scale * distrvs(size=newsize)
    return signal.lfilter(ma, ar, eta, axis=axis)[fslice]


def arma_acovf(ar, ma, nobs=10, sigma2=1, dtype=None):
    """
    Theoretical autocovariance function of ARMA process.

    Parameters
    ----------
    ar : array_like, 1d
        The coefficients for autoregressive lag polynomial, including zero lag.
    ma : array_like, 1d
        The coefficients for moving-average lag polynomial, including zero lag.
    nobs : int
        The number of terms (lags plus zero lag) to include in returned acovf.
    sigma2 : float
        Variance of the innovation term.

    Returns
    -------
    ndarray
        The autocovariance of ARMA process given by ar, ma.

    See Also
    --------
    arma_acf : Autocorrelation function for ARMA processes.
    acovf : Sample autocovariance estimation.

    References
    ----------
    .. [*] Brockwell, Peter J., and Richard A. Davis. 2009. Time Series:
        Theory and Methods. 2nd ed. 1991. New York, NY: Springer.
    """
    if dtype is None:
        dtype = np.common_type(np.array(ar), np.array(ma), np.array(sigma2))

    p = len(ar) - 1
    q = len(ma) - 1
    m = max(p, q) + 1

    if sigma2.real < 0:
        raise ValueError('Must have positive innovation variance.')

    # Short-circuit for trivial corner-case
    if p == q == 0:
        out = np.zeros(nobs, dtype=dtype)
        out[0] = sigma2
        return out

    # Get the moving average representation coefficients that we need
    ma_coeffs = arma2ma(ar, ma, lags=m)

    # Solve for the first m autocovariances via the linear system
    # described by (BD, eq. 3.3.8)
    A = np.zeros((m, m), dtype=dtype)
    b = np.zeros((m, 1), dtype=dtype)
    # We need a zero-right-padded version of ar params
    tmp_ar = np.zeros(m, dtype=dtype)
    tmp_ar[:p + 1] = ar
    for k in range(m):
        A[k, :(k + 1)] = tmp_ar[:(k + 1)][::-1]
        A[k, 1:m - k] += tmp_ar[(k + 1):m]
        b[k] = sigma2 * np.dot(ma[k:q + 1], ma_coeffs[:max((q + 1 - k), 0)])
    acovf = np.zeros(max(nobs, m), dtype=dtype)
    acovf[:m] = np.linalg.solve(A, b)[:, 0]

    # Iteratively apply (BD, eq. 3.3.9) to solve for remaining autocovariances
    if nobs > m:
        zi = signal.lfiltic([1], ar, acovf[:m:][::-1])
        acovf[m:] = signal.lfilter([1], ar, np.zeros(nobs - m, dtype=dtype),
                                   zi=zi)[0]

    return acovf[:nobs]


# Remove after 0.11
@deprecate_kwarg('nobs', 'lags')
def arma_acf(ar, ma, lags=10):
    """
    Theoretical autocorrelation function of an ARMA process.

    Parameters
    ----------
    ar : array_like
        Coefficients for autoregressive lag polynomial, including zero lag.
    ma : array_like
        Coefficients for moving-average lag polynomial, including zero lag.
    lags : int
        The number of terms (lags plus zero lag) to include in returned acf.

    Returns
    -------
    ndarray
        The autocorrelations of ARMA process given by ar and ma.

    See Also
    --------
    arma_acovf : Autocovariances from ARMA processes.
    acf : Sample autocorrelation function estimation.
    acovf : Sample autocovariance function estimation.
    """
    acovf = arma_acovf(ar, ma, lags)
    return acovf / acovf[0]


# Remove after 0.11
@deprecate_kwarg('nobs', 'lags')
def arma_pacf(ar, ma, lags=10):
    """
    Theoretical partial autocorrelation function of an ARMA process.

    Parameters
    ----------
    ar : array_like, 1d
        The coefficients for autoregressive lag polynomial, including zero lag.
    ma : array_like, 1d
        The coefficients for moving-average lag polynomial, including zero lag.
    lags : int
        The number of terms (lags plus zero lag) to include in returned pacf.

    Returns
    -------
    ndarrray
        The partial autocorrelation of ARMA process given by ar and ma.

    Notes
    -----
    Solves yule-walker equation for each lag order up to nobs lags.

    not tested/checked yet
    """
    # TODO: Should use rank 1 inverse update
    apacf = np.zeros(lags)
    acov = arma_acf(ar, ma, lags=lags + 1)

    apacf[0] = 1.
    for k in range(2, lags + 1):
        r = acov[:k]
        apacf[k - 1] = linalg.solve(linalg.toeplitz(r[:-1]), r[1:])[-1]
    return apacf


def arma_periodogram(ar, ma, worN=None, whole=0):
    """
    Periodogram for ARMA process given by lag-polynomials ar and ma.

    Parameters
    ----------
    ar : array_like
        The autoregressive lag-polynomial with leading 1 and lhs sign.
    ma : array_like
        The moving average lag-polynomial with leading 1.
    worN : {None, int}, optional
        An option for scipy.signal.freqz (read "w or N").
        If None, then compute at 512 frequencies around the unit circle.
        If a single integer, the compute at that many frequencies.
        Otherwise, compute the response at frequencies given in worN.
    whole : {0,1}, optional
        An options for scipy.signal.freqz/
        Normally, frequencies are computed from 0 to pi (upper-half of
        unit-circle.  If whole is non-zero compute frequencies from 0 to 2*pi.

    Returns
    -------
    w : ndarray
        The frequencies.
    sd : ndarray
        The periodogram, also known as the spectral density.

    Notes
    -----
    Normalization ?

    This uses signal.freqz, which does not use fft. There is a fft version
    somewhere.
    """
    w, h = signal.freqz(ma, ar, worN=worN, whole=whole)
    sd = np.abs(h) ** 2 / np.sqrt(2 * np.pi)
    if np.any(np.isnan(h)):
        # this happens with unit root or seasonal unit root'
        import warnings
        warnings.warn('Warning: nan in frequency response h, maybe a unit '
                      'root', RuntimeWarning)
    return w, sd


# Remove after 0.11
@deprecate_kwarg('nobs', 'leads')
def arma_impulse_response(ar, ma, leads=100):
    """
    Compute the impulse response function (MA representation) for ARMA process.

    Parameters
    ----------
    ar : array_like, 1d
        The auto regressive lag polynomial.
    ma : array_like, 1d
        The moving average lag polynomial.
    leads : int
        The number of observations to calculate.

    Returns
    -------
    ndarray
        The impulse response function with nobs elements.

    Notes
    -----
    This is the same as finding the MA representation of an ARMA(p,q).
    By reversing the role of ar and ma in the function arguments, the
    returned result is the AR representation of an ARMA(p,q), i.e

    ma_representation = arma_impulse_response(ar, ma, leads=100)
    ar_representation = arma_impulse_response(ma, ar, leads=100)

    Fully tested against matlab

    Examples
    --------
    AR(1)

    >>> arma_impulse_response([1.0, -0.8], [1.], leads=10)
    array([ 1.        ,  0.8       ,  0.64      ,  0.512     ,  0.4096    ,
            0.32768   ,  0.262144  ,  0.2097152 ,  0.16777216,  0.13421773])

    this is the same as

    >>> 0.8**np.arange(10)
    array([ 1.        ,  0.8       ,  0.64      ,  0.512     ,  0.4096    ,
            0.32768   ,  0.262144  ,  0.2097152 ,  0.16777216,  0.13421773])

    MA(2)

    >>> arma_impulse_response([1.0], [1., 0.5, 0.2], leads=10)
    array([ 1. ,  0.5,  0.2,  0. ,  0. ,  0. ,  0. ,  0. ,  0. ,  0. ])

    ARMA(1,2)

    >>> arma_impulse_response([1.0, -0.8], [1., 0.5, 0.2], leads=10)
    array([ 1.        ,  1.3       ,  1.24      ,  0.992     ,  0.7936    ,
            0.63488   ,  0.507904  ,  0.4063232 ,  0.32505856,  0.26004685])
Loading ...