Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ sandbox / tsa / fftarma.py

# -*- coding: utf-8 -*-
"""
Created on Mon Dec 14 19:53:25 2009

Author: josef-pktd

generate arma sample using fft with all the lfilter it looks slow
to get the ma representation first

apply arma filter (in ar representation) to time series to get white noise
but seems slow to be useful for fast estimation for nobs=10000

change/check: instead of using marep, use fft-transform of ar and ma
    separately, use ratio check theory is correct and example works
    DONE : feels much faster than lfilter
    -> use for estimation of ARMA
    -> use pade (scipy.interpolate) approximation to get starting polynomial
       from autocorrelation (is autocorrelation of AR(p) related to marep?)
       check if pade is fast, not for larger arrays ?
       maybe pade does not do the right thing for this, not tried yet
       scipy.pade([ 1.    ,  0.6,  0.25, 0.125, 0.0625, 0.1],2)
       raises LinAlgError: singular matrix
       also does not have roots inside unit circle ??
    -> even without initialization, it might be fast for estimation
    -> how do I enforce stationarity and invertibility,
       need helper function

get function drop imag if close to zero from numpy/scipy source, where?

"""

import numpy as np
import numpy.fft as fft
#import scipy.fftpack as fft
from scipy import signal
#from try_var_convolve import maxabs
from statsmodels.tsa.arima_process import ArmaProcess


#trying to convert old experiments to a class


