# Page header captured together with this file (Gemfury package listing):
#   "Why Gemfury? Push, build, and install RubyGems, npm packages, Python
#   packages, Maven artifacts, PHP packages, Go Modules, Bower components,
#   Debian packages, RPM packages, NuGet packages"
#
# Package: alkaline-ml / statsmodels (python)
# Repository URL to install this package: (not captured)
# Version: 0.11.1
# File: / base / tests / test_shrink_pickle.py

# -*- coding: utf-8 -*-
"""

Created on Fri Mar 09 16:00:27 2012

Author: Josef Perktold
"""
from statsmodels.compat.pandas import assert_series_equal

import pickle
from io import BytesIO

import numpy as np
import pandas as pd
import pytest
from numpy.testing import assert_
# we need log in module namespace for TestPickleFormula5
from numpy import log  # noqa:F401

import statsmodels.api as sm
import statsmodels.genmod.generalized_linear_model as glm
from statsmodels.compat.python import iterkeys


def check_pickle(obj):
    """Round-trip ``obj`` through pickle using an in-memory buffer.

    Parameters
    ----------
    obj : object
        Any picklable object.

    Returns
    -------
    res : object
        The object obtained by unpickling the serialized ``obj``.
    plen : int
        Position of the buffer after ``pickle.dump``, i.e. the number of
        bytes written.
    """
    # the context manager closes the buffer even if dump or load raises
    with BytesIO() as fh:
        pickle.dump(obj, fh, protocol=pickle.HIGHEST_PROTOCOL)
        plen = fh.tell()
        fh.seek(0, 0)
        res = pickle.load(fh)
    return res, plen


class RemoveDataPickle(object):
    """Shared checks for ``remove_data`` and pickling of results instances.

    The tests read ``self.results``, which is not set here and has to be
    provided by the concrete test class. This base class supplies the
    design matrix ``exog``, the prediction points ``xf`` and the extra
    ``predict`` keywords ``predict_kwds``.
    """

    @classmethod
    def setup_class(cls):
        """Create the seeded design matrix and the points used by predict."""
        nobs = 1000
        np.random.seed(987689)
        x = np.random.randn(nobs, 3)
        x = sm.add_constant(x)
        cls.exog = x
        cls.xf = 0.25 * np.ones((2, 4))
        cls.predict_kwds = {}

    @staticmethod
    def _assert_predictions_equal(expected, actual):
        """Assert two predictions are equal, dispatching on pandas type."""
        if isinstance(expected, pd.Series) and isinstance(actual, pd.Series):
            assert_series_equal(expected, actual)
        elif (isinstance(expected, pd.DataFrame)
              and isinstance(actual, pd.DataFrame)):
            assert_(expected.equals(actual))
        else:
            np.testing.assert_equal(actual, expected)

    @staticmethod
    def _assert_same_keys(original, restored):
        """Assert that two mappings have identical sorted keys."""
        before = sorted(iterkeys(original))
        after = sorted(iterkeys(restored))
        assert_(before == after, msg='not equal %r and %r' % (before, after))

    def test_remove_data_pickle(self):
        """Check predict after ``remove_data`` and that the pickle shrinks."""
        results = self.results
        xf = self.xf
        pred_kwds = self.predict_kwds
        pred1 = results.predict(xf, **pred_kwds)
        # create some cached attributes
        results.summary()
        results.summary2()  # SMOKE test also summary2

        # check pickle unpickle works on full results
        # TODO: drop of load save is tested
        _, orig_nbytes = check_pickle(results._results)

        # remove data arrays, check predict still works
        if isinstance(results, glm.GLMResultsWrapper):
            with pytest.warns(FutureWarning, match="Anscombe residuals"):
                results.remove_data()
        else:
            results.remove_data()

        pred2 = results.predict(xf, **pred_kwds)
        self._assert_predictions_equal(pred1, pred2)

        # pickle and unpickle reduced array
        res, nbytes = check_pickle(results._results)

        # for testing attach res
        self.res = res

        assert_(nbytes < orig_nbytes,
                msg='pickle length not %d < %d' % (nbytes, orig_nbytes))

        pred3 = results.predict(xf, **pred_kwds)
        self._assert_predictions_equal(pred1, pred3)

    def test_remove_data_docstring(self):
        """``remove_data`` on the results instance must have a docstring."""
        assert_(self.results.remove_data.__doc__ is not None)

    def test_pickle_wrapper(self):
        """Check that ``save``/``load`` keep the type and attribute names."""
        # use pickle with binary content; the buffer is closed even if
        # save or load raises
        with BytesIO() as fh:
            # test unwrapped results load save pickle
            unwrapped = self.results._results
            unwrapped.save(fh)
            fh.seek(0, 0)
            res_unpickled = unwrapped.__class__.load(fh)
            assert type(res_unpickled) is type(unwrapped)  # noqa: E721

            # test wrapped results load save
            fh.seek(0, 0)
            self.results.save(fh)
            fh.seek(0, 0)
            res_unpickled = self.results.__class__.load(fh)
        assert type(res_unpickled) is type(self.results)  # noqa: E721

        self._assert_same_keys(self.results.__dict__,
                               res_unpickled.__dict__)
        self._assert_same_keys(self.results._results.__dict__,
                               res_unpickled._results.__dict__)
        self._assert_same_keys(self.results.model.__dict__,
                               res_unpickled.model.__dict__)
        self._assert_same_keys(self.results._cache, res_unpickled._cache)


