"""
Assessment of Generalized Estimating Equations using simulation.
Only Gaussian models are currently checked.
See the generated file "gee_simulation_check.txt" for results.
"""
from statsmodels.compat.python import lrange
import scipy.spatial.distance
import numpy as np
from itertools import product
from statsmodels.genmod.families import Gaussian
from statsmodels.genmod.generalized_estimating_equations import GEE
from statsmodels.genmod.cov_struct import Autoregressive, Nested
np.set_printoptions(formatter={'all': lambda x: "%8.3f" % x},
suppress=True)
OUT = open("gee_simulation_check.txt", "w")
class GEE_simulator(object):
#
# Parameters that must be defined
#
# Number of groups
ngroups = None
# Standard deviation of the pure errors
error_sd = None
# The regression coefficients
params = None
# The parameters defining the dependence structure
dparams = None
#
# Output parameters
#
    # Matrix of exogenous data (rows are cases, columns are
    # variables)
exog = None
    # Matrix of endogenous data (len(endog) = exog.shape[0])
endog = None
# Matrix of time information (time.shape[0] = len(endog))
time = None
    # Group labels (len(group) = len(endog))
group = None
    # Group sizes are drawn uniformly at random from this range
    # (upper bound exclusive)
group_size_range = [4, 11]
# dparams_est is dparams with scale_inv appended
def print_dparams(self, dparams_est):
raise NotImplementedError
class AR_simulator(GEE_simulator):
# The distance function for determining AR correlations.
distfun = [lambda x, y: np.sqrt(np.sum((x-y)**2)),]
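    # Within-cluster correlations decay as dparams[0] raised to the
    # pairwise distance between the "time" values (see simulate below).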
def print_dparams(self, dparams_est):
OUT.write("AR coefficient estimate: %8.4f\n" %
dparams_est[0])
OUT.write("AR coefficient truth: %8.4f\n" %
self.dparams[0])
OUT.write("Error variance estimate: %8.4f\n" %
dparams_est[1])
OUT.write("Error variance truth: %8.4f\n" %
self.error_sd**2)
OUT.write("\n")
def simulate(self):
endog, exog, group, time = [], [], [], []
for i in range(self.ngroups):
gsize = np.random.randint(self.group_size_range[0],
self.group_size_range[1])
group.append([i,] * gsize)
time1 = np.random.normal(size=(gsize,2))
time.append(time1)
exog1 = np.random.normal(size=(gsize, 5))
exog1[:,0] = 1
exog.append(exog1)
# Pairwise distances within the cluster
distances = scipy.spatial.distance.cdist(time1, time1,
self.distfun[0])
# Pairwise correlations within the cluster
correlations = self.dparams[0]**distances
correlations_sr = np.linalg.cholesky(correlations)
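            # Multiplying iid standard normal draws by the Cholesky factor
            # yields errors with the desired within-cluster correlation.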
errors = np.dot(correlations_sr, np.random.normal(size=gsize))
endog1 = np.dot(exog1, self.params) + errors * self.error_sd
endog.append(endog1)
self.exog = np.concatenate(exog, axis=0)
self.endog = np.concatenate(endog)
self.time = np.concatenate(time, axis=0)
self.group = np.concatenate(group)
class Nested_simulator(GEE_simulator):
# Vector containing list of nest sizes (used instead of
# group_size_range).
nest_sizes = None
# Matrix of nest id's (an output parameter)
id_matrix = None
def print_dparams(self, dparams_est):
for j in range(len(self.nest_sizes)):
OUT.write("Nest %d variance estimate: %8.4f\n" % \
(j+1, dparams_est[j]))
OUT.write("Nest %d variance truth: %8.4f\n" % \
(j+1, self.dparams[j]))
OUT.write("Error variance estimate: %8.4f\n" % \
(dparams_est[-1] - sum(dparams_est[0:-1])))
OUT.write("Error variance truth: %8.4f\n" %
self.error_sd**2)
OUT.write("\n")
def simulate(self):
group_effect_var = self.dparams[0]
vcomp = self.dparams[1:]
vcomp.append(0)
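        # The innermost nesting level gets a zero variance component, so
        # only the group effect, the outer nest effects, and the pure
        # error contribute to the outcome.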
endog, exog, group, id_matrix = [], [], [], []
for i in range(self.ngroups):
iterators = [lrange(n) for n in self.nest_sizes]
# The random effects
variances = [np.sqrt(v)*np.random.normal(size=n)
for v,n in zip(vcomp, self.nest_sizes)]
gpe = np.random.normal() * np.sqrt(group_effect_var)
nest_all = []
for j in self.nest_sizes:
nest_all.append(set())
for nest in product(*iterators):
group.append(i)
# The sum of all random effects that apply to this
# unit
ref = gpe + sum([v[j] for v,j in zip(variances, nest)])
exog1 = np.random.normal(size=5)
exog1[0] = 1
exog.append(exog1)
error = ref + self.error_sd * np.random.normal()
endog1 = np.dot(exog1, self.params) + error
endog.append(endog1)
for j in range(len(nest)):
nest_all[j].add(tuple(nest[0:j+1]))
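                # Convert the nest tuple to zero-based labels per nesting
                # level; the innermost level is omitted from the id matrix.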
nest1 = [len(x)-1 for x in nest_all]
id_matrix.append(nest1[0:-1])
self.exog = np.array(exog)
self.endog = np.array(endog)
self.group = np.array(group)
self.id_matrix = np.array(id_matrix)
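        # The nested covariance structure does not use time, so a zero
        # placeholder is provided.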
self.time = np.zeros_like(self.endog)
def check_constraint(da, va, ga):
"""
Check the score testing of the parameter constraints.
"""
def gen_gendat_ar0(ar):
    def gendat_ar0():
ars = AR_simulator()
ars.ngroups = 200
ars.params = np.r_[0, -1, 1, 0, 0.5]
ars.error_sd = 2
ars.dparams = [ar,]
ars.simulate()
return ars, Autoregressive()
return gendat_ar0
def gen_gendat_ar1(ar):
def gendat_ar1():
ars = AR_simulator()
ars.ngroups = 200
ars.params = np.r_[0, -0.8, 1.2, 0, 0.5]
ars.error_sd = 2
ars.dparams = [ar,]
ars.simulate()
return ars, Autoregressive()
return gendat_ar1
def gendat_nested0():
ns = Nested_simulator()
ns.error_sd = 1.
ns.params = np.r_[0., 1, 1, -1, -1]
ns.ngroups = 50
ns.nest_sizes = [10, 5]
ns.dparams = [2., 1.]
ns.simulate()
return ns, Nested(ns.id_matrix)
def gendat_nested1():
ns = Nested_simulator()
ns.error_sd = 2.
ns.params = np.r_[0, 1, 1.3, -0.8, -1.2]
ns.ngroups = 50
ns.nest_sizes = [10, 5]
ns.dparams = [1., 3.]
ns.simulate()
return ns, Nested(ns.id_matrix)
nrep = 100
gendats = [gen_gendat_ar0(ar) for ar in (0, 0.3, 0.6)]
gendats.extend([gen_gendat_ar1(ar) for ar in (0, 0.3, 0.6)])
gendats.extend([gendat_nested0, gendat_nested1])
lhs = np.array([[0., 1, 1, 0, 0],])
rhs = np.r_[0.,]
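# The constraint asserts that the second and third regression coefficients
# sum to zero; it holds exactly only for the gendat_ar0 models, where those
# coefficients are -1 and 1.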
# Loop over data generating models
for gendat in gendats:
pvalues = []
params = []
std_errors = []
dparams = []
for j in range(nrep):
da,va = gendat()
ga = Gaussian()
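        # Fit the GEE with the simulated data, Gaussian family, and the
        # working covariance structure returned by the data generator.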
md = GEE(da.endog, da.exog, da.group, da.time, ga, va)
mdf = md.fit()
scale_inv = 1 / md.estimate_scale()
dparams.append(np.r_[va.dparams, scale_inv])
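        # Replication averages of the dependence parameters are compared
        # to the truth via print_dparams after the loop.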
params.append(np.asarray(mdf.params))
std_errors.append(np.asarray(mdf.standard_errors))
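        # The empirical spread of the estimates is compared below to the
        # mean of the reported standard errors.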
da,va = gendat()
ga = Gaussian()
md = GEE(da.endog, da.exog, da.group, da.time, ga, va,
constraint=(lhs, rhs))
mdf = md.fit()
score = md.score_test_results
pvalue = score["p-value"]
pvalues.append(pvalue)
dparams_mean = np.array(sum(dparams) / len(dparams))
OUT.write("Checking dependence parameters:\n")
da.print_dparams(dparams_mean)
params = np.array(params)
eparams = params.mean(0)
sdparams = params.std(0)
std_errors = np.array(std_errors)
std_errors = std_errors.mean(0)
OUT.write("Checking parameter values:\n")
OUT.write("Observed: ")
OUT.write(np.array_str(eparams) + "\n")
OUT.write("Expected: ")
OUT.write(np.array_str(da.params) + "\n")
OUT.write("Absolute difference: ")
OUT.write(np.array_str(eparams - da.params) + "\n")
OUT.write("Relative difference: ")
OUT.write(np.array_str((eparams - da.params) / da.params) + "\n")
OUT.write("\n")
OUT.write("Checking standard errors\n")
OUT.write("Observed: ")
OUT.write(np.array_str(sdparams) + "\n")
OUT.write("Expected: ")
OUT.write(np.array_str(std_errors) + "\n")
OUT.write("Absolute difference: ")
OUT.write(np.array_str(sdparams - std_errors) + "\n")
OUT.write("Relative difference: ")
OUT.write(np.array_str((sdparams - std_errors) / std_errors) + "\n")
OUT.write("\n")
pvalues.sort()
OUT.write("Checking constrained estimation:\n")
OUT.write("Left hand side:\n")
OUT.write(np.array_str(lhs) + "\n")
OUT.write("Right hand side:\n")
OUT.write(np.array_str(rhs) + "\n")
OUT.write("Observed p-values Expected Null p-values\n")
for q in np.arange(0.1, 0.91, 0.1):
OUT.write("%20.3f %20.3f\n" % (pvalues[int(q*len(pvalues))], q))
OUT.write("=" * 80 + "\n\n")
OUT.close()