__all__ = ["ZeroInflatedPoisson", "ZeroInflatedGeneralizedPoisson",
"ZeroInflatedNegativeBinomialP"]
import warnings
import numpy as np
import statsmodels.base.model as base
import statsmodels.base.wrapper as wrap
import statsmodels.regression.linear_model as lm
from statsmodels.discrete.discrete_model import (DiscreteModel, CountModel,
Poisson, Logit, CountResults,
L1CountResults, Probit,
_discrete_results_docs,
_validate_l1_method,
GeneralizedPoisson,
NegativeBinomialP)
from statsmodels.distributions import zipoisson, zigenpoisson, zinegbin
from statsmodels.tools.numdiff import approx_fprime, approx_hess
from statsmodels.tools.decorators import cache_readonly
from statsmodels.tools.sm_exceptions import ConvergenceWarning
from statsmodels.compat.pandas import Appender
_doc_zi_params = """
exog_infl : array_like or None
        Explanatory variables for the binary inflation model, i.e. for the
        mixing probability model. If None, then a constant is used.
offset : array_like
Offset is added to the linear prediction with coefficient equal to 1.
exposure : array_like
Log(exposure) is added to the linear prediction with coefficient
equal to 1.
inflation : {'logit', 'probit'}
        The model for the zero inflation, either Logit (default) or Probit.
"""
class GenericZeroInflated(CountModel):
__doc__ = """
Generic Zero Inflated Model
%(params)s
%(extra_params)s
Attributes
----------
endog : ndarray
A reference to the endogenous response variable
exog : ndarray
A reference to the exogenous design.
    exog_infl : ndarray
A reference to the zero-inflated exogenous design.
""" % {'params' : base._model_params_doc,
'extra_params' : _doc_zi_params + base._missing_param_doc}
def __init__(self, endog, exog, exog_infl=None, offset=None,
inflation='logit', exposure=None, missing='none', **kwargs):
super(GenericZeroInflated, self).__init__(endog, exog, offset=offset,
exposure=exposure,
missing=missing, **kwargs)
if exog_infl is None:
self.k_inflate = 1
self.exog_infl = np.ones((endog.size, self.k_inflate),
dtype=np.float64)
else:
self.exog_infl = exog_infl
self.k_inflate = exog_infl.shape[1]
if len(exog.shape) == 1:
self.k_exog = 1
else:
self.k_exog = exog.shape[1]
self.infl = inflation
if inflation == 'logit':
self.model_infl = Logit(np.zeros(self.exog_infl.shape[0]),
self.exog_infl)
self._hessian_inflate = self._hessian_logit
elif inflation == 'probit':
self.model_infl = Probit(np.zeros(self.exog_infl.shape[0]),
self.exog_infl)
self._hessian_inflate = self._hessian_probit
else:
raise ValueError("inflation == %s, which is not handled"
% inflation)
self.inflation = inflation
self.k_extra = self.k_inflate
        if len(self.exog) != len(self.exog_infl):
            raise ValueError('exog and exog_infl have different numbers of '
                             'observations. `missing` handling is not '
                             'supported')
infl_names = ['inflate_%s' % i for i in self.model_infl.data.param_names]
self.exog_names[:] = infl_names + list(self.exog_names)
self.exog_infl = np.asarray(self.exog_infl, dtype=np.float64)
self._init_keys.extend(['exog_infl', 'inflation'])
self._null_drop_keys = ['exog_infl']
def loglike(self, params):
"""
Loglikelihood of Generic Zero Inflated model.
Parameters
----------
params : array_like
The parameters of the model.
Returns
-------
loglike : float
The log-likelihood function of the model evaluated at `params`.
See notes.
        Notes
        -----
        .. math:: \\ln L=\\sum_{y_{i}=0}\\ln(w_{i}+(1-w_{i})*P_{main\\_model})+
            \\sum_{y_{i}>0}(\\ln(1-w_{i})+L_{main\\_model})

        where :math:`P` is the zero probability of the main model (its pmf
        at zero) and :math:`L` is the loglikelihood of the main model.
"""
return np.sum(self.loglikeobs(params))
def loglikeobs(self, params):
"""
Loglikelihood for observations of Generic Zero Inflated model.
Parameters
----------
params : array_like
The parameters of the model.
Returns
-------
loglike : ndarray
The log likelihood for each observation of the model evaluated
at `params`. See Notes for definition.
        Notes
        -----
        .. math::

            \\ln L_{i}=\\begin{cases}
            \\ln(w_{i}+(1-w_{i})*P_{main\\_model}) & \\text{if } y_{i}=0\\\\
            \\ln(1-w_{i})+L_{main\\_model} & \\text{if } y_{i}>0
            \\end{cases}

        for observations :math:`i=1,...,n`, where :math:`P` is the zero
        probability of the main model (its pmf at zero) and :math:`L` is the
        loglikelihood of the main model.
"""
params_infl = params[:self.k_inflate]
params_main = params[self.k_inflate:]
y = self.endog
w = self.model_infl.predict(params_infl)
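        # keep the mixing probability strictly inside (0, 1) so the logs
        # below remain finite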
w = np.clip(w, np.finfo(float).eps, 1 - np.finfo(float).eps)
llf_main = self.model_main.loglikeobs(params_main)
zero_idx = np.nonzero(y == 0)[0]
nonzero_idx = np.nonzero(y)[0]
llf = np.zeros_like(y, dtype=np.float64)
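        # zeros mix the point mass at zero with the main model's zero
        # probability; positive counts contribute log(1 - w) plus the main
        # model's loglikelihood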
llf[zero_idx] = (np.log(w[zero_idx] +
(1 - w[zero_idx]) * np.exp(llf_main[zero_idx])))
llf[nonzero_idx] = np.log(1 - w[nonzero_idx]) + llf_main[nonzero_idx]
return llf
@Appender(DiscreteModel.fit.__doc__)
def fit(self, start_params=None, method='bfgs', maxiter=35,
full_output=1, disp=1, callback=None,
cov_type='nonrobust', cov_kwds=None, use_t=None, **kwargs):
        if start_params is None:
            start_params = self._get_start_params()
if callback is None:
# work around perfect separation callback #3895
callback = lambda *x: x
mlefit = super(GenericZeroInflated, self).fit(start_params=start_params,
maxiter=maxiter, disp=disp, method=method,
full_output=full_output, callback=callback,
**kwargs)
zipfit = self.result_class(self, mlefit._results)
result = self.result_class_wrapper(zipfit)
if cov_kwds is None:
cov_kwds = {}
result._get_robustcov_results(cov_type=cov_type,
use_self=True, use_t=use_t, **cov_kwds)
return result
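    # Example call (sketch; ``model`` is an instance of a concrete
    # subclass, e.g. ``ZeroInflatedPoisson(y, X)``):
    #     res = model.fit(method="bfgs", maxiter=100, cov_type="HC0")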
@Appender(DiscreteModel.fit_regularized.__doc__)
def fit_regularized(self, start_params=None, method='l1',
maxiter='defined_by_method', full_output=1, disp=1, callback=None,
alpha=0, trim_mode='auto', auto_trim_tol=0.01, size_trim_tol=1e-4,
qc_tol=0.03, **kwargs):
_validate_l1_method(method)
if np.size(alpha) == 1 and alpha != 0:
k_params = self.k_exog + self.k_inflate
alpha = alpha * np.ones(k_params)
        # drop the penalty weights that belong to the inflation parameters so
        # that ``alpha_p`` matches the parameter count of the main model
        if self.k_inflate and np.size(alpha) > 1:
            alpha_p = alpha[:-self.k_inflate]
        else:
            alpha_p = alpha
        if start_params is None:
start_params = self.model_main.fit_regularized(
start_params=start_params, method=method, maxiter=maxiter,
full_output=full_output, disp=0, callback=callback,
alpha=alpha_p, trim_mode=trim_mode, auto_trim_tol=auto_trim_tol,
size_trim_tol=size_trim_tol, qc_tol=qc_tol, **kwargs).params
start_params = np.append(np.ones(self.k_inflate), start_params)
cntfit = super(CountModel, self).fit_regularized(
start_params=start_params, method=method, maxiter=maxiter,
full_output=full_output, disp=disp, callback=callback,
alpha=alpha, trim_mode=trim_mode, auto_trim_tol=auto_trim_tol,
size_trim_tol=size_trim_tol, qc_tol=qc_tol, **kwargs)
discretefit = self.result_class_reg(self, cntfit)
return self.result_class_reg_wrapper(discretefit)
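    # Example call (sketch): a scalar ``alpha`` is broadcast to all
    # coefficients by the code above, e.g.
    #     res_reg = model.fit_regularized(alpha=10., disp=False)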
def score_obs(self, params):
"""
        Generic Zero Inflated model score (gradient) vector of the
        loglikelihood for each observation
Parameters
----------
params : array_like
The parameters of the model
        Returns
        -------
        score : ndarray (nobs, k_vars)
            The score vector of the model, i.e. the first derivative of the
            loglikelihood function, evaluated at `params` for each
            observation
"""
params_infl = params[:self.k_inflate]
params_main = params[self.k_inflate:]
y = self.endog
w = self.model_infl.predict(params_infl)
w = np.clip(w, np.finfo(float).eps, 1 - np.finfo(float).eps)
score_main = self.model_main.score_obs(params_main)
llf_main = self.model_main.loglikeobs(params_main)
llf = self.loglikeobs(params)
zero_idx = np.nonzero(y == 0)[0]
nonzero_idx = np.nonzero(y)[0]
dldp = np.zeros((self.exog.shape[0], self.k_exog), dtype=np.float64)
dldw = np.zeros_like(self.exog_infl, dtype=np.float64)
dldp[zero_idx,:] = (score_main[zero_idx].T *
(1 - (w[zero_idx]) / np.exp(llf[zero_idx]))).T
dldp[nonzero_idx,:] = score_main[nonzero_idx]
if self.inflation == 'logit':
dldw[zero_idx,:] = (self.exog_infl[zero_idx].T * w[zero_idx] *
(1 - w[zero_idx]) *
(1 - np.exp(llf_main[zero_idx])) /
np.exp(llf[zero_idx])).T
dldw[nonzero_idx,:] = -(self.exog_infl[nonzero_idx].T *
w[nonzero_idx]).T
elif self.inflation == 'probit':
return approx_fprime(params, self.loglikeobs)
return np.hstack((dldw, dldp))
    def score(self, params):
        """
        Score (gradient) vector of the loglikelihood, summed over
        observations.
        """
        return self.score_obs(params).sum(0)
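    # Consistency-check sketch: the analytic score should agree with the
    # numerical gradient of ``loglike`` (``approx_fprime`` is imported at
    # the top of this module), e.g.
    #     assert np.allclose(model.score(p), approx_fprime(p, model.loglike),
    #                        atol=1e-4)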
    def _hessian_main(self, params):
        # implemented by subclasses; ``hessian`` falls back to a numerical
        # approximation when this returns None
        pass
def _hessian_logit(self, params):
params_infl = params[:self.k_inflate]
params_main = params[self.k_inflate:]
y = self.endog
w = self.model_infl.predict(params_infl)
w = np.clip(w, np.finfo(float).eps, 1 - np.finfo(float).eps)
score_main = self.model_main.score_obs(params_main)
llf_main = self.model_main.loglikeobs(params_main)
llf = self.loglikeobs(params)
zero_idx = np.nonzero(y == 0)[0]
nonzero_idx = np.nonzero(y)[0]
hess_arr = np.zeros((self.k_inflate, self.k_exog + self.k_inflate))
pmf = np.exp(llf)
        # d2l/dw2
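        # the double loop fills only the lower triangle (j <= i); ``hessian``
        # symmetrizes the assembled matrix afterwards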
for i in range(self.k_inflate):
for j in range(i, -1, -1):
hess_arr[i, j] = ((
self.exog_infl[zero_idx, i] * self.exog_infl[zero_idx, j] *
(w[zero_idx] * (1 - w[zero_idx]) * ((1 -
np.exp(llf_main[zero_idx])) * (1 - 2 * w[zero_idx]) *
np.exp(llf[zero_idx]) - (w[zero_idx] - w[zero_idx]**2) *
(1 - np.exp(llf_main[zero_idx]))**2) /
pmf[zero_idx]**2)).sum() -
(self.exog_infl[nonzero_idx, i] * self.exog_infl[nonzero_idx, j] *
w[nonzero_idx] * (1 - w[nonzero_idx])).sum())
        # d2l/dpdw
for i in range(self.k_inflate):
for j in range(self.k_exog):
hess_arr[i, j + self.k_inflate] = -(score_main[zero_idx, j] *
w[zero_idx] * (1 - w[zero_idx]) *
self.exog_infl[zero_idx, i] / pmf[zero_idx]).sum()
return hess_arr
    def _hessian_probit(self, params):
        # no analytic Hessian implemented for probit inflation; ``hessian``
        # falls back to a numerical approximation
        pass
def hessian(self, params):
"""
Generic Zero Inflated model Hessian matrix of the loglikelihood
Parameters
----------
params : array_like
The parameters of the model
Returns
-------
hess : ndarray, (k_vars, k_vars)
The Hessian, second derivative of loglikelihood function,
evaluated at `params`
        """
hess_arr_main = self._hessian_main(params)
hess_arr_infl = self._hessian_inflate(params)
        if hess_arr_main is None or hess_arr_infl is None:
            # no analytic Hessian block is available (e.g. probit
            # inflation), so fall back to a numerical approximation
            return approx_hess(params, self.loglike)

        dim = self.k_inflate + self.k_exog
        hess_arr = np.zeros((dim, dim))
        hess_arr[:self.k_inflate, :] = hess_arr_infl
        hess_arr[self.k_inflate:, self.k_inflate:] = hess_arr_main

        # the analytic blocks fill one triangle of each diagonal block plus
        # the full inflation/main cross block (assuming ``_hessian_main``
        # follows the same lower-triangle convention as ``_hessian_logit``);
        # adding the transpose and correcting the double-counted diagonal
        # yields the symmetric Hessian
        hess_arr = hess_arr + hess_arr.T - np.diag(np.diag(hess_arr))
        return hess_arr