"""
Accelerated Failure Time (AFT) Model with empirical likelihood inference.
AFT regression analysis is applicable when the researcher has access
to a randomly right censored dependent variable, a matrix of exogenous
variables and an indicatior variable (delta) that takes a value of 0 if the
observation is censored and 1 otherwise.
AFT References
--------------
Stute, W. (1993). "Consistent Estimation Under Random Censorship when
Covariables are Present." Journal of Multivariate Analysis.
Vol. 45. Iss. 1. 89-103
EL and AFT References
---------------------
Zhou, Kim And Bathke. "Empirical Likelihood Analysis for the Heteroskedastic
Accelerated Failure Time Model." Manuscript:
URL: www.ms.uky.edu/~mai/research/CasewiseEL20080724.pdf
Zhou, M. (2005). Empirical Likelihood Ratio with Arbitrarily Censored/
Truncated Data by EM Algorithm. Journal of Computational and Graphical
Statistics. 14:3, 643-656.
"""
import numpy as np
from statsmodels.regression.linear_model import OLS, WLS
from statsmodels.tools import add_constant
#from elregress import ElReg
from scipy import optimize
from scipy.stats import chi2
from .descriptive import _OptFuncts
import warnings
from statsmodels.tools.sm_exceptions import IterationLimitWarning
class OptAFT(_OptFuncts):
"""
Provides optimization functions used in estimating and conducting
inference in an AFT model.
Methods
------
_opt_wtd_nuis_regress:
Function optimized over nuisance parameters to compute
the profile likelihood
_EM_test:
Uses the modified Em algorithm of Zhou 2005 to maximize the
likelihood of a parameter vector.
"""
def __init__(self):
pass
def _opt_wtd_nuis_regress(self, test_vals):
"""
A function that is optimized over nuisance parameters to conduct a
hypothesis test for the parameters of interest
Parameters
----------
params: 1d array
The regression coefficients of the model. This includes the
nuisance and parameters of interests.
Returns
-------
llr: float
-2 times the log likelihood of the nuisance parameters and the
hypothesized value of the parameter(s) of interest.
"""
test_params = test_vals.reshape(self.model.nvar, 1)
est_vect = self.model.uncens_exog * (self.model.uncens_endog -
np.dot(self.model.uncens_exog,
test_params))
eta_star = self._modif_newton(np.zeros(self.model.nvar), est_vect,
self.model._fit_weights)
denom = np.sum(self.model._fit_weights) + np.dot(eta_star, est_vect.T)
self.new_weights = self.model._fit_weights / denom
return -1 * np.sum(np.log(self.new_weights))
def _EM_test(self, nuisance_params, params=None, param_nums=None,
b0_vals=None, F=None, survidx=None, uncens_nobs=None,
numcensbelow=None, km=None, uncensored=None, censored=None,
maxiter=None, ftol=None):
"""
Uses EM algorithm to compute the maximum likelihood of a test
Parameters
----------
Nuisance Params: ndarray
Vector of values to be used as nuisance params.
maxiter: int
Number of iterations in the EM algorithm for a parameter vector
Returns
-------
-2 ''*'' log likelihood ratio at hypothesized values and
nuisance params
Notes
-----
Optional parameters are provided by the test_beta function.
"""
iters = 0
params[param_nums] = b0_vals
nuis_param_index = np.int_(np.delete(np.arange(self.model.nvar),
param_nums))
params[nuis_param_index] = nuisance_params
to_test = params.reshape(self.model.nvar, 1)
opt_res = np.inf
diff = np.inf
while iters < maxiter and diff > ftol:
F = F.flatten()
death = np.cumsum(F[::-1])
survivalprob = death[::-1]
surv_point_mat = np.dot(F.reshape(-1, 1),
1. / survivalprob[survidx].reshape(1, - 1))
surv_point_mat = add_constant(surv_point_mat)
summed_wts = np.cumsum(surv_point_mat, axis=1)
wts = summed_wts[np.int_(np.arange(uncens_nobs)),
numcensbelow[uncensored]]
# ^E step
# See Zhou 2005, section 3.
self.model._fit_weights = wts
new_opt_res = self._opt_wtd_nuis_regress(to_test)
# ^ Uncensored weights' contribution to likelihood value.
F = self.new_weights
# ^ M step
diff = np.abs(new_opt_res - opt_res)
opt_res = new_opt_res
iters = iters + 1
death = np.cumsum(F.flatten()[::-1])
survivalprob = death[::-1]
llike = -opt_res + np.sum(np.log(survivalprob[survidx]))
wtd_km = km.flatten() / np.sum(km)
survivalmax = np.cumsum(wtd_km[::-1])[::-1]
llikemax = np.sum(np.log(wtd_km[uncensored])) + \
np.sum(np.log(survivalmax[censored]))
if iters == maxiter:
warnings.warn('The EM reached the maximum number of iterations',
IterationLimitWarning)
return -2 * (llike - llikemax)
def _ci_limits_beta(self, b0, param_num=None):
"""
Returns the difference between the log likelihood for a
parameter and some critical value.
Parameters
----------
b0: float
Value of a regression parameter
param_num: int
Parameter index of b0
"""
return self.test_beta([b0], [param_num])[0] - self.r0
class emplikeAFT(object):
"""
Class for estimating and conducting inference in an AFT model.
Parameters
----------
endog: nx1 array
Response variables that are subject to random censoring
exog: nxk array
Matrix of covariates
censors: nx1 array
array with entries 0 or 1. 0 indicates a response was
censored.
Attributes
----------
nobs: float
Number of observations
endog: ndarray
Endog attay
exog: ndarray
Exogenous variable matrix
censors
Censors array but sets the max(endog) to uncensored
nvar: float
Number of exogenous variables
uncens_nobs: float
Number of uncensored observations
uncens_endog: ndarray
Uncensored response variables
uncens_exog: ndarray
Exogenous variables of the uncensored observations
Methods
-------
params:
Fits model parameters
test_beta:
Tests if beta = b0 for any vector b0.
Notes
-----
The data is immediately sorted in order of increasing endogenous
variables
The last observation is assumed to be uncensored which makes
estimation and inference possible.
"""
def __init__(self, endog, exog, censors):
self.nobs = np.shape(exog)[0]
self.endog = endog.reshape(self.nobs, 1)
self.exog = exog.reshape(self.nobs, -1)
self.censors = censors.reshape(self.nobs, 1)
self.nvar = self.exog.shape[1]
idx = np.lexsort((-self.censors[:, 0], self.endog[:, 0]))
self.endog = self.endog[idx]
self.exog = self.exog[idx]
self.censors = self.censors[idx]
self.censors[-1] = 1 # Sort in init, not in function
self.uncens_nobs = int(np.sum(self.censors))
mask = self.censors.ravel().astype(bool)
self.uncens_endog = self.endog[mask, :].reshape(-1, 1)
self.uncens_exog = self.exog[mask, :]
def _is_tied(self, endog, censors):
"""
Indicated if an observation takes the same value as the next
ordered observation.
Parameters
----------
endog: ndarray
Models endogenous variable
censors: ndarray
arrat indicating a censored array
Returns
-------
indic_ties: ndarray
ties[i]=1 if endog[i]==endog[i+1] and
censors[i]=censors[i+1]
"""
nobs = int(self.nobs)
endog_idx = endog[np.arange(nobs - 1)] == (
endog[np.arange(nobs - 1) + 1])
censors_idx = censors[np.arange(nobs - 1)] == (
censors[np.arange(nobs - 1) + 1])
indic_ties = endog_idx * censors_idx # Both true
return np.int_(indic_ties)
def _km_w_ties(self, tie_indic, untied_km):
"""
Computes KM estimator value at each observation, taking into acocunt
ties in the data.
Parameters
----------
tie_indic: 1d array
Indicates if the i'th observation is the same as the ith +1
untied_km: 1d array
Km estimates at each observation assuming no ties.
"""
# TODO: Vectorize, even though it is only 1 pass through for any
# function call
num_same = 1
idx_nums = []
for obs_num in np.arange(int(self.nobs - 1))[::-1]:
if tie_indic[obs_num] == 1:
idx_nums.append(obs_num)
num_same = num_same + 1
untied_km[obs_num] = untied_km[obs_num + 1]
elif tie_indic[obs_num] == 0 and num_same > 1:
idx_nums.append(max(idx_nums) + 1)
idx_nums = np.asarray(idx_nums)
untied_km[idx_nums] = untied_km[idx_nums]
num_same = 1
idx_nums = []
return untied_km.reshape(self.nobs, 1)
def _make_km(self, endog, censors):
"""
Computes the Kaplan-Meier estimate for the weights in the AFT model
Parameters
----------
endog: nx1 array
Array of response variables
censors: nx1 array
Censor-indicating variable
Returns
-------
Kaplan Meier estimate for each observation
Notes
-----
This function makes calls to _is_tied and km_w_ties to handle ties in
the data.If a censored observation and an uncensored observation has
the same value, it is assumed that the uncensored happened first.
"""
nobs = self.nobs
num = (nobs - (np.arange(nobs) + 1.))
denom = ((nobs - (np.arange(nobs) + 1.) + 1.))
km = (num / denom).reshape(nobs, 1)
km = km ** np.abs(censors - 1.)
km = np.cumprod(km) # If no ties, this is kaplan-meier
tied = self._is_tied(endog, censors)
wtd_km = self._km_w_ties(tied, km)
return (censors / wtd_km).reshape(nobs, 1)
def fit(self):
"""
Fits an AFT model and returns results instance
Parameters
----------
None
Loading ...