from statsmodels.tsa.arima_model import ARMA
from unittest import TestCase
import numpy as np
from numpy.testing import (assert_array_almost_equal, assert_almost_equal,
assert_allclose,
assert_equal, assert_raises, assert_)
import pytest
from statsmodels.tsa.arima_process import (arma_generate_sample, arma_acovf,
arma_acf, arma_impulse_response, lpol_fiar, lpol_fima,
ArmaProcess, lpol2index, index2lpol)
from statsmodels.sandbox.tsa.fftarma import ArmaFft
from statsmodels.tsa.tests.results.results_process import armarep # benchmarkdata
from statsmodels.tsa.tests.results import results_arma_acf
arlist = [[1.],
[1, -0.9], # ma representation will need many terms to get high precision
[1, 0.9],
[1, -0.9, 0.3]]
malist = [[1.],
[1, 0.9],
[1, -0.9],
[1, 0.9, -0.3]]
DECIMAL_4 = 4
def test_arma_acovf():
# Check for specific AR(1)
N = 20
phi = 0.9
sigma = 1
# rep 1: from module function
rep1 = arma_acovf([1, -phi], [1], N)
# rep 2: manually
rep2 = [1. * sigma * phi ** i / (1 - phi ** 2) for i in range(N)]
assert_allclose(rep1, rep2)
def test_arma_acovf_persistent():
# Test arma_acovf in case where there is a near-unit root.
# .999 is high enough to trigger the "while ir[-1] > 5*1e-5:" clause,
# but not high enough to trigger the "nobs_ir > 50000" clause.
ar = np.array([1, -.9995])
ma = np.array([1])
process = ArmaProcess(ar, ma)
res = process.acovf(10)
# Theoretical variance sig2 given by:
# sig2 = .9995**2 * sig2 + 1
sig2 = 1/(1-.9995**2)
corrs = .9995**np.arange(10)
expected = sig2*corrs
assert_equal(res.ndim, 1)
assert_allclose(res, expected)
def test_arma_acf():
# Check for specific AR(1)
N = 20
phi = 0.9
sigma = 1
# rep 1: from module function
rep1 = arma_acf([1, -phi], [1], N)
# rep 2: manually
acovf = np.array([1. * sigma * phi ** i / (1 - phi ** 2) for i in range(N)])
rep2 = acovf / (1. / (1 - phi ** 2))
assert_allclose(rep1, rep2)
def test_arma_acf_compare_R_ARMAacf():
# Test specific cases against output from R's ARMAacf
bd_example_3_3_2 = arma_acf([1, -1, 0.25], [1, 1])
assert_allclose(bd_example_3_3_2, results_arma_acf.bd_example_3_3_2)
example_1 = arma_acf([1, -1, 0.25], [1, 1, 0.2])
assert_allclose(example_1, results_arma_acf.custom_example_1)
example_2 = arma_acf([1, -1, 0.25], [1, 1, 0.2, 0.3])
assert_allclose(example_2, results_arma_acf.custom_example_2)
example_3 = arma_acf([1, -1, 0.25], [1, 1, 0.2, 0.3, -0.35])
assert_allclose(example_3, results_arma_acf.custom_example_3)
example_4 = arma_acf([1, -1, 0.25], [1, 1, 0.2, 0.3, -0.35, -0.1])
assert_allclose(example_4, results_arma_acf.custom_example_4)
example_5 = arma_acf([1, -1, 0.25, -0.1], [1, 1, 0.2])
assert_allclose(example_5, results_arma_acf.custom_example_5)
example_6 = arma_acf([1, -1, 0.25, -0.1, 0.05], [1, 1, 0.2])
assert_allclose(example_6, results_arma_acf.custom_example_6)
example_7 = arma_acf([1, -1, 0.25, -0.1, 0.05, -0.02], [1, 1, 0.2])
assert_allclose(example_7, results_arma_acf.custom_example_7)
def test_arma_acov_compare_theoretical_arma_acov():
# Test against the older version of this function, which used a different
# approach that nicely shows the theoretical relationship
# See GH:5324 when this was removed for full version of the function
# including documentation and inline comments
def arma_acovf_historical(ar, ma, nobs=10):
if np.abs(np.sum(ar) - 1) > 0.9:
nobs_ir = max(1000, 2 * nobs)
else:
nobs_ir = max(100, 2 * nobs)
ir = arma_impulse_response(ar, ma, leads=nobs_ir)
while ir[-1] > 5 * 1e-5:
nobs_ir *= 10
ir = arma_impulse_response(ar, ma, leads=nobs_ir)
if nobs_ir > 50000 and nobs < 1001:
end = len(ir)
acovf = np.array([np.dot(ir[:end-nobs-t], ir[t:end-nobs])
for t in range(nobs)])
else:
acovf = np.correlate(ir, ir, 'full')[len(ir) - 1:]
return acovf[:nobs]
assert_allclose(arma_acovf([1, -0.5], [1, 0.2]),
arma_acovf_historical([1, -0.5], [1, 0.2]))
assert_allclose(arma_acovf([1, -0.99], [1, 0.2]),
arma_acovf_historical([1, -0.99], [1, 0.2]))
def _manual_arma_generate_sample(ar, ma, eta):
T = len(eta)
ar = ar[::-1]
ma = ma[::-1]
p, q = len(ar), len(ma)
rep2 = [0] * max(p, q) # initialize with zeroes
for t in range(T):
yt = eta[t]
if p:
yt += np.dot(rep2[-p:], ar)
if q:
# left pad shocks with zeros
yt += np.dot([0] * (q - t) + list(eta[max(0, t - q):t]), ma)
rep2.append(yt)
return np.array(rep2[max(p, q):])
@pytest.mark.parametrize('ar', arlist)
@pytest.mark.parametrize('ma', malist)
@pytest.mark.parametrize('dist', [np.random.standard_normal])
def test_arma_generate_sample(dist, ar, ma):
# Test that this generates a true ARMA process
# (amounts to just a test that scipy.signal.lfilter does what we want)
T = 100
np.random.seed(1234)
eta = dist(T)
# rep1: from module function
np.random.seed(1234)
rep1 = arma_generate_sample(ar, ma, T, distrvs=dist)
# rep2: "manually" create the ARMA process
ar_params = -1 * np.array(ar[1:])
ma_params = np.array(ma[1:])
rep2 = _manual_arma_generate_sample(ar_params, ma_params, eta)
assert_array_almost_equal(rep1, rep2, 13)
def test_fi():
# test identity of ma and ar representation of fi lag polynomial
n = 100
mafromar = arma_impulse_response(lpol_fiar(0.4, n=n), [1], n)
assert_array_almost_equal(mafromar, lpol_fima(0.4, n=n), 13)
def test_arma_impulse_response():
arrep = arma_impulse_response(armarep.ma, armarep.ar, leads=21)[1:]
marep = arma_impulse_response(armarep.ar, armarep.ma, leads=21)[1:]
assert_array_almost_equal(armarep.marep.ravel(), marep, 14)
# difference in sign convention to matlab for AR term
assert_array_almost_equal(-armarep.arrep.ravel(), arrep, 14)
@pytest.mark.parametrize('ar', arlist)
@pytest.mark.parametrize('ma', malist)
def test_spectrum(ar, ma):
nfreq = 20
w = np.linspace(0, np.pi, nfreq, endpoint=False)
arma = ArmaFft(ar, ma, 20)
spdr, wr = arma.spdroots(w)
spdp, wp = arma.spdpoly(w, 200)
spdd, wd = arma.spddirect(nfreq * 2)
assert_equal(w, wr)
assert_equal(w, wp)
assert_almost_equal(w, wd[:nfreq], decimal=14)
assert_almost_equal(spdr, spdd[:nfreq], decimal=7,
err_msg='spdr spdd not equal for %s, %s' % (ar, ma))
assert_almost_equal(spdr, spdp, decimal=7,
err_msg='spdr spdp not equal for %s, %s' % (ar, ma))
@pytest.mark.parametrize('ar', arlist)
@pytest.mark.parametrize('ma', malist)
def test_armafft(ar, ma):
# test other methods
nfreq = 20
w = np.linspace(0, np.pi, nfreq, endpoint=False)
arma = ArmaFft(ar, ma, 20)
ac1 = arma.invpowerspd(1024)[:10]
ac2 = arma.acovf(10)[:10]
assert_allclose(ac1, ac2, atol=1e-15,
err_msg='acovf not equal for %s, %s' % (ar, ma))
def test_lpol2index_index2lpol():
process = ArmaProcess([1, 0, 0, -0.8])
coefs, locs = lpol2index(process.arcoefs)
assert_almost_equal(coefs, [0.8])
assert_equal(locs, [2])
process = ArmaProcess([1, .1, .1, -0.8])
coefs, locs = lpol2index(process.arcoefs)
assert_almost_equal(coefs, [-.1, -.1, 0.8])
assert_equal(locs, [0, 1, 2])
ar = index2lpol(coefs, locs)
assert_equal(process.arcoefs, ar)
class TestArmaProcess(TestCase):
def test_empty_coeff(self):
process = ArmaProcess()
assert_equal(process.arcoefs, np.array([]))
assert_equal(process.macoefs, np.array([]))
process = ArmaProcess([1, -0.8])
assert_equal(process.arcoefs, np.array([0.8]))
assert_equal(process.macoefs, np.array([]))
process = ArmaProcess(ma=[1, -0.8])
assert_equal(process.arcoefs, np.array([]))
assert_equal(process.macoefs, np.array([-0.8]))
def test_from_coeff(self):
ar = [1.8, -0.9]
ma = [0.3]
process = ArmaProcess.from_coeffs(np.array(ar), np.array(ma))
ar.insert(0, -1)
ma.insert(0, 1)
ar_p = -1 * np.array(ar)
ma_p = ma
process_direct = ArmaProcess(ar_p, ma_p)
assert_equal(process.arcoefs, process_direct.arcoefs)
assert_equal(process.macoefs, process_direct.macoefs)
assert_equal(process.nobs, process_direct.nobs)
assert_equal(process.maroots, process_direct.maroots)
assert_equal(process.arroots, process_direct.arroots)
assert_equal(process.isinvertible, process_direct.isinvertible)
assert_equal(process.isstationary, process_direct.isstationary)
def test_from_model(self):
process = ArmaProcess([1, -.8], [1, .3], 1000)
t = 1000
rs = np.random.RandomState(12345)
y = process.generate_sample(t, burnin=100, distrvs=rs.standard_normal)
res = ARMA(y, (1, 1)).fit(disp=False)
process_model = ArmaProcess.from_estimation(res)
process_coef = ArmaProcess.from_coeffs(res.arparams, res.maparams, t)
assert_equal(process_model.arcoefs, process_coef.arcoefs)
assert_equal(process_model.macoefs, process_coef.macoefs)
assert_equal(process_model.nobs, process_coef.nobs)
assert_equal(process_model.isinvertible, process_coef.isinvertible)
assert_equal(process_model.isstationary, process_coef.isstationary)
def test_process_multiplication(self):
process1 = ArmaProcess.from_coeffs([.9])
process2 = ArmaProcess.from_coeffs([.7])
process3 = process1 * process2
assert_equal(process3.arcoefs, np.array([1.6, -0.7 * 0.9]))
assert_equal(process3.macoefs, np.array([]))
process1 = ArmaProcess.from_coeffs([.9], [.2])
process2 = ArmaProcess.from_coeffs([.7])
process3 = process1 * process2
assert_equal(process3.arcoefs, np.array([1.6, -0.7 * 0.9]))
assert_equal(process3.macoefs, np.array([0.2]))
process1 = ArmaProcess.from_coeffs([.9], [.2])
process2 = process1 * (np.array([1.0, -0.7]), np.array([1.0]))
assert_equal(process2.arcoefs, np.array([1.6, -0.7 * 0.9]))
assert_raises(TypeError, process1.__mul__, [3])
def test_str_repr(self):
process1 = ArmaProcess.from_coeffs([.9], [.2])
out = process1.__str__()
print(out)
assert_(out.find('AR: [1.0, -0.9]') != -1)
assert_(out.find('MA: [1.0, 0.2]') != -1)
out = process1.__repr__()
assert_(out.find('nobs=100') != -1)
assert_(out.find('at ' + str(hex(id(process1)))) != -1)
def test_acf(self):
process1 = ArmaProcess.from_coeffs([.9])
acf = process1.acf(10)
expected = np.array(0.9) ** np.arange(10.0)
assert_array_almost_equal(acf, expected)
acf = process1.acf()
assert_(acf.shape[0] == process1.nobs)
def test_pacf(self):
process1 = ArmaProcess.from_coeffs([.9])
pacf = process1.pacf(10)
expected = np.array([1, 0.9] + [0] * 8)
assert_array_almost_equal(pacf, expected)
pacf = process1.pacf()
assert_(pacf.shape[0] == process1.nobs)
def test_isstationary(self):
process1 = ArmaProcess.from_coeffs([1.1])
assert_equal(process1.isstationary, False)
process1 = ArmaProcess.from_coeffs([1.8, -0.9])
assert_equal(process1.isstationary, True)
process1 = ArmaProcess.from_coeffs([1.5, -0.5])
print(np.abs(process1.arroots))
assert_equal(process1.isstationary, False)
def test_arma2ar(self):
process1 = ArmaProcess.from_coeffs([], [0.8])
vals = process1.arma2ar(100)
assert_almost_equal(vals, (-0.8) ** np.arange(100.0))
def test_invertroots(self):
process1 = ArmaProcess.from_coeffs([], [2.5])
process2 = process1.invertroots(True)
assert_almost_equal(process2.ma, np.array([1.0, 0.4]))
process1 = ArmaProcess.from_coeffs([], [0.4])
process2 = process1.invertroots(True)
assert_almost_equal(process2.ma, np.array([1.0, 0.4]))
Loading ...