# -*- coding: utf-8 -*-
"""
Created on Fri Mar 09 16:00:27 2012
Author: Josef Perktold
"""
from statsmodels.compat.pandas import assert_series_equal
from statsmodels.compat.python import iterkeys

import pickle
from io import BytesIO

import numpy as np
# we need log in module namespace for TestPickleFormula5
from numpy import log  # noqa:F401
from numpy.testing import assert_
import pandas as pd
import pytest

import statsmodels.api as sm
import statsmodels.genmod.generalized_linear_model as glm
def check_pickle(obj):
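    """Round-trip `obj` through pickle using an in-memory buffer.

    Returns the unpickled object and the length of the pickle in bytes.
    """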
fh = BytesIO()
pickle.dump(obj, fh, protocol=pickle.HIGHEST_PROTOCOL)
plen = fh.tell()
fh.seek(0, 0)
res = pickle.load(fh)
fh.close()
return res, plen
class RemoveDataPickle(object):
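    """Base class for ``remove_data`` and pickling tests.

    Subclasses attach a fitted results instance as ``self.results`` in
    ``setup``.  The tests check that predictions are unchanged after
    ``remove_data``, that the reduced results pickle to fewer bytes, and
    that wrapped results survive ``save``/``load``.
    """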
@classmethod
def setup_class(cls):
nobs = 1000
np.random.seed(987689)
x = np.random.randn(nobs, 3)
x = sm.add_constant(x)
cls.exog = x
cls.xf = 0.25 * np.ones((2, 4))
cls.predict_kwds = {}
def test_remove_data_pickle(self):
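        """Predictions should be identical before and after ``remove_data``,
        and the reduced results should pickle to fewer bytes."""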
results = self.results
xf = self.xf
pred_kwds = self.predict_kwds
pred1 = results.predict(xf, **pred_kwds)
# create some cached attributes
results.summary()
        res = results.summary2()  # smoke test: summary2 should also work
        # uncomment the next line to force a failure and confirm the test runs
        # np.testing.assert_equal(res, 1)
        # check that pickle/unpickle works on the full results instance
        # TODO: drop this once save/load round-tripping is covered elsewhere
res, orig_nbytes = check_pickle(results._results)
# remove data arrays, check predict still works
if isinstance(results, glm.GLMResultsWrapper):
with pytest.warns(FutureWarning, match="Anscombe residuals"):
results.remove_data()
else:
results.remove_data()
pred2 = results.predict(xf, **pred_kwds)
if isinstance(pred1, pd.Series) and isinstance(pred2, pd.Series):
assert_series_equal(pred1, pred2)
elif isinstance(pred1, pd.DataFrame) and isinstance(pred2,
pd.DataFrame):
assert_(pred1.equals(pred2))
else:
np.testing.assert_equal(pred2, pred1)
        # pickle and unpickle the reduced (data-removed) results
res, nbytes = check_pickle(results._results)
        # attach the unpickled results so they can be inspected when debugging
self.res = res
assert_(nbytes < orig_nbytes,
msg='pickle length not %d < %d' % (nbytes, orig_nbytes))
pred3 = results.predict(xf, **pred_kwds)
if isinstance(pred1, pd.Series) and isinstance(pred3, pd.Series):
assert_series_equal(pred1, pred3)
elif isinstance(pred1, pd.DataFrame) and isinstance(pred3,
pd.DataFrame):
assert_(pred1.equals(pred3))
else:
np.testing.assert_equal(pred3, pred1)
def test_remove_data_docstring(self):
assert_(self.results.remove_data.__doc__ is not None)
def test_pickle_wrapper(self):
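        """``save``/``load`` should round-trip both wrapped and unwrapped
        results without changing types or the sets of instance attributes."""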
        fh = BytesIO()  # pickle needs a binary file-like object
# test unwrapped results load save pickle
self.results._results.save(fh)
fh.seek(0, 0)
res_unpickled = self.results._results.__class__.load(fh)
assert type(res_unpickled) is type(self.results._results) # noqa: E721
# test wrapped results load save
fh.seek(0, 0)
self.results.save(fh)
fh.seek(0, 0)
res_unpickled = self.results.__class__.load(fh)
fh.close()
assert type(res_unpickled) is type(self.results) # noqa: E721
before = sorted(iterkeys(self.results.__dict__))
after = sorted(iterkeys(res_unpickled.__dict__))
assert_(before == after, msg='not equal %r and %r' % (before, after))
before = sorted(iterkeys(self.results._results.__dict__))
after = sorted(iterkeys(res_unpickled._results.__dict__))
assert_(before == after, msg='not equal %r and %r' % (before, after))
before = sorted(iterkeys(self.results.model.__dict__))
after = sorted(iterkeys(res_unpickled.model.__dict__))
assert_(before == after, msg='not equal %r and %r' % (before, after))
before = sorted(iterkeys(self.results._cache))
after = sorted(iterkeys(res_unpickled._cache))
assert_(before == after, msg='not equal %r and %r' % (before, after))
class TestRemoveDataPickleOLS(RemoveDataPickle):
def setup(self):
        # fit for each test because the tests modify the results instance
x = self.exog
np.random.seed(987689)
y = x.sum(1) + np.random.randn(x.shape[0])
self.results = sm.OLS(y, self.exog).fit()
class TestRemoveDataPickleWLS(RemoveDataPickle):
def setup(self):
        # fit for each test because the tests modify the results instance
x = self.exog
np.random.seed(987689)
y = x.sum(1) + np.random.randn(x.shape[0])
self.results = sm.WLS(y, self.exog, weights=np.ones(len(y))).fit()
class TestRemoveDataPicklePoisson(RemoveDataPickle):
def setup(self):
        # fit for each test because the tests modify the results instance
x = self.exog
np.random.seed(987689)
y_count = np.random.poisson(np.exp(x.sum(1) - x.mean()))
# bug with default
model = sm.Poisson(y_count, x)
# use start_params to converge faster
start_params = np.array(
[0.75334818, 0.99425553, 1.00494724, 1.00247112])
self.results = model.fit(start_params=start_params, method='bfgs',
disp=0)
# TODO: temporary, fixed in master
self.predict_kwds = dict(exposure=1, offset=0)
class TestRemoveDataPickleNegativeBinomial(RemoveDataPickle):
def setup(self):
        # fit for each test because the tests modify the results instance
np.random.seed(987689)
data = sm.datasets.randhie.load(as_pandas=False)
mod = sm.NegativeBinomial(data.endog, data.exog)
self.results = mod.fit(disp=0)
class TestRemoveDataPickleLogit(RemoveDataPickle):
def setup(self):
        # fit for each test because the tests modify the results instance
x = self.exog
nobs = x.shape[0]
np.random.seed(987689)
y_bin = (np.random.rand(nobs) < 1.0 / (
1 + np.exp(x.sum(1) - x.mean()))).astype(int)
# bug with default
model = sm.Logit(y_bin, x)
# use start_params to converge faster
start_params = np.array(
[-0.73403806, -1.00901514, -0.97754543, -0.95648212])
self.results = model.fit(start_params=start_params, method='bfgs',
disp=0)
class TestRemoveDataPickleRLM(RemoveDataPickle):
def setup(self):
        # fit for each test because the tests modify the results instance
x = self.exog
np.random.seed(987689)
y = x.sum(1) + np.random.randn(x.shape[0])
self.results = sm.RLM(y, self.exog).fit()
class TestRemoveDataPickleGLM(RemoveDataPickle):
def setup(self):
        # fit for each test because the tests modify the results instance
x = self.exog
np.random.seed(987689)
y = x.sum(1) + np.random.randn(x.shape[0])
self.results = sm.GLM(y, self.exog).fit()
def test_cached_data_removed(self):
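        """Data-like cached attributes (residuals) should be set to None by
        ``remove_data``."""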
res = self.results
# fill data-like members of the cache
names = ['resid_response', 'resid_deviance',
'resid_pearson', 'resid_anscombe']
with pytest.warns(FutureWarning, match="Anscombe residuals"):
for name in names:
getattr(res, name)
# check that the attributes are present before calling remove_data
for name in names:
assert name in res._cache
assert res._cache[name] is not None
res.remove_data()
for name in names:
assert res._cache[name] is None
def test_cached_values_evaluated(self):
# check that value-like attributes are evaluated before data
# is removed
res = self.results
assert res._cache == {}
with pytest.warns(FutureWarning, match="Anscombe residuals"):
res.remove_data()
assert 'bic' in res._cache
class TestPickleFormula(RemoveDataPickle):
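    """Formula-based OLS with pandas exog; checks that pandas predictions
    survive ``remove_data`` and pickling."""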
@classmethod
def setup_class(cls):
super(TestPickleFormula, cls).setup_class()
nobs = 10000
np.random.seed(987689)
x = np.random.randn(nobs, 3)
cls.exog = pd.DataFrame(x, columns=["A", "B", "C"])
cls.xf = pd.DataFrame(0.25 * np.ones((2, 3)),
columns=cls.exog.columns)
def setup(self):
x = self.exog
np.random.seed(123)
y = x.sum(1) + np.random.randn(x.shape[0])
y = pd.Series(y, name="Y")
X = self.exog.copy()
X["Y"] = y
self.results = sm.OLS.from_formula("Y ~ A + B + C", data=X).fit()
class TestPickleFormula2(RemoveDataPickle):
@classmethod
def setup_class(cls):
super(TestPickleFormula2, cls).setup_class()
nobs = 500
np.random.seed(987689)
data = np.random.randn(nobs, 4)
data[:, 0] = data[:, 1:].sum(1)
cls.data = pd.DataFrame(data, columns=["Y", "A", "B", "C"])
cls.xf = pd.DataFrame(0.25 * np.ones((2, 3)),
columns=cls.data.columns[1:])
def setup(self):
self.results = sm.OLS.from_formula("Y ~ A + B + C",
data=self.data).fit()
class TestPickleFormula3(TestPickleFormula2):
def setup(self):
self.results = sm.OLS.from_formula("Y ~ A + B * C",
data=self.data).fit()
class TestPickleFormula4(TestPickleFormula2):
def setup(self):
self.results = sm.OLS.from_formula("Y ~ np.log(abs(A) + 1) + B * C",
data=self.data).fit()
# we need log in module namespace for TestPickleFormula5
class TestPickleFormula5(TestPickleFormula2):
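    """The formula uses ``log`` directly, so ``log`` must be present in this
    module's namespace (see the ``from numpy import log`` import above)."""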
def setup(self):
self.results = sm.OLS.from_formula("Y ~ log(abs(A) + 1) + B * C",
data=self.data).fit()
class TestRemoveDataPicklePoissonRegularized(RemoveDataPickle):
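    """L1-regularized Poisson fit; regularized results should also support
    ``remove_data`` and pickling."""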
def setup(self):
        # fit for each test because the tests modify the results instance
x = self.exog
np.random.seed(987689)
y_count = np.random.poisson(np.exp(x.sum(1) - x.mean()))
model = sm.Poisson(y_count, x)
self.results = model.fit_regularized(method='l1', disp=0, alpha=10)