# statsmodels 0.11.1 -- tsa/vector_ar/tests/test_var_jmulti.py


import numpy as np
from numpy.testing import assert_, assert_allclose, assert_raises

import statsmodels.datasets.macrodata.data as macro
from statsmodels.tsa.vector_ar.tests.JMulTi_results.parse_jmulti_vecm_output \
    import sublists
from statsmodels.tsa.vector_ar.var_model import VAR
from .JMulTi_results.parse_jmulti_var_output import dt_s_tup_to_string
from .JMulTi_results.parse_jmulti_var_output import load_results_jmulti

atol = 0.001  # absolute tolerance
rtol = 0.01  # relative tolerance
datasets = []
data = {}
results_ref = {}
results_sm = {}

debug_mode = False
dont_test_se_t_p = False
deterministic_terms_list = ["nc", "c", "ct"]
seasonal_list = [0, 4]
dt_s_list = [(det, s) for det in deterministic_terms_list
             for s in seasonal_list]
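# dt_s_list therefore contains the tuples ("nc", 0), ("nc", 4), ("c", 0),
# ("c", 4), ("ct", 0), ("ct", 4): every deterministic-term specification is
# combined with "no seasonal terms" and with quarterly (4) seasonal terms.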
all_tests = ["coefs", "det", "Sigma_u", "log_like", "fc", "causality",
             "impulse-response", "lag order", "test normality", "whiteness",
             "exceptions"]
to_test = all_tests  # replace with a subset, e.g. ["coefs", "det"], for debugging


def load_data(dataset, data_dict):
    dtset = dataset.load_pandas()
    variables = dataset.variable_names
    loaded = dtset.data[variables].astype(float).values
    data_dict[dataset] = loaded.reshape((-1, len(variables)))


def reorder_jmultis_det_terms(jmulti_output, constant, seasons):
    """
    Reorder the deterministic terms in JMulTi's output so that they are
    comparable to statsmodels' output.

    JMulTi's ordering is [constant], [seasonal terms], [trend term], while
    statsmodels uses [constant], [trend term], [seasonal terms].

    Parameters
    ----------
    jmulti_output : ndarray (neqs x number_of_deterministic_terms)
        Estimated parameters of the deterministic terms as produced by JMulTi.
    constant : bool
        Indicates whether there is a constant term or not in jmulti_output.
    seasons : int
        Number of seasons in the model, i.e. there are seasons-1 columns for
        seasonal terms in jmulti_output.

    Returns
    -------
    reordered : ndarray (neqs x number_of_deterministic_terms)
        jmulti_output reordered such that the order of deterministic terms
        matches that of statsmodels.
    """
    if seasons == 0:
        return jmulti_output
    constant = int(constant)
    const_column = jmulti_output[:, :constant]
    season_columns = jmulti_output[:, constant:constant+seasons-1].copy()
    trend_columns = jmulti_output[:, constant+seasons-1:].copy()
    return np.hstack((const_column,
                      trend_columns,
                      season_columns))
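
# For illustration only (hypothetical numbers, not used by the tests): with a
# constant, a trend, and seasons=4, JMulTi orders the columns as
# [const, s1, s2, s3, trend].  Reordering moves the trend directly after the
# constant to match statsmodels' ordering [const, trend, s1, s2, s3]:
#
#     jmulti_out = np.array([[1.0, 0.1, 0.2, 0.3, 0.05]])
#     reorder_jmultis_det_terms(jmulti_out, constant=True, seasons=4)
#     # -> [[1.0, 0.05, 0.1, 0.2, 0.3]]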


def generate_exog_from_season(seasons, endog_len):
    """
    Translate seasons to exog matrix.

    Parameters
    ----------
    seasons : int
        Number of seasons.
    endog_len : int
        Number of observations.

    Returns
    -------
    exog : ndarray or None
        If seasonal deterministic terms exist (seasons > 0), the corresponding
        exog matrix is returned; otherwise None is returned.
    """

    exog_stack = []
    if seasons > 0:
        season_exog = np.zeros((seasons - 1, endog_len))
        for i in range(seasons - 1):
            season_exog[i, i::seasons] = 1
        # note: the seasonal terms are *not* centered here (no
        # ``season_exog -= 1 / seasons``) because JMulTi does not center
        # seasonal terms in its VAR framework (in contrast to VECM)
        season_exog = season_exog.T
        exog_stack.append(season_exog)
    if exog_stack:
        exog = np.column_stack(exog_stack)
    else:
        exog = None
    return exog
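
# A small illustration (hypothetical sizes, not used by the tests): for
# seasons=4 and endog_len=8 the function returns an (8 x 3) matrix of
# seasonal dummy variables, one column per season except the last:
#
#     generate_exog_from_season(4, 8)
#     # -> [[1, 0, 0],
#     #     [0, 1, 0],
#     #     [0, 0, 1],
#     #     [0, 0, 0],
#     #     [1, 0, 0],
#     #     [0, 1, 0],
#     #     [0, 0, 1],
#     #     [0, 0, 0]]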


def load_results_statsmodels(dataset):
    results_per_deterministic_terms = dict.fromkeys(dt_s_list)
    for dt_s_tup in dt_s_list:
        endog = data[dataset]
        exog = generate_exog_from_season(dt_s_tup[1], len(endog))
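        # seasonal dummies enter the model through exog; the constant and/or
        # trend terms are requested via the `trend` argument of fit() below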
        model = VAR(endog, exog)
        results_per_deterministic_terms[dt_s_tup] = model.fit(
                maxlags=4, trend=dt_s_tup[0], method="ols")
    return results_per_deterministic_terms


def build_err_msg(ds, dt_s, parameter_str):
    dt = dt_s_tup_to_string(dt_s)
    seasons = dt_s[1]
    err_msg = "Error in " + parameter_str + " for:\n"
    err_msg += "- Dataset: " + ds.__str__() + "\n"
    err_msg += "- Deterministic terms: "
    err_msg += (dt_s[0] if dt != "nc" else "no det. terms")
    if seasons > 0:
        err_msg += ", seasons: " + str(seasons)
    return err_msg


def setup():
    datasets.append(macro)  # TODO: append more data sets for more test cases.

    for ds in datasets:
        load_data(ds, data)
        results_ref[ds] = load_results_jmulti(ds, dt_s_list)
        results_sm[ds] = load_results_statsmodels(ds)


setup()


def test_ols_coefs():
    if debug_mode:
        if "coefs" not in to_test:
            return
        print("\n\nESTIMATED PARAMETER MATRICES FOR LAGGED ENDOG", end="")
    for ds in datasets:
        for dt_s in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt_s) + ": ", end="")

            # estimated parameter vector
            err_msg = build_err_msg(ds, dt_s, "PARAMETER MATRICES ENDOG")
            obtained = np.hstack(results_sm[ds][dt_s].coefs)
            desired = results_ref[ds][dt_s]["est"]["Lagged endogenous term"]
            assert_allclose(obtained, desired, rtol, atol, False, err_msg)
            if debug_mode and dont_test_se_t_p:
                continue
            # standard errors
            obt = results_sm[ds][dt_s].stderr_endog_lagged
            des = results_ref[ds][dt_s]["se"]["Lagged endogenous term"].T
            assert_allclose(obt, des, rtol, atol, False, "STANDARD ERRORS\n" + err_msg)
            # t-values
            obt = results_sm[ds][dt_s].tvalues_endog_lagged
            des = results_ref[ds][dt_s]["t"]["Lagged endogenous term"].T
            assert_allclose(obt, des, rtol, atol, False, "t-VALUES\n" + err_msg)
            # p-values
            obt = results_sm[ds][dt_s].pvalues_endog_lagged
            des = results_ref[ds][dt_s]["p"]["Lagged endogenous term"].T
            assert_allclose(obt, des, rtol, atol, False, "p-VALUES\n" + err_msg)


def test_ols_det_terms():
    if debug_mode:
        if "det" not in to_test:
            return
        print("\n\nESTIMATED PARAMETERS FOR DETERMINISTIC TERMS", end="")
    for ds in datasets:
        for dt_s in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt_s) + ": ", end="")

            err_msg = build_err_msg(ds, dt_s, "PARAMETER MATRICES EXOG")
            det_key_ref = "Deterministic term"
            # If there are no det. terms, just make sure we do not compute any:
            if det_key_ref not in results_ref[ds][dt_s]["est"]:
                assert_((results_sm[ds][dt_s].coefs_exog.size == 0 and
                         results_sm[ds][dt_s].stderr_dt.size == 0 and
                         results_sm[ds][dt_s].tvalues_dt.size == 0 and
                         results_sm[ds][dt_s].pvalues_dt.size == 0), err_msg)
                continue
            obtained = results_sm[ds][dt_s].coefs_exog
            desired = results_ref[ds][dt_s]["est"][det_key_ref]
            desired = reorder_jmultis_det_terms(
                    desired, dt_s[0].startswith("c"), dt_s[1])
            assert_allclose(obtained, desired, rtol, atol, False, err_msg)
            if debug_mode and dont_test_se_t_p:
                continue
            # standard errors
            obt = results_sm[ds][dt_s].stderr_dt
            des = results_ref[ds][dt_s]["se"][det_key_ref]
            des = reorder_jmultis_det_terms(des, dt_s[0].startswith("c"),
                                            dt_s[1]).T
            assert_allclose(obt, des, rtol, atol, False, "STANDARD ERRORS\n" + err_msg)
            # t-values
            obt = results_sm[ds][dt_s].tvalues_dt
            des = results_ref[ds][dt_s]["t"][det_key_ref]
            des = reorder_jmultis_det_terms(des, dt_s[0].startswith("c"),
                                            dt_s[1]).T
            assert_allclose(obt, des, rtol, atol, False, "t-VALUES\n" + err_msg)
            # p-values
            obt = results_sm[ds][dt_s].pvalues_dt
            des = results_ref[ds][dt_s]["p"][det_key_ref]
            des = reorder_jmultis_det_terms(des, dt_s[0].startswith("c"),
                                            dt_s[1]).T
            assert_allclose(obt, des, rtol, atol, False, "p-VALUES\n" + err_msg)


def test_ols_sigma():
    if debug_mode:
        if "Sigma_u" not in to_test:
            return
        print("\n\nSIGMA_U", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")

            err_msg = build_err_msg(ds, dt, "Sigma_u")
            obtained = results_sm[ds][dt].sigma_u
            desired = results_ref[ds][dt]["est"]["Sigma_u"]
            assert_allclose(obtained, desired, rtol, atol, False, err_msg)


def test_log_like():
    if debug_mode:
        if "log_like" not in to_test:
            return
        else:
            print("\n\nLOG LIKELIHOOD", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")

            err_msg = build_err_msg(ds, dt, "Log Likelihood")
            obtained = results_sm[ds][dt].llf
            desired = results_ref[ds][dt]["log_like"]
            assert_allclose(obtained, desired, rtol, atol, False, err_msg)


def test_fc():
    if debug_mode:
        if "fc" not in to_test:
            return
        else:
            print("\n\nFORECAST", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")
            steps = 5  # parsed JMulTi output comprises 5 steps
            last_observations = results_sm[ds][dt].endog[
                -results_sm[ds][dt].k_ar:]
            seasons = dt[1]
            if seasons == 0:
                exog_future = None
            else:
                exog_future = np.zeros((steps, seasons-1))
                # the following line is appropriate only if the last
                # observation was in the next to last season (this is the case
                # for macrodata)
                exog_future[1:seasons] = np.identity(seasons-1)
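                # e.g. for quarterly data (seasons=4) and steps=5 this yields
                #     [[0, 0, 0],
                #      [1, 0, 0],
                #      [0, 1, 0],
                #      [0, 0, 1],
                #      [0, 0, 0]]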
            err_msg = build_err_msg(ds, dt, "FORECAST")
            # test point forecast functionality of forecast method
            obtained = results_sm[ds][dt].forecast(
                y=last_observations, steps=steps, exog_future=exog_future)
            desired = results_ref[ds][dt]["fc"]["fc"]
            assert_allclose(obtained, desired, rtol, atol, False, err_msg)

            # test forecast method with confidence interval calculation
            err_msg = build_err_msg(ds, dt, "FORECAST WITH INTERVALS")
            obtained = results_sm[ds][dt].forecast_interval(
                y=last_observations, steps=steps, alpha=0.05,
                exog_future=exog_future)
            obt = obtained[0]  # forecast
            obt_l = obtained[1]  # lower bound
            obt_u = obtained[2]  # upper bound
            des = results_ref[ds][dt]["fc"]["fc"]
            des_l = results_ref[ds][dt]["fc"]["lower"]
            des_u = results_ref[ds][dt]["fc"]["upper"]
            assert_allclose(obt, des, rtol, atol, False, err_msg)
            assert_allclose(obt_l, des_l, rtol, atol, False, err_msg)
            assert_allclose(obt_u, des_u, rtol, atol, False, err_msg)


def test_causality():  # test Granger- and instantaneous causality
    if debug_mode:
        if "causality" not in to_test:
            return
        else:
            print("\n\nCAUSALITY", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")

            err_msg_g_p = build_err_msg(ds, dt, "GRANGER CAUS. - p-VALUE")
            err_msg_g_t = build_err_msg(ds, dt, "GRANGER CAUS. - TEST STAT.")
            err_msg_i_p = build_err_msg(ds, dt, "INSTANT. CAUS. - p-VALUE")
            err_msg_i_t = build_err_msg(ds, dt, "INSTANT. CAUS. - TEST STAT.")
            v_ind = range(len(ds.variable_names))
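            # iterate over every non-empty proper subset of the variables as
            # the set of potentially causing variables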
            for causing_ind in sublists(v_ind, 1, len(v_ind)-1):
                causing_names = ["y" + str(i+1) for i in causing_ind]
                causing_key = tuple(ds.variable_names[i] for i in causing_ind)

                caused_ind = [i for i in v_ind if i not in causing_ind]
                caused_names = ["y" + str(i+1) for i in caused_ind]
                caused_key = tuple(ds.variable_names[i] for i in caused_ind)

                # test Granger-causality ######################################
                granger_sm_ind = results_sm[ds][
                    dt].test_causality(caused_ind, causing_ind)
                granger_sm_str = results_sm[ds][
                    dt].test_causality(caused_names, causing_names)

                # test test-statistic for Granger non-causality:
                g_t_obt = granger_sm_ind.test_statistic
                g_t_des = results_ref[ds][dt]["granger_caus"][
                    "test_stat"][(causing_key, caused_key)]
                assert_allclose(g_t_obt, g_t_des, rtol, atol, False, err_msg_g_t)
                # check whether string sequences as args work in the same way:
                g_t_obt_str = granger_sm_str.test_statistic
                assert_allclose(g_t_obt_str, g_t_obt, 1e-07, 0, False,
                                err_msg_g_t + " - sequences of integers and ".upper() +
                                "strings as arguments do not yield the same result!".upper())
                # check if int (e.g. 0) as index and list of int ([0]) yield
                # the same result:
                if len(causing_ind) == 1 or len(caused_ind) == 1:
                    ci = causing_ind[0] if len(causing_ind)==1 else causing_ind
                    ce = caused_ind[0] if len(caused_ind) == 1 else caused_ind
                    granger_sm_single_ind = results_sm[ds][
                        dt].test_causality(ce, ci)
                    g_t_obt_single = granger_sm_single_ind.test_statistic
                    assert_allclose(g_t_obt_single, g_t_obt, 1e-07, 0, False,
                                    err_msg_g_t + " - list of int and int as ".upper() +
                                    "argument do not yield the same result!".upper())

                # test p-value for Granger non-causality:
                g_p_obt = granger_sm_ind.pvalue
                g_p_des = results_ref[ds][dt]["granger_caus"]["p"][(
                    causing_key, caused_key)]
                assert_allclose(g_p_obt, g_p_des, rtol, atol, False, err_msg_g_p)
                # check whether string sequences as args work in the same way:
                g_p_obt_str = granger_sm_str.pvalue
                assert_allclose(g_p_obt_str, g_p_obt, 1e-07, 0, False,
                                err_msg_g_p + " - sequences of integers and ".upper() +
                                "strings as arguments do not yield the same result!".upper())
                # check if int (e.g. 0) as index and list of int ([0]) yield
                # the same result:
                if len(causing_ind) == 1:
                    g_p_obt_single = granger_sm_single_ind.pvalue
                    assert_allclose(g_p_obt_single, g_p_obt, 1e-07, 0, False,
                                    err_msg_g_p + " - list of int and int as ".upper() +
                                    "argument do not yield the same result!".upper())

                # test instantaneous causality ################################
                inst_sm_ind = results_sm[ds][dt].test_inst_causality(
                    causing_ind)
                inst_sm_str = results_sm[ds][dt].test_inst_causality(
                    causing_names)
                # test test-statistic for instantaneous non-causality
                t_obt = inst_sm_ind.test_statistic
                t_des = results_ref[ds][dt]["inst_caus"][
                    "test_stat"][(causing_key, caused_key)]
                assert_allclose(t_obt, t_des, rtol, atol, False, err_msg_i_t)
                # check whether string sequences as args work in the same way:
                t_obt_str = inst_sm_str.test_statistic
                assert_allclose(t_obt_str, t_obt, 1e-07, 0, False,
                                err_msg_i_t + " - sequences of integers and ".upper() +
                                "strings as arguments do not yield the same result!".upper())
                # check if int (e.g. 0) as index and list of int ([0]) yield
                # the same result:
                if len(causing_ind) == 1:
                    inst_sm_single_ind = results_sm[ds][
                        dt].test_inst_causality(causing_ind[0])
                    t_obt_single = inst_sm_single_ind.test_statistic
                    assert_allclose(t_obt_single, t_obt, 1e-07, 0, False,
                                    err_msg_i_t + " - list of int and int as ".upper() +
                                    "argument do not yield the same result!".upper())

                # test p-value for instantaneous non-causality
                p_obt = inst_sm_ind.pvalue
                p_des = results_ref[ds][dt]["inst_caus"]["p"][(
                    causing_key, caused_key)]
                assert_allclose(p_obt, p_des, rtol, atol, False, err_msg_i_p)
                # check whether string sequences as args work in the same way:
                p_obt_str = inst_sm_str.pvalue
                assert_allclose(p_obt_str, p_obt, 1e-07, 0, False,
                                err_msg_i_p + " - sequences of integers and ".upper() +
                                "strings as arguments do not yield the same result!".upper())
                # check if int (e.g. 0) as index and list of int ([0]) yield
                # the same result:
                if len(causing_ind) == 1:
                    inst_sm_single_ind = results_sm[ds][
                        dt].test_inst_causality(causing_ind[0])
                    p_obt_single = inst_sm_single_ind.pvalue
                    assert_allclose(p_obt_single, p_obt, 1e-07, 0, False,
                                    err_msg_i_p + " - list of int and int as ".upper() +
                                    "argument do not yield the same result!".upper())


def test_impulse_response():
    if debug_mode:
        if "impulse-response" not in to_test:
            return
        else:
            print("\n\nIMPULSE-RESPONSE", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")
            err_msg = build_err_msg(ds, dt, "IMULSE-RESPONSE")
            periods = 20
            obtained_all = results_sm[ds][dt].irf(periods=periods).irfs
            # flatten inner arrays to make them comparable to parsed results:
            obtained_all = obtained_all.reshape(periods+1, -1)
            desired_all = results_ref[ds][dt]["ir"]
            assert_allclose(obtained_all, desired_all, rtol, atol, False, err_msg)


def test_lag_order_selection():
    if debug_mode:
        if "lag order" not in to_test:
            return
        else:
            print("\n\nLAG ORDER SELECTION", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")
            endog_tot = data[ds]
            exog = generate_exog_from_season(dt[1], len(endog_tot))
            model = VAR(endog_tot, exog)
            obtained_all = model.select_order(10, trend=dt[0])
            for ic in ["aic", "fpe", "hqic", "bic"]:
                err_msg = build_err_msg(ds, dt,
                                        "LAG ORDER SELECTION - " + ic.upper())
                obtained = getattr(obtained_all, ic)
                desired = results_ref[ds][dt]["lagorder"][ic]
                assert_allclose(obtained, desired, rtol, atol, False, err_msg)


def test_normality():
    if debug_mode:
        if "test normality" not in to_test:
            return
        else:
            print("\n\nTEST NON-NORMALITY", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")

            obtained = results_sm[ds][dt].test_normality(signif=0.05)
            err_msg = build_err_msg(ds, dt, "TEST NON-NORMALITY - STATISTIC")
            obt_statistic = obtained.test_statistic
            des_statistic = results_ref[ds][dt]["test_norm"][
                "joint_test_statistic"]
            assert_allclose(obt_statistic, des_statistic, rtol, atol, False, err_msg)
            err_msg = build_err_msg(ds, dt, "TEST NON-NORMALITY - P-VALUE")
            obt_pvalue = obtained.pvalue
            des_pvalue = results_ref[ds][dt]["test_norm"]["joint_pvalue"]
            assert_allclose(obt_pvalue, des_pvalue, rtol, atol, False, err_msg)
            # call methods to assure they do not raise exceptions
            obtained.summary()
            str(obtained)  # __str__()


def test_whiteness():
    if debug_mode:
        if "whiteness" not in to_test:
            return
        else:
            print("\n\nTEST WHITENESS OF RESIDUALS", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")
            lags = results_ref[ds][dt]["whiteness"]["tested order"]

            obtained = results_sm[ds][dt].test_whiteness(nlags=lags)
            # test statistic
            err_msg = build_err_msg(ds, dt, "WHITENESS OF RESIDUALS - "
                                            "TEST STATISTIC")
            desired = results_ref[ds][dt]["whiteness"]["test statistic"]
            assert_allclose(obtained.test_statistic, desired, rtol, atol, False, err_msg)
            # p-value
            err_msg = build_err_msg(ds, dt, "WHITENESS OF RESIDUALS - "
                                            "P-VALUE")
            desired = results_ref[ds][dt]["whiteness"]["p-value"]
            assert_allclose(obtained.pvalue, desired, rtol, atol, False, err_msg)

            obtained = results_sm[ds][dt].test_whiteness(nlags=lags,
                                                         adjusted=True)
            # test statistic (adjusted Portmanteau test)
            err_msg = build_err_msg(ds, dt, "WHITENESS OF RESIDUALS - "
                                            "TEST STATISTIC (ADJUSTED TEST)")
            desired = results_ref[ds][dt]["whiteness"]["test statistic adj."]
            assert_allclose(obtained.test_statistic, desired, rtol, atol, False, err_msg)
            # p-value (adjusted Portmanteau test)
            err_msg = build_err_msg(ds, dt, "WHITENESS OF RESIDUALS - "
                                            "P-VALUE (ADJUSTED TEST)")
            desired = results_ref[ds][dt]["whiteness"]["p-value adjusted"]
            assert_allclose(obtained.pvalue, desired, rtol, atol, False, err_msg)


def test_exceptions():
    if debug_mode:
        if "exceptions" not in to_test:
            return
        else:
            print("\n\nEXCEPTIONS\n", end="")
    for ds in datasets:
        for dt in dt_s_list:
            if debug_mode:
                print("\n" + dt_s_tup_to_string(dt) + ": ", end="")

            # instantaneous causality:
            # signif must satisfy 0 < signif < 1
            assert_raises(ValueError, results_sm[ds][dt].test_inst_causality,
                          0, 0)  # signif=0 is not allowed
            # causing must be int, str, or an iterable of int or str
            assert_raises(TypeError, results_sm[ds][dt].test_inst_causality,
                          [0.5])  # 0.5 is not an int