class TestRemoveDataPickleOLS(RemoveDataPickle):
    """Run the inherited remove_data and pickle checks on an OLS fit."""

    def setup(self):
        # refit before every test, since the tests modify the results
        np.random.seed(987689)
        exog = self.exog
        noise = np.random.randn(exog.shape[0])
        endog = exog.sum(1) + noise
        self.results = sm.OLS(endog, exog).fit()


class TestRemoveDataPickleWLS(RemoveDataPickle):
    """Run the inherited checks on a WLS fit with all weights equal to one."""

    def setup(self):
        # refit before every test, since the tests modify the results
        np.random.seed(987689)
        exog = self.exog
        noise = np.random.randn(exog.shape[0])
        endog = exog.sum(1) + noise
        unit_weights = np.ones(len(endog))
        self.results = sm.WLS(endog, exog, weights=unit_weights).fit()


class TestRemoveDataPicklePoisson(RemoveDataPickle):
    """Run the inherited checks on a Poisson model fitted with bfgs."""

    def setup(self):
        # refit before every test, since the tests modify the results
        exog = self.exog
        np.random.seed(987689)
        rate = np.exp(exog.sum(1) - exog.mean())
        y_count = np.random.poisson(rate)

        # bug with default
        poisson_model = sm.Poisson(y_count, exog)

        # starting values chosen so that the optimizer converges faster
        initial = np.array(
            [0.75334818, 0.99425553, 1.00494724, 1.00247112])
        self.results = poisson_model.fit(
            start_params=initial, method='bfgs', disp=0)

        # TODO: temporary, fixed in master
        self.predict_kwds = {'exposure': 1, 'offset': 0}


class TestRemoveDataPickleNegativeBinomial(RemoveDataPickle):
    """Run the inherited checks on a NegativeBinomial fit of randhie."""

    def setup(self):
        # refit before every test, since the tests modify the results
        np.random.seed(987689)
        randhie = sm.datasets.randhie.load(as_pandas=False)
        nb_model = sm.NegativeBinomial(randhie.endog, randhie.exog)
        self.results = nb_model.fit(disp=0)


class TestRemoveDataPickleLogit(RemoveDataPickle):
    """Run the inherited checks on a Logit model fitted with bfgs."""

    def setup(self):
        # refit before every test, since the tests modify the results
        exog = self.exog
        nobs = exog.shape[0]
        np.random.seed(987689)
        draws = np.random.rand(nobs)
        threshold = 1.0 / (1 + np.exp(exog.sum(1) - exog.mean()))
        y_bin = (draws < threshold).astype(int)

        # bug with default
        logit_model = sm.Logit(y_bin, exog)

        # starting values chosen so that the optimizer converges faster
        initial = np.array(
            [-0.73403806, -1.00901514, -0.97754543, -0.95648212])
        self.results = logit_model.fit(
            start_params=initial, method='bfgs', disp=0)


class TestRemoveDataPickleRLM(RemoveDataPickle):
    """Run the inherited remove_data and pickle checks on an RLM fit."""

    def setup(self):
        # refit before every test, since the tests modify the results
        np.random.seed(987689)
        exog = self.exog
        noise = np.random.randn(exog.shape[0])
        endog = exog.sum(1) + noise
        self.results = sm.RLM(endog, exog).fit()


