# Helper functions for working with KFAS output
import numpy as np
import pandas as pd
from statsmodels.tools.tools import Bunch
def parse(path, ssm):
# Dimensions
n = ssm.nobs
p = ssm.k_endog
m = ssm.k_states
r = ssm.k_posdef
p2 = p**2
m2 = m**2
mp = m * p
r2 = r**2
# Extract the different pieces of output from KFAS
kfas = pd.read_csv(path)
components = [('r', m), ('r0', m), ('r1', m),
('N', m2), ('N0', m2), ('N1', m2), ('N2', m2),
('m', p), ('v', p), ('F', p), ('Finf', p),
('K', mp), ('Kinf', mp), ('a', m), ('P', m2),
('Pinf', m2), ('att', m), ('Ptt', m2),
('alphahat', m), ('V', m2), ('muhat', p),
('V_mu', p2), ('etahat', r), ('V_eta', r2), ('epshat', p),
('V_eps', p), ('llf', 1)]
dta = {}
ix = 0
for key, length in components:
dta[key] = kfas.iloc[:, ix:ix + length].fillna(0)
dta[key].name = None
ix += length
# Reformat the KFAS output to compare with statsmodels output
res = Bunch()
d = len(kfas['Pinf_1'].dropna())
# forecasts
res['forecasts'] = dta['m'].values[:n].T
res['forecasts_error'] = dta['v'].values[:n].T
res['forecasts_error_cov'] = np.c_[
[np.diag(x) for y, x in dta['F'].iloc[:n].iterrows()]].T
res['forecasts_error_diffuse_cov'] = np.c_[
[np.diag(x) for y, x in dta['Finf'].iloc[:n].iterrows()]].T
res['kalman_gain'] = dta['K'].values[:n].reshape(n, m, p, order='F').T
res['Kinf'] = dta['Kinf'].values[:n].reshape(n, m, p, order='F')
# filtered
res['filtered_state'] = dta['att'].values[:n].T
res['filtered_state_cov'] = dta['Ptt'].values[:n].reshape(
n, m, m, order='F').T
# predicted
res['predicted_state'] = dta['a'].values.T
res['predicted_state_cov'] = dta['P'].values.reshape(
n + 1, m, m, order='F').T
res['predicted_diffuse_state_cov'] = dta['Pinf'].values.reshape(
n + 1, m, m, order='F').T
# loglike
# Note: KFAS only gives the total loglikelihood
res['llf_obs'] = dta['llf'].values[0, 0]
# smoothed
res['smoothed_state'] = dta['alphahat'].values[:n].T
res['smoothed_state_cov'] = dta['V'].values[:n].reshape(
n, m, m, order='F').T
res['smoothed_measurement_disturbance'] = dta['epshat'].values[:n].T
res['smoothed_measurement_disturbance_cov'] = np.c_[
[np.diag(x) for y, x in dta['V_eps'].iloc[:n].iterrows()]].T
res['smoothed_state_disturbance'] = dta['etahat'].values[:n].T
res['smoothed_state_disturbance_cov'] = dta['V_eta'].values[:n].reshape(
n, r, r, order='F').T
# scaled smoothed estimator
# Note: we store both r and r0 together as "scaled smoothed estimator"
# while "scaled smoothed diffuse estimator" corresponds to r1
res['scaled_smoothed_estimator'] = np.c_[dta['r0'][:d].T,
dta['r'][d:].T][..., 1:]
res['scaled_smoothed_diffuse_estimator'] = dta['r1'].values.T
N0 = dta['N0'].values[:d].reshape(d, m, m, order='F')
N = dta['N'].values[d:].reshape(n + 1 - d, m, m, order='F')
res['scaled_smoothed_estimator_cov'] = np.c_[N0.T, N.T][..., 1:]
# Have to do a more precise transpose for these since they may not be
# symmetric
res['scaled_smoothed_diffuse1_estimator_cov'] = dta['N1'].values.reshape(
n + 1, m, m, order='F').transpose(1, 2, 0)
res['scaled_smoothed_diffuse2_estimator_cov'] = dta['N2'].values.reshape(
n + 1, m, m, order='F').transpose(1, 2, 0)
# Save the results object for the tests
return res