class ArmaFft(ArmaProcess):
    '''fft tools for arma processes

    This class contains several methods that are providing the same or similar
    returns to try out and test different implementations.

    Notes
    -----
    TODO:
    check whether we do not want to fix maxlags, and create new instance if
    maxlag changes. usage for different lengths of timeseries ?
    or fix frequency and length for fft

    check default frequencies w, terminology norw  n_or_w

    some ffts are currently done without padding with zeros

    returns for spectral density methods needs checking, is it always the power
    spectrum hw*hw.conj()

    normalization of the power spectrum, spectral density: not checked yet, for
    example no variance of underlying process is used

    '''

    def __init__(self, ar, ma, n):
        #duplicates now that are subclassing ArmaProcess
        super(ArmaFft, self).__init__(ar, ma)

        self.ar = np.asarray(ar)
        self.ma = np.asarray(ma)
        self.nobs = n
        #could make the polynomials into cached attributes
        self.arpoly = np.polynomial.Polynomial(ar)
        self.mapoly = np.polynomial.Polynomial(ma)
        self.nar = len(ar)  #1d only currently
        self.nma = len(ma)

    def padarr(self, arr, maxlag, atend=True):
        '''pad 1d array with zeros at end to have length maxlag
        function that is a method, no self used

        Parameters
        ----------
        arr : array_like, 1d
            array that will be padded with zeros
        maxlag : int
            length of array after padding
        atend : bool
            If True (default), then the zeros are added to the end, otherwise
            to the front of the array

        Returns
        -------
        arrp : ndarray
            zero-padded array

        Notes
        -----
        This is mainly written to extend coefficient arrays for the lag-polynomials.
        It returns a copy.

        '''
        if atend:
            return np.r_[arr, np.zeros(maxlag-len(arr))]
        else:
            return np.r_[np.zeros(maxlag-len(arr)), arr]


    def pad(self, maxlag):
        '''construct AR and MA polynomials that are zero-padded to a common length

        Parameters
        ----------
        maxlag : int
            new length of lag-polynomials

        Returns
        -------
        ar : ndarray
            extended AR polynomial coefficients
        ma : ndarray
            extended AR polynomial coefficients

        '''
        arpad = np.r_[self.ar, np.zeros(maxlag-self.nar)]
        mapad = np.r_[self.ma, np.zeros(maxlag-self.nma)]
        return arpad, mapad

    def fftar(self, n=None):
        '''Fourier transform of AR polynomial, zero-padded at end to n

        Parameters
        ----------
        n : int
            length of array after zero-padding

        Returns
        -------
        fftar : ndarray
            fft of zero-padded ar polynomial
        '''
        if n is None:
            n = len(self.ar)
        return fft.fft(self.padarr(self.ar, n))

    def fftma(self, n):
        '''Fourier transform of MA polynomial, zero-padded at end to n

        Parameters
        ----------
        n : int
            length of array after zero-padding

        Returns
        -------
        fftar : ndarray
            fft of zero-padded ar polynomial
        '''
        if n is None:
            n = len(self.ar)
        return fft.fft(self.padarr(self.ma, n))

    def fftarma(self, n=None):
        '''Fourier transform of ARMA polynomial, zero-padded at end to n

        The Fourier transform of the ARMA process is calculated as the ratio
        of the fft of the MA polynomial divided by the fft of the AR polynomial.

        Parameters
        ----------
        n : int
            length of array after zero-padding

        Returns
        -------
        fftarma : ndarray
            fft of zero-padded arma polynomial
        '''
        if n is None:
            n = self.nobs
        return (self.fftma(n) / self.fftar(n))

    def spd(self, npos):
        '''raw spectral density, returns Fourier transform

        n is number of points in positive spectrum, the actual number of points
        is twice as large. different from other spd methods with fft
        '''
        n = npos
        w = fft.fftfreq(2*n) * 2 * np.pi
        hw = self.fftarma(2*n)  #not sure, need to check normalization
        #return (hw*hw.conj()).real[n//2-1:]  * 0.5 / np.pi #does not show in plot
        return (hw*hw.conj()).real * 0.5 / np.pi, w

    def spdshift(self, n):
        '''power spectral density using fftshift

        currently returns two-sided according to fft frequencies, use first half
        '''
        #size = s1+s2-1
        mapadded = self.padarr(self.ma, n)
        arpadded = self.padarr(self.ar, n)
        hw = fft.fft(fft.fftshift(mapadded)) / fft.fft(fft.fftshift(arpadded))
        #return np.abs(spd)[n//2-1:]
        w = fft.fftfreq(n) * 2 * np.pi
        wslice = slice(n//2-1, None, None)
        #return (hw*hw.conj()).real[wslice], w[wslice]
        return (hw*hw.conj()).real, w

    def spddirect(self, n):
        '''power spectral density using padding to length n done by fft

        currently returns two-sided according to fft frequencies, use first half
        '''
        #size = s1+s2-1
        #abs looks wrong
        hw = fft.fft(self.ma, n) / fft.fft(self.ar, n)
        w = fft.fftfreq(n) * 2 * np.pi
        wslice = slice(None, n//2, None)
        #return (np.abs(hw)**2)[wslice], w[wslice]
        return (np.abs(hw)**2) * 0.5/np.pi, w

    def _spddirect2(self, n):
        '''this looks bad, maybe with an fftshift
        '''
        #size = s1+s2-1
        hw = (fft.fft(np.r_[self.ma[::-1],self.ma], n)
                / fft.fft(np.r_[self.ar[::-1],self.ar], n))
        return (hw*hw.conj()) #.real[n//2-1:]

    def spdroots(self, w):
        '''spectral density for frequency using polynomial roots

        builds two arrays (number of roots, number of frequencies)
        '''
        return self._spdroots(self.arroots, self.maroots, w)

    def _spdroots(self, arroots, maroots, w):
        '''spectral density for frequency using polynomial roots

        builds two arrays (number of roots, number of frequencies)

        Parameters
        ----------
        arroots : ndarray
            roots of ar (denominator) lag-polynomial
        maroots : ndarray
            roots of ma (numerator) lag-polynomial
        w : array_like
            frequencies for which spd is calculated

        Notes
        -----
        this should go into a function
        '''
        w = np.atleast_2d(w).T
        cosw = np.cos(w)
        #Greene 5th edt. p626, section 20.2.7.a.
        maroots = 1./maroots
        arroots = 1./arroots
        num = 1 + maroots**2 - 2* maroots * cosw
        den = 1 + arroots**2 - 2* arroots * cosw
        #print 'num.shape, den.shape', num.shape, den.shape
        hw = 0.5 / np.pi * num.prod(-1) / den.prod(-1) #or use expsumlog
        return np.squeeze(hw), w.squeeze()

    def spdpoly(self, w, nma=50):
        '''spectral density from MA polynomial representation for ARMA process

        References
        ----------
        Cochrane, section 8.3.3
        '''
        mpoly = np.polynomial.Polynomial(self.arma2ma(nma))
        hw = mpoly(np.exp(1j * w))
        spd = np.real_if_close(hw * hw.conj() * 0.5/np.pi)
        return spd, w

    def filter(self, x):
        '''
        filter a timeseries with the ARMA filter

        padding with zero is missing, in example I needed the padding to get
        initial conditions identical to direct filter

        Initial filtered observations differ from filter2 and signal.lfilter, but
        at end they are the same.

        See Also
        --------
        tsa.filters.fftconvolve

        '''
        n = x.shape[0]
        if n == self.fftarma:
            fftarma = self.fftarma
        else:
            fftarma = self.fftma(n) / self.fftar(n)
        tmpfft = fftarma * fft.fft(x)
        return fft.ifft(tmpfft)

    def filter2(self, x, pad=0):
        '''filter a time series using fftconvolve3 with ARMA filter

        padding of x currently works only if x is 1d
        in example it produces same observations at beginning as lfilter even
        without padding.

        TODO: this returns 1 additional observation at the end
        '''
        from statsmodels.tsa.filters import fftconvolve3
        if not pad:
            pass
        elif pad == 'auto':
            #just guessing how much padding
            x = self.padarr(x, x.shape[0] + 2*(self.nma+self.nar), atend=False)
        else:
            x = self.padarr(x, x.shape[0] + int(pad), atend=False)

        return fftconvolve3(x, self.ma, self.ar)


    def acf2spdfreq(self, acovf, nfreq=100, w=None):
        '''
        not really a method
        just for comparison, not efficient for large n or long acf

        this is also similarly use in tsa.stattools.periodogram with window
        '''
        if w is None:
            w = np.linspace(0, np.pi, nfreq)[:, None]
        nac = len(acovf)
        hw = 0.5 / np.pi * (acovf[0] +
                            2 * (acovf[1:] * np.cos(w*np.arange(1,nac))).sum(1))
        return hw

    def invpowerspd(self, n):
        '''autocovariance from spectral density

        scaling is correct, but n needs to be large for numerical accuracy
        maybe padding with zero in fft would be faster
        without slicing it returns 2-sided autocovariance with fftshift
Loading ...