class TestRemoveDataPickleGLM(RemoveDataPickle):
    """Run the inherited checks on a GLM fit, plus GLM cache checks."""

    def setup(self):
        # refit before every test, since the tests modify the results
        np.random.seed(987689)
        exog = self.exog
        noise = np.random.randn(exog.shape[0])
        endog = exog.sum(1) + noise
        self.results = sm.GLM(endog, exog).fit()

    def test_cached_data_removed(self):
        """Cached residuals are set to None by ``remove_data``."""
        fitted = self.results
        resid_names = ['resid_response', 'resid_deviance',
                       'resid_pearson', 'resid_anscombe']
        # accessing the attributes fills the data-like cache entries
        with pytest.warns(FutureWarning, match="Anscombe residuals"):
            for attr in resid_names:
                getattr(fitted, attr)
        # every entry must be cached before remove_data is called
        for attr in resid_names:
            assert attr in fitted._cache
            assert fitted._cache[attr] is not None

        fitted.remove_data()
        for attr in resid_names:
            assert fitted._cache[attr] is None

    def test_cached_values_evaluated(self):
        """``remove_data`` leaves 'bic' in a cache that started empty."""
        # value-like attributes have to be evaluated before the data
        # is removed
        fitted = self.results
        assert fitted._cache == {}
        with pytest.warns(FutureWarning, match="Anscombe residuals"):
            fitted.remove_data()
        assert 'bic' in fitted._cache


class TestPickleFormula(RemoveDataPickle):
    """Run the inherited checks on an OLS model built from a formula."""

    @classmethod
    def setup_class(cls):
        """Replace ``exog`` and ``xf`` with DataFrames with columns A, B, C."""
        super(TestPickleFormula, cls).setup_class()
        np.random.seed(987689)
        regressors = np.random.randn(10000, 3)
        cls.exog = pd.DataFrame(regressors, columns=["A", "B", "C"])
        cls.xf = pd.DataFrame(np.full((2, 3), 0.25),
                              columns=cls.exog.columns)

    def setup(self):
        np.random.seed(123)
        noise = np.random.randn(self.exog.shape[0])
        endog = pd.Series(self.exog.sum(1) + noise, name="Y")
        # the formula needs the response and regressors in one frame
        frame = self.exog.copy()
        frame["Y"] = endog
        self.results = sm.OLS.from_formula("Y ~ A + B + C", data=frame).fit()


class TestPickleFormula2(RemoveDataPickle):
    """Run the inherited checks on a formula OLS fit of a single frame."""

    @classmethod
    def setup_class(cls):
        """Build ``data`` with Y equal to the row sum of A, B and C."""
        super(TestPickleFormula2, cls).setup_class()
        np.random.seed(987689)
        values = np.random.randn(500, 4)
        # first column (the response) is the sum of the other three
        values[:, 0] = values[:, 1:].sum(1)
        cls.data = pd.DataFrame(values, columns=["Y", "A", "B", "C"])
        cls.xf = pd.DataFrame(np.full((2, 3), 0.25),
                              columns=cls.data.columns[1:])

    def setup(self):
        model = sm.OLS.from_formula("Y ~ A + B + C", data=self.data)
        self.results = model.fit()


class TestPickleFormula3(TestPickleFormula2):
    """Same checks with the formula ``Y ~ A + B * C``."""

    def setup(self):
        model = sm.OLS.from_formula("Y ~ A + B * C", data=self.data)
        self.results = model.fit()


class TestPickleFormula4(TestPickleFormula2):
    """Same checks with a formula that calls ``np.log`` on a regressor."""

    def setup(self):
        formula = "Y ~ np.log(abs(A) + 1) + B * C"
        model = sm.OLS.from_formula(formula, data=self.data)
        self.results = model.fit()


# TestPickleFormula5 uses a bare ``log`` in its formula, which is why ``log``
# is imported from numpy into this module's namespace at the top of the file


class TestPickleFormula5(TestPickleFormula2):
    """Same checks with a formula that calls a bare ``log``."""

    def setup(self):
        formula = "Y ~ log(abs(A) + 1) + B * C"
        model = sm.OLS.from_formula(formula, data=self.data)
        self.results = model.fit()


class TestRemoveDataPicklePoissonRegularized(RemoveDataPickle):
    """Run the inherited checks on an l1-regularized Poisson fit."""

    def setup(self):
        # refit before every test, since the tests modify the results
        exog = self.exog
        np.random.seed(987689)
        rate = np.exp(exog.sum(1) - exog.mean())
        y_count = np.random.poisson(rate)
        poisson_model = sm.Poisson(y_count, exog)
        self.results = poisson_model.fit_regularized(
            method='l1', disp=0, alpha=10)