Accelerated Failure Time (AFT) Model with empirical likelihood inference.
AFT regression analysis is applicable when the researcher has access
to a randomly right censored dependent variable, a matrix of exogenous
variables and an indicatior variable (delta) that takes a value of 0 if the
observation is censored and 1 otherwise.
AFT References
Stute, W. (1993). "Consistent Estimation Under Random Censorship when
Covariables are Present." Journal of Multivariate Analysis.
Vol. 45. Iss. 1. 89-103
EL and AFT References
Zhou, Kim And Bathke. "Empirical Likelihood Analysis for the Heteroskedastic
Accelerated Failure Time Model." Manuscript:
URL: www.ms.uky.edu/~mai/research/CasewiseEL20080724.pdf
Zhou, M. (2005). Empirical Likelihood Ratio with Arbitrarily Censored/
Truncated Data by EM Algorithm. Journal of Computational and Graphical
Statistics. 14:3, 643-656.
import numpy as np
from statsmodels.regression.linear_model import OLS, WLS
from statsmodels.tools import add_constant
#from elregress import ElReg
from scipy import optimize
from scipy.stats import chi2
from .descriptive import _OptFuncts
import warnings
from statsmodels.tools.sm_exceptions import IterationLimitWarning
class OptAFT(_OptFuncts):
Provides optimization functions used in estimating and conducting
inference in an AFT model.
Function optimized over nuisance parameters to compute
the profile likelihood
Uses the modified Em algorithm of Zhou 2005 to maximize the
likelihood of a parameter vector.
def __init__(self):
def _opt_wtd_nuis_regress(self, test_vals):
A function that is optimized over nuisance parameters to conduct a
hypothesis test for the parameters of interest
params: 1d array
The regression coefficients of the model. This includes the
nuisance and parameters of interests.
llr: float
-2 times the log likelihood of the nuisance parameters and the
hypothesized value of the parameter(s) of interest.
test_params = test_vals.reshape(self.model.nvar, 1)
est_vect = self.model.uncens_exog * (self.model.uncens_endog -
eta_star = self._modif_newton(np.zeros(self.model.nvar), est_vect,
denom = np.sum(self.model._fit_weights) + np.dot(eta_star, est_vect.T)
self.new_weights = self.model._fit_weights / denom
return -1 * np.sum(np.log(self.new_weights))
def _EM_test(self, nuisance_params, params=None, param_nums=None,
b0_vals=None, F=None, survidx=None, uncens_nobs=None,
numcensbelow=None, km=None, uncensored=None, censored=None,
maxiter=None, ftol=None):
Uses EM algorithm to compute the maximum likelihood of a test
Nuisance Params: ndarray
Vector of values to be used as nuisance params.
maxiter: int
Number of iterations in the EM algorithm for a parameter vector
-2 ''*'' log likelihood ratio at hypothesized values and
nuisance params
Optional parameters are provided by the test_beta function.
iters = 0
params[param_nums] = b0_vals
nuis_param_index = np.int_(np.delete(np.arange(self.model.nvar),
params[nuis_param_index] = nuisance_params
to_test = params.reshape(self.model.nvar, 1)
opt_res = np.inf
diff = np.inf
while iters < maxiter and diff > ftol:
F = F.flatten()
death = np.cumsum(F[::-1])
survivalprob = death[::-1]
surv_point_mat = np.dot(F.reshape(-1, 1),
1. / survivalprob[survidx].reshape(1, - 1))
surv_point_mat = add_constant(surv_point_mat)
summed_wts = np.cumsum(surv_point_mat, axis=1)
wts = summed_wts[np.int_(np.arange(uncens_nobs)),
# ^E step
# See Zhou 2005, section 3.
self.model._fit_weights = wts
new_opt_res = self._opt_wtd_nuis_regress(to_test)
# ^ Uncensored weights' contribution to likelihood value.
F = self.new_weights
# ^ M step
diff = np.abs(new_opt_res - opt_res)
opt_res = new_opt_res
iters = iters + 1
death = np.cumsum(F.flatten()[::-1])
survivalprob = death[::-1]
llike = -opt_res + np.sum(np.log(survivalprob[survidx]))
wtd_km = km.flatten() / np.sum(km)
survivalmax = np.cumsum(wtd_km[::-1])[::-1]
llikemax = np.sum(np.log(wtd_km[uncensored])) + \
if iters == maxiter:
warnings.warn('The EM reached the maximum number of iterations',
return -2 * (llike - llikemax)
def _ci_limits_beta(self, b0, param_num=None):
Returns the difference between the log likelihood for a
parameter and some critical value.
b0: float
Value of a regression parameter
param_num: int
Parameter index of b0
return self.test_beta([b0], [param_num])[0] - self.r0
class emplikeAFT(object):
Class for estimating and conducting inference in an AFT model.
endog: nx1 array
Response variables that are subject to random censoring
exog: nxk array
Matrix of covariates
censors: nx1 array
array with entries 0 or 1. 0 indicates a response was
nobs: float
Number of observations
endog: ndarray
Endog attay
exog: ndarray
Exogenous variable matrix
Censors array but sets the max(endog) to uncensored
nvar: float
Number of exogenous variables
uncens_nobs: float
Number of uncensored observations
uncens_endog: ndarray
Uncensored response variables
uncens_exog: ndarray
Exogenous variables of the uncensored observations
Fits model parameters
Tests if beta = b0 for any vector b0.
The data is immediately sorted in order of increasing endogenous
The last observation is assumed to be uncensored which makes
estimation and inference possible.
def __init__(self, endog, exog, censors):
self.nobs = np.shape(exog)[0]
self.endog = endog.reshape(self.nobs, 1)
self.exog = exog.reshape(self.nobs, -1)
self.censors = censors.reshape(self.nobs, 1)
self.nvar = self.exog.shape[1]
idx = np.lexsort((-self.censors[:, 0], self.endog[:, 0]))
self.endog = self.endog[idx]
self.exog = self.exog[idx]
self.censors = self.censors[idx]
self.censors[-1] = 1 # Sort in init, not in function
self.uncens_nobs = int(np.sum(self.censors))
mask = self.censors.ravel().astype(bool)
self.uncens_endog = self.endog[mask, :].reshape(-1, 1)
self.uncens_exog = self.exog[mask, :]
def _is_tied(self, endog, censors):
Indicated if an observation takes the same value as the next
ordered observation.
endog: ndarray
Models endogenous variable
censors: ndarray
arrat indicating a censored array
indic_ties: ndarray
ties[i]=1 if endog[i]==endog[i+1] and
nobs = int(self.nobs)
endog_idx = endog[np.arange(nobs - 1)] == (
endog[np.arange(nobs - 1) + 1])
censors_idx = censors[np.arange(nobs - 1)] == (
censors[np.arange(nobs - 1) + 1])
indic_ties = endog_idx * censors_idx # Both true
return np.int_(indic_ties)
def _km_w_ties(self, tie_indic, untied_km):
Computes KM estimator value at each observation, taking into acocunt
ties in the data.
tie_indic: 1d array
Indicates if the i'th observation is the same as the ith +1
untied_km: 1d array
Km estimates at each observation assuming no ties.
# TODO: Vectorize, even though it is only 1 pass through for any
# function call
num_same = 1
idx_nums = []
for obs_num in np.arange(int(self.nobs - 1))[::-1]:
if tie_indic[obs_num] == 1:
num_same = num_same + 1
untied_km[obs_num] = untied_km[obs_num + 1]
elif tie_indic[obs_num] == 0 and num_same > 1:
idx_nums.append(max(idx_nums) + 1)
idx_nums = np.asarray(idx_nums)
untied_km[idx_nums] = untied_km[idx_nums]
num_same = 1
idx_nums = []
return untied_km.reshape(self.nobs, 1)
def _make_km(self, endog, censors):
Computes the Kaplan-Meier estimate for the weights in the AFT model
endog: nx1 array
Array of response variables
censors: nx1 array
Censor-indicating variable
Kaplan Meier estimate for each observation
This function makes calls to _is_tied and km_w_ties to handle ties in
the data.If a censored observation and an uncensored observation has
the same value, it is assumed that the uncensored happened first.
nobs = self.nobs
num = (nobs - (np.arange(nobs) + 1.))
denom = ((nobs - (np.arange(nobs) + 1.) + 1.))
km = (num / denom).reshape(nobs, 1)
km = km ** np.abs(censors - 1.)
km = np.cumprod(km) # If no ties, this is kaplan-meier
tied = self._is_tied(endog, censors)
wtd_km = self._km_w_ties(tied, km)
return (censors / wtd_km).reshape(nobs, 1)
def fit(self):
Fits an AFT model and returns results instance
Loading ...