gam/tests/test_gam.py

# pylint: disable=F841
"""
Unit tests for GAM

Author: Luca Puggini

Created on 08/07/2015
"""

import os
import numpy as np
from numpy.testing import assert_allclose
import pandas as pd
from scipy.linalg import block_diag
import pytest

from statsmodels.tools.linalg import matrix_sqrt
from statsmodels.gam.smooth_basis import (
    UnivariatePolynomialSmoother, PolynomialSmoother, BSplines,
    GenericSmoothers, UnivariateCubicSplines, CyclicCubicSplines)
from statsmodels.gam.generalized_additive_model import (
    GLMGam, LogitGam, make_augmented_matrix, penalized_wls)
from statsmodels.gam.gam_cross_validation.gam_cross_validation import (
    MultivariateGAMCV, MultivariateGAMCVPath, _split_train_test_smoothers)
from statsmodels.gam.gam_penalties import (UnivariateGamPenalty,
                                           MultivariateGamPenalty)
from statsmodels.gam.gam_cross_validation.cross_validators import KFold
from statsmodels.genmod.generalized_linear_model import GLM
from statsmodels.genmod.families.family import Gaussian
from statsmodels.regression import linear_model as lm

def sigmoid(x):
    # np.exp is already vectorized, so no np.vectorize wrapper is needed
    return 1.0 / (1.0 + np.exp(-x))


def polynomial_sample_data():
    """A polynomial of degree 4

    poly = ax^4 + bx^3 + cx^2 + dx + e
    second der = 12ax^2 + 6bx + 2c
    integral from -1 to 1 of second der^2 is
        (288 a^2)/5 + 32 a c + 8 (3 b^2 + c^2)
    the gradient of the integral is der
        [576*a/5 + 32 * c, 48*b, 32*a + 16*c, 0, 0]

    Returns
    -------
    poly : smoother instance
    y : ndarray
        generated function values, demeaned
    """
    n = 10000
    x = np.linspace(-1, 1, n)
    y = 2 * x ** 3 - x
    y -= y.mean()

    degree = [4]
    pol = PolynomialSmoother(x, degree)

    return pol, y


def integral(params):
    # params holds the coefficients of [x, x^2, x^3, x^4] in that order
    d, c, b, a = params
    itg = (288 * a ** 2) / 5 + (32 * a * c) + 8 * (3 * b ** 2 + c ** 2)
    # halved because the penalty's Riemann sum over n points on [-1, 1] is
    # divided by nobs in UnivariateGamPenalty, leaving a factor of 1/2
    itg /= 2
    return itg


def grad(params):
    d, c, b, a = params
    # gradient of the integral in [a, b, c, d] order
    grd = np.array([576 * a / 5 + 32 * c, 48 * b, 32 * a + 16 * c, 0])
    # reverse to match the [d, c, b, a] ordering of the basis coefficients
    grd = grd[::-1]
    return grd / 2


def hessian(params):
    # params is unused: the Hessian of the quadratic penalty is constant.
    # It is stated in [a, b, c, d] order and flipped by the caller to match
    # the [d, c, b, a] ordering of the basis coefficients.
    hess = np.array([[576 / 5, 0, 32, 0],
                     [0, 48, 0, 0],
                     [32, 0, 16, 0],
                     [0, 0, 0, 0]
                     ])
    return hess / 2
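

# Hedged sanity helper (an illustrative addition, not part of the original
# module; the name ``_check_derivs`` is hypothetical): because the penalty is
# quadratic in the parameters, central finite differences of ``integral``
# should reproduce ``grad`` up to roundoff, e.g. _check_derivs(np.ones(4)).
def _check_derivs(params, eps=1e-6):
    params = np.asarray(params, dtype=float)
    # central difference along each coordinate direction
    num_grad = np.array([
        (integral(params + eps * step) - integral(params - eps * step))
        / (2 * eps)
        for step in np.eye(len(params))
    ])
    assert_allclose(num_grad, grad(params), rtol=1e-6, atol=1e-6)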


def cost_function(params, pol, y, alpha):
    # the goodness-of-fit term: the Gaussian log-likelihood of the fit
    lin_pred = np.dot(pol.basis, params)
    gaussian = Gaussian()
    expval = gaussian.link.inverse(lin_pred)
    loglike = gaussian.loglike(y, expval)

    # this is the value of the GAM penalty for the example polynomial
    itg = integral(params)

    # return the cost function of the GAM for the given polynomial
    return loglike - alpha * itg, loglike, itg
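

# Hedged usage illustration (an addition, not an original test; the helper
# name is hypothetical): with alpha set to zero the objective returned by
# ``cost_function`` reduces to the plain Gaussian log-likelihood and the
# penalty term vanishes.
def _cost_function_example():
    pol, y = polynomial_sample_data()
    obj, loglike, itg = cost_function(np.zeros(4), pol, y, alpha=0)
    assert itg == 0
    assert_allclose(obj, loglike)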


def test_gam_penalty():
    """
    test the func method of the gam penalty
    :return:
    """
    pol, y = polynomial_sample_data()
    univ_pol = pol.smoothers[0]
    alpha = 1
    gp = UnivariateGamPenalty(alpha=alpha, univariate_smoother=univ_pol)

    for _ in range(10):
        params = np.random.randint(-2, 2, 4)
        gp_score = gp.func(params)
        itg = integral(params)
        assert_allclose(gp_score, itg, atol=1.e-1)


def test_gam_gradient():
    # test the gam gradient for the example polynomial
    np.random.seed(1)
    pol, y = polynomial_sample_data()

    alpha = 1
    smoother = pol.smoothers[0]
    gp = UnivariateGamPenalty(alpha=alpha, univariate_smoother=smoother)

    for _ in range(10):
        params = np.random.uniform(-2, 2, 4)
        gam_grad = gp.deriv(params)
        grd = grad(params)

        assert_allclose(gam_grad, grd, rtol=1.e-2, atol=1.e-2)


def test_gam_hessian():
    # test the deriv2 method of the gam penalty
    np.random.seed(1)
    pol, y = polynomial_sample_data()
    univ_pol = pol.smoothers[0]
    alpha = 1
    gp = UnivariateGamPenalty(alpha=alpha, univariate_smoother=univ_pol)

    for _ in range(10):
        params = np.random.randint(-2, 2, 4)
        gam_der2 = gp.deriv2(params)
        # the analytic hessian is stated in [a, b, c, d] order, so flip both
        # axes to match the [d, c, b, a] ordering of the basis coefficients
        hess = hessian(params)
        hess = np.flipud(hess)
        hess = np.fliplr(hess)
        assert_allclose(gam_der2, hess, atol=1.e-13, rtol=1.e-3)


def test_approximation():
    np.random.seed(1)
    poly, y = polynomial_sample_data()
    alpha = 1
    for _ in range(10):
        params = np.random.uniform(-1, 1, 4)
        cost, err, itg = cost_function(params, poly, y, alpha)
        glm_gam = GLMGam(y, smoother=poly, alpha=alpha)
        # TODO: why do we need pen_weight=1
        gam_loglike = glm_gam.loglike(params, scale=1, pen_weight=1)
        assert_allclose(err - itg, cost, rtol=1e-10)
        assert_allclose(gam_loglike, cost, rtol=0.1)


def test_gam_glm():
    cur_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(cur_dir, "results", "prediction_from_mgcv.csv")
    data_from_r = pd.read_csv(file_path)
    # dataset used to train the R model
    x = data_from_r.x.values
    y = data_from_r.y.values

    df = [10]
    degree = [3]
    bsplines = BSplines(x, degree=degree, df=df, include_intercept=True)
    # y_mgcv is obtained from R with the following code
    # g = gam(y~s(x, k = 10, bs = "cr"), data = data, scale = 80)
    y_mgcv = np.asarray(data_from_r.y_est)

    alpha = 0.1  # chosen by trial and error

    glm_gam = GLMGam(y, smoother=bsplines, alpha=alpha)
    res_glm_gam = glm_gam.fit(method='bfgs', max_start_irls=0,
                              disp=1, maxiter=10000, maxfun=5000)
    y_gam0 = np.dot(bsplines.basis, res_glm_gam.params)
    y_gam = np.asarray(res_glm_gam.fittedvalues)
    assert_allclose(y_gam, y_gam0, rtol=1e-10)

    # plt.plot(x, y_gam, '.', label='gam')
    # plt.plot(x, y_mgcv, '.', label='mgcv')
    # plt.plot(x, y, '.', label='y')
    # plt.legend()
    # plt.show()

    assert_allclose(y_gam, y_mgcv, atol=1.e-2)


def test_gam_discrete():
    cur_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(cur_dir, "results", "prediction_from_mgcv.csv")
    data_from_r = pd.read_csv(file_path)
    # dataset used to train the R model
    x = data_from_r.x.values
    y = data_from_r.ybin.values

    df = [10]
    degree = [5]
    bsplines = BSplines(x, degree=degree, df=df, include_intercept=True)

    # y_mgcv is obtained from R with the following code
    # g = gam(y~s(x, k = 10, bs = "cr"), data = data, scale = 80)
    y_mgcv = data_from_r.ybin_est

    alpha = 0.00002
    # gp = UnivariateGamPenalty(alpha=alpha, univariate_smoother=bsplines)
    # lg_gam = LogitGam(y, bsplines.basis, penal=gp)
    #
    lg_gam = LogitGam(y, bsplines, alpha=alpha)
    res_lg_gam = lg_gam.fit(maxiter=10000)
    y_gam = np.dot(bsplines.basis, res_lg_gam.params)
    y_gam = sigmoid(y_gam)
    y_mgcv = sigmoid(y_mgcv)

    # plt.plot(x, y_gam, label='gam')
    # plt.plot(x, y_mgcv, label='mgcv')
    # plt.plot(x, y, '.', label='y')
    # plt.ylim(-0.4, 1.4)
    # plt.legend()
    # plt.show()

    assert_allclose(y_gam, y_mgcv, rtol=1.e-10, atol=1.e-1)


def multivariate_sample_data(seed=1):
    n = 1000
    x1 = np.linspace(-1, 1, n)
    x2 = np.linspace(-10, 10, n)
    x = np.vstack([x1, x2]).T
    np.random.seed(seed)
    y = x1 * x1 * x1 + x2 + np.random.normal(0, 0.01, n)
    degree1 = 4
    degree2 = 3
    degrees = [degree1, degree2]
    pol = PolynomialSmoother(x, degrees)
    return x, y, pol
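

# Minimal usage sketch (an illustrative addition, not an original test; the
# function name is hypothetical): the smoother built above plugs directly
# into GLMGam with one penalty weight per smooth component, the same pattern
# test_generic_smoother exercises below.
def _example_multivariate_fit():
    x, y, pol = multivariate_sample_data()
    gam = GLMGam(y, smoother=pol, alpha=[1.0, 1.0])
    return gam.fit()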


def test_multivariate_penalty():
    alphas = [1, 2]
    weights = [1, 1]
    np.random.seed(1)
    x, y, pol = multivariate_sample_data()

    univ_pol1 = UnivariatePolynomialSmoother(x[:, 0], degree=pol.degrees[0])
    univ_pol2 = UnivariatePolynomialSmoother(x[:, 1], degree=pol.degrees[1])

    gp1 = UnivariateGamPenalty(alpha=alphas[0], univariate_smoother=univ_pol1)
    gp2 = UnivariateGamPenalty(alpha=alphas[1], univariate_smoother=univ_pol2)
    with pytest.warns(UserWarning, match="weights is currently ignored"):
        mgp = MultivariateGamPenalty(multivariate_smoother=pol, alpha=alphas,
                                     weights=weights)

    for i in range(10):
        params1 = np.random.randint(-3, 3, pol.smoothers[0].dim_basis)
        params2 = np.random.randint(-3, 3, pol.smoothers[1].dim_basis)
        params = np.concatenate([params1, params2])
        c1 = gp1.func(params1)
        c2 = gp2.func(params2)
        c = mgp.func(params)
        assert_allclose(c, c1 + c2, atol=1.e-10, rtol=1.e-10)

        d1 = gp1.deriv(params1)
        d2 = gp2.deriv(params2)
        d12 = np.concatenate([d1, d2])
        d = mgp.deriv(params)
        assert_allclose(d, d12)

        h1 = gp1.deriv2(params1)
        h2 = gp2.deriv2(params2)
        h12 = block_diag(h1, h2)
        h = mgp.deriv2(params)
        assert_allclose(h, h12)


def test_generic_smoother():
    x, y, poly = multivariate_sample_data()
    alphas = [0.4, 0.7]
    weights = [1, 1]  # noqa: F841

    gs = GenericSmoothers(poly.x, poly.smoothers)
    gam_gs = GLMGam(y, smoother=gs, alpha=alphas)
    gam_gs_res = gam_gs.fit()

    gam_poly = GLMGam(y, smoother=poly, alpha=alphas)
    gam_poly_res = gam_poly.fit()

    assert_allclose(gam_gs_res.params, gam_poly_res.params)


def test_multivariate_gam_1d_data():
    cur_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(cur_dir, "results", "prediction_from_mgcv.csv")
    data_from_r = pd.read_csv(file_path)
    # dataset used to train the R model
    x = data_from_r.x.values
    y = data_from_r.y

    df = [10]
    degree = [3]
    bsplines = BSplines(x, degree=degree, df=df)
    # y_mgcv is obtained from R with the following code
    # g = gam(y~s(x, k = 10, bs = "cr"), data = data, scale = 80)
    y_mgcv = data_from_r.y_est

    # alpha is adjusted manually to reduce the discrepancy in fittedvalues
    alpha = [0.0168 * 0.0251 / 2 * 500]
    gp = MultivariateGamPenalty(bsplines, alpha=alpha)    # noqa: F841

    glm_gam = GLMGam(y, exog=np.ones((len(y), 1)), smoother=bsplines,
                     alpha=alpha)
    # "nm" converges to a different params, "bfgs" params are close to pirls
    # res_glm_gam = glm_gam.fit(method='nm', max_start_irls=0,
    #                           disp=1, maxiter=10000, maxfun=5000)
    res_glm_gam = glm_gam.fit(method='pirls', max_start_irls=0,
                              disp=1, maxiter=10000)
    y_gam = res_glm_gam.fittedvalues

    # plt.plot(x, y_gam, '.', label='gam')
    # plt.plot(x, y_mgcv, '.', label='mgcv')
    # plt.plot(x, y, '.', label='y')
    # plt.legend()
    # plt.show()

    assert_allclose(y_gam, y_mgcv, atol=0.01)


def test_multivariate_gam_cv():
    # smoke test: nothing is asserted; it only checks that cross-validation
    # runs without raising a runtime error

    def cost(x1, x2):
        # per-observation distance: the L2 norm divided by the sample size
        return np.linalg.norm(x1 - x2) / len(x1)

    cur_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(cur_dir, "results", "prediction_from_mgcv.csv")