# -*- coding: utf-8 -*-
"""Helper and filter functions for VAR and VARMA, and basic VAR class

Created on Mon Jan 11 11:04:23 2010
Author: josef-pktd
License: BSD

This is a new version; I did not look at the old version again, but it is
built on similar ideas.

Not copied/cleaned yet:

* fftn-based filtering, creating samples with fft
* tests: I ran the examples but did not convert them to tests; the examples
  look good for parameter estimation, forecasting, and the filter functions

Main TODOs:

* result statistics
* see whether Bayesian dummy observations can be included without changing
  the single call to linalg.lstsq
* the impulse response function does not treat correlation, see Hamilton and jplv

Extensions:

* constraints, Bayesian priors/penalization
* Error Correction Form and Cointegration
* Factor Models Stock-Watson, ???

see also VAR section in Notes.txt
"""
import numpy as np
from scipy import signal
from statsmodels.tsa.tsatools import lagmat
def varfilter(x, a):
    '''apply an autoregressive filter to a series x

    Warning: convolve does not work the way I originally thought, so this
    likely does not work correctly for nvars > 3.

    x can be 2d, a can be 1d, 2d, or 3d

    Parameters
    ----------
    x : array_like
        data array, 1d or 2d; if 2d, observations are in rows
    a : array_like
        autoregressive filter coefficients, ar lag polynomial,
        see Notes

    Returns
    -------
    y : ndarray, 2d
        filtered array, number of columns determined by x and a

    Notes
    -----
    In general form this uses the linear filter ::

        y = a(L)x

    where

        x : nobs, nvars
        a : nlags, nvars, npoly

    Depending on the shape and dimension of a, this uses different
    lag polynomial arrays:

    case 1 : a is 1d or (nlags, 1)
        one lag polynomial is applied to all variables (columns of x)
    case 2 : a is 2d, (nlags, nvars)
        each series is independently filtered with its own
        lag polynomial, uses a loop over nvars
    case 3 : a is 3d, (nlags, nvars, npoly)
        the ith column of the output array is given by the linear filter
        defined by the 2d array a[:,:,i], i.e. ::

            y[:,i] = a(.,.,i)(L) * x
            y[t,i] = sum_p sum_j a(p,j,i)*x(t-p,j)
                     for p = 0,...,nlags-1, j = 0,...,nvars-1,
                     for all t >= nlags

    Note: maybe convert to axis=1

    TODO: initial conditions
    '''
x = np.asarray(x)
a = np.asarray(a)
if x.ndim == 1:
x = x[:,None]
if x.ndim > 2:
raise ValueError('x array has to be 1d or 2d')
nvar = x.shape[1]
nlags = a.shape[0]
ntrim = nlags//2
    # below, x is 2d with ncols > 1
if a.ndim == 1:
# case: identical ar filter (lag polynomial)
return signal.convolve(x, a[:,None], mode='valid')
# alternative:
#return signal.lfilter(a,[1],x.astype(float),axis=0)
elif a.ndim == 2:
if min(a.shape) == 1:
# case: identical ar filter (lag polynomial)
return signal.convolve(x, a, mode='valid')
# case: independent ar
#(a bit like recserar in gauss, but no x yet)
#(no, reserar is inverse filter)
result = np.zeros((x.shape[0]-nlags+1, nvar))
for i in range(nvar):
            # could also use np.convolve, but easier for switching to fft
result[:,i] = signal.convolve(x[:,i], a[:,i], mode='valid')
return result
elif a.ndim == 3:
# case: vector autoregressive with lag matrices
# Note: we must have shape[1] == shape[2] == nvar
yf = signal.convolve(x[:,:,None], a)
yvalid = yf[ntrim:-ntrim, yf.shape[1]//2,:]
return yvalid
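
# A minimal usage sketch of varfilter's three cases (illustrative numbers,
# not from the original module); the helper function and its name are mine,
# wrapped so nothing runs on import.  Note the docstring's warning: case 3
# uses a full multidimensional convolve, which flips the kernel along every
# axis, including the variable axis, so the matrix case may not implement
# the documented sum exactly and results should be checked before use.
def _example_varfilter():
    x = np.random.randn(20, 2)
    # case 1: one shared lag polynomial for all columns
    y1 = varfilter(x, np.array([1., -0.5]))                  # shape (19, 2)
    # case 2: an independent lag polynomial per column
    y2 = varfilter(x, np.array([[1., 1.], [-0.5, -0.2]]))    # shape (19, 2)
    # case 3: matrix lag polynomial, one lag matrix per lag
    a3 = np.array([[[ 1. ,  0. ], [ 0. ,  1. ]],
                   [[-0.5,  0. ], [ 0. , -0.2]]])
    y3 = varfilter(x, a3)                                    # shape (19, 2)
    return y1, y2, y3
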
def varinversefilter(ar, nobs, version=1):
    '''creates inverse ar filter (MA representation) recursively

    The VAR lag polynomial is defined by ::

        ar(L) y_t = u_t    or
        y_t = -(ar(L) - I) y_t + u_t

    the returned lag polynomial is arinv(L) = ar^{-1}(L) in ::

        y_t = arinv(L) u_t

    Parameters
    ----------
    ar : ndarray, (nlags, nvars, nvars)
        matrix lag polynomial, currently no exog;
        the first lag matrix, ar[0], should be the identity
    nobs : int
        number of lags for which the inverse filter is computed
    version : int
        version 1 uses an explicit double loop; version 0 is an
        unfinished tensordot variant and raises NotImplementedError

    Returns
    -------
    arinv : ndarray, (nobs+1, nvars, nvars)

    Notes
    -----
    '''
nlags, nvars, nvarsex = ar.shape
    if nvars != nvarsex:
        print('exogenous variables not implemented, not tested')
arinv = np.zeros((nobs+1, nvarsex, nvars))
arinv[0,:,:] = ar[0]
arinv[1:nlags,:,:] = -ar[1:]
if version == 1:
for i in range(2,nobs+1):
tmp = np.zeros((nvars,nvars))
for p in range(1,nlags):
tmp += np.dot(-ar[p],arinv[i-p,:,:])
arinv[i,:,:] = tmp
    if version == 0:
        for i in range(nlags+1, nobs+1):
            print(ar[1:].shape, arinv[i-1:i-nlags:-1,:,:].shape)
            #arinv[i,:,:] = np.dot(-ar[1:], arinv[i-1:i-nlags:-1,:,:])
            #print(np.tensordot(-ar[1:], arinv[i-1:i-nlags:-1,:,:], axes=([2],[1])).shape)
            #arinv[i,:,:] = np.tensordot(-ar[1:], arinv[i-1:i-nlags:-1,:,:], axes=([2],[1]))
        raise NotImplementedError('waiting for generalized ufuncs or something')
return arinv
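
# A minimal usage sketch (illustrative coefficients, helper name is mine):
# for a VAR(1) with coefficient matrix A, the MA representation is
# y_t = sum_i A^i u_{t-i}, so arinv[i] should equal the ith matrix power of A.
def _example_varinversefilter():
    A = np.array([[0.8, 0.1],
                  [0.0, 0.5]])
    # full-form lag polynomial [I, -A] as expected by varinversefilter
    ar = np.concatenate((np.eye(2)[None, :, :], -A[None, :, :]))
    arinv = varinversefilter(ar, nobs=5)
    assert np.allclose(arinv[2], np.dot(A, A))   # psi_2 = A^2 for a VAR(1)
    return arinv
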
def vargenerate(ar, u, initvalues=None):
    '''generate a VAR process with errors u

    similar to gauss; uses a loop

    Parameters
    ----------
    ar : array (nlags, nvars, nvars)
        matrix lag polynomial
    u : array (nobs, nvars)
        exogenous variable, error term for VAR
    initvalues : ndarray (ninit, nvars), optional
        initial values prepended to the sample; by default the process
        starts at zero

    Returns
    -------
    sar : array (nobs+nlags-1, nvars), when initvalues is None
        sample of the var process, the inverse filtered u;
        does not trim the initial condition y_0 = 0

    Examples
    --------
    generate a random sample of a VAR, then the impulse response to an
    initial shock to the first variable ::

        nobs, nvars = 10, 2
        u = np.random.randn(nobs, nvars)
        a21 = np.array([[[ 1. ,  0. ],
                         [ 0. ,  1. ]],
                        [[-0.8,  0. ],
                         [ 0. , -0.6]]])
        vargenerate(a21, u)

        # impulse response to an initial shock in the first variable
        imp = np.zeros((nobs, nvars))
        imp[0, 0] = 1
        vargenerate(a21, imp)
    '''
nlags, nvars, nvarsex = ar.shape
nlagsm1 = nlags - 1
nobs = u.shape[0]
    if nvars != nvarsex:
        print('exogenous variables not implemented, not tested')
if u.shape[1] != nvars:
raise ValueError('u needs to have nvars columns')
if initvalues is None:
sar = np.zeros((nobs+nlagsm1, nvars))
start = nlagsm1
else:
start = max(nlagsm1, initvalues.shape[0])
sar = np.zeros((nobs+start, nvars))
sar[start-initvalues.shape[0]:start] = initvalues
#sar[nlagsm1:] = u
sar[start:] = u
#if version == 1:
for i in range(start,start+nobs):
for p in range(1,nlags):
sar[i] += np.dot(sar[i-p,:],-ar[p])
return sar
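
# A small consistency sketch (illustrative numbers, helper name is mine):
# with one identical AR polynomial per equation, filtering the generated
# sample with the same scalar lag polynomial (case 1 of varfilter) recovers
# the innovations exactly, since the initial condition is y_0 = 0.
def _example_vargenerate_roundtrip():
    u = np.random.randn(10, 2)
    ar = np.array([[[ 1. ,  0. ],
                    [ 0. ,  1. ]],
                   [[-0.8,  0. ],
                    [ 0. , -0.8]]])
    sar = vargenerate(ar, u)                      # shape (11, 2), y_0 = 0
    uhat = varfilter(sar, np.array([1., -0.8]))   # case 1 recovers u
    assert np.allclose(uhat, u)
    return sar
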
def padone(x, front=0, back=0, axis=0, fillvalue=0):
    '''pad an array with fillvalue along one axis

    can be applied sequentially to pad several axes

    Examples
--------
>>> padone(np.ones((2,3)),1,3,axis=1)
array([[ 0., 1., 1., 1., 0., 0., 0.],
[ 0., 1., 1., 1., 0., 0., 0.]])
>>> padone(np.ones((2,3)),1,1, fillvalue=np.nan)
array([[ NaN, NaN, NaN],
[ 1., 1., 1.],
[ 1., 1., 1.],
[ NaN, NaN, NaN]])
'''
    #primitive version
    shape = np.array(x.shape)
    shape[axis] += (front + back)
    shapearr = np.array(x.shape)
    out = np.empty(shape)
    out.fill(fillvalue)
    startind = np.zeros(x.ndim, dtype=int)   # slice bounds must be integers
    startind[axis] = front
    endind = startind + shapearr
    myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
    out[tuple(myslice)] = x
return out
def trimone(x, front=0, back=0, axis=0):
    '''trim a number of array elements along one axis

    Examples
--------
>>> xp = padone(np.ones((2,3)),1,3,axis=1)
>>> xp
array([[ 0., 1., 1., 1., 0., 0., 0.],
[ 0., 1., 1., 1., 0., 0., 0.]])
>>> trimone(xp,1,3,1)
array([[ 1., 1., 1.],
[ 1., 1., 1.]])
'''
    shape = np.array(x.shape)
    shape[axis] -= (front + back)
    startind = np.zeros(x.ndim, dtype=int)   # slice bounds must be integers
    startind[axis] = front
    endind = startind + shape
    myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
    return x[tuple(myslice)]
def ar2full(ar):
    '''make a reduced lag polynomial into a full right-hand-side lagpoly array
    '''
    nlags, nvar, nvarex = ar.shape
    return np.r_[np.eye(nvar, nvarex)[None, :, :], -ar]
def ar2lhs(ar):
    '''convert a full (rhs) lag polynomial into a reduced, left-hand-side lagpoly array

    this is mainly a reminder about the definition
'''
return -ar[1:]
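
# A tiny round-trip sketch (made-up coefficients, helper name is mine):
# ar2lhs undoes ar2full.
def _example_ar2full_roundtrip():
    a = np.array([[[0.8, 0.0],
                   [0.0, 0.6]]])     # reduced form, one lag matrix
    full = ar2full(a)                # prepend identity, negate the lags
    assert np.allclose(ar2lhs(full), a)
    return full
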
class _Var(object):
'''obsolete VAR class, use tsa.VAR instead, for internal use only
Examples
--------
    >>> v = _Var(ar2s)
>>> v.fit(1)
>>> v.arhat
array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.77784898, 0.01726193],
[ 0.10733009, -0.78665335]]])
'''
def __init__(self, y):
self.y = y
self.nobs, self.nvars = y.shape
def fit(self, nlags):