Repository URL to install this package:
Version:
0.11.1 ▾
|
"""
Base tools for handling various kinds of data structures, attaching metadata to
results, and doing data cleaning
"""
from statsmodels.compat.python import iteritems, lmap
from functools import reduce
import numpy as np
from pandas import DataFrame, Series, isnull, MultiIndex
import statsmodels.tools.data as data_util
from statsmodels.tools.decorators import cache_readonly, cache_writable
from statsmodels.tools.sm_exceptions import MissingDataError
def _asarray_2dcolumns(x):
if np.asarray(x).ndim > 1 and np.asarray(x).squeeze().ndim == 1:
return
def _asarray_2d_null_rows(x):
"""
Makes sure input is an array and is 2d. Makes sure output is 2d. True
indicates a null in the rows of 2d x.
"""
#Have to have the asarrays because isnull does not account for array_like
#input
x = np.asarray(x)
if x.ndim == 1:
x = x[:, None]
return np.any(isnull(x), axis=1)[:, None]
def _nan_rows(*arrs):
"""
Returns a boolean array which is True where any of the rows in any
of the _2d_ arrays in arrs are NaNs. Inputs can be any mixture of Series,
DataFrames or array_like.
"""
if len(arrs) == 1:
arrs += ([[False]],)
def _nan_row_maybe_two_inputs(x, y):
# check for dtype bc dataframe has dtypes
x_is_boolean_array = hasattr(x, 'dtype') and x.dtype == bool and x
return np.logical_or(_asarray_2d_null_rows(x),
(x_is_boolean_array | _asarray_2d_null_rows(y)))
return reduce(_nan_row_maybe_two_inputs, arrs).squeeze()
class ModelData(object):
"""
Class responsible for handling input data and extracting metadata into the
appropriate form
"""
_param_names = None
_cov_names = None
def __init__(self, endog, exog=None, missing='none', hasconst=None,
**kwargs):
if data_util._is_recarray(endog) or data_util._is_recarray(exog):
import warnings
from statsmodels.tools.sm_exceptions import recarray_warning
warnings.warn(recarray_warning, FutureWarning)
if 'design_info' in kwargs:
self.design_info = kwargs.pop('design_info')
if 'formula' in kwargs:
self.formula = kwargs.pop('formula')
if missing != 'none':
arrays, nan_idx = self.handle_missing(endog, exog, missing,
**kwargs)
self.missing_row_idx = nan_idx
self.__dict__.update(arrays) # attach all the data arrays
self.orig_endog = self.endog
self.orig_exog = self.exog
self.endog, self.exog = self._convert_endog_exog(self.endog,
self.exog)
else:
self.__dict__.update(kwargs) # attach the extra arrays anyway
self.orig_endog = endog
self.orig_exog = exog
self.endog, self.exog = self._convert_endog_exog(endog, exog)
self.const_idx = None
self.k_constant = 0
self._handle_constant(hasconst)
self._check_integrity()
self._cache = {}
def __getstate__(self):
from copy import copy
d = copy(self.__dict__)
if "design_info" in d:
del d["design_info"]
d["restore_design_info"] = True
return d
def __setstate__(self, d):
if "restore_design_info" in d:
# NOTE: there may be a more performant way to do this
from patsy import dmatrices, PatsyError
exc = []
try:
data = d['frame']
except KeyError:
data = d['orig_endog'].join(d['orig_exog'])
for depth in [2, 3, 1, 0, 4]: # sequence is a guess where to likely find it
try:
_, design = dmatrices(d['formula'], data, eval_env=depth,
return_type='dataframe')
break
except (NameError, PatsyError) as e:
exc.append(e) # why do I need a reference from outside except block
pass
else:
raise exc[-1]
self.design_info = design.design_info
del d["restore_design_info"]
self.__dict__.update(d)
def _handle_constant(self, hasconst):
if hasconst is False or self.exog is None:
self.k_constant = 0
self.const_idx = None
else:
# detect where the constant is
check_implicit = False
exog_max = np.max(self.exog, axis=0)
if not np.isfinite(exog_max).all():
raise MissingDataError('exog contains inf or nans')
exog_min = np.min(self.exog, axis=0)
const_idx = np.where(exog_max == exog_min)[0].squeeze()
self.k_constant = const_idx.size
if self.k_constant == 1:
if self.exog[:, const_idx].mean() != 0:
self.const_idx = int(const_idx)
else:
# we only have a zero column and no other constant
check_implicit = True
elif self.k_constant > 1:
# we have more than one constant column
# look for ones
values = [] # keep values if we need != 0
for idx in const_idx:
value = self.exog[:, idx].mean()
if value == 1:
self.k_constant = 1
self.const_idx = int(idx)
break
values.append(value)
else:
# we did not break, no column of ones
pos = (np.array(values) != 0)
if pos.any():
# take the first nonzero column
self.k_constant = 1
self.const_idx = int(const_idx[pos.argmax()])
else:
# only zero columns
check_implicit = True
elif self.k_constant == 0:
check_implicit = True
else:
# should not be here
pass
if check_implicit and not hasconst:
# look for implicit constant
# Compute rank of augmented matrix
augmented_exog = np.column_stack(
(np.ones(self.exog.shape[0]), self.exog))
rank_augm = np.linalg.matrix_rank(augmented_exog)
rank_orig = np.linalg.matrix_rank(self.exog)
self.k_constant = int(rank_orig == rank_augm)
self.const_idx = None
elif hasconst:
# Ensure k_constant is 1 any time hasconst is True
# even if one is not found
self.k_constant = 1
@classmethod
def _drop_nans(cls, x, nan_mask):
return x[nan_mask]
@classmethod
def _drop_nans_2d(cls, x, nan_mask):
return x[nan_mask][:, nan_mask]
@classmethod
def handle_missing(cls, endog, exog, missing, **kwargs):
"""
This returns a dictionary with keys endog, exog and the keys of
kwargs. It preserves Nones.
"""
none_array_names = []
# patsy's already dropped NaNs in y/X
missing_idx = kwargs.pop('missing_idx', None)
if missing_idx is not None:
# y, X already handled by patsy. add back in later.
combined = ()
combined_names = []
if exog is None:
none_array_names += ['exog']
elif exog is not None:
combined = (endog, exog)
combined_names = ['endog', 'exog']
else:
combined = (endog,)
combined_names = ['endog']
none_array_names += ['exog']
# deal with other arrays
combined_2d = ()
combined_2d_names = []
if len(kwargs):
for key, value_array in iteritems(kwargs):
if value_array is None or value_array.ndim == 0:
none_array_names += [key]
continue
# grab 1d arrays
if value_array.ndim == 1:
combined += (np.asarray(value_array),)
combined_names += [key]
elif value_array.squeeze().ndim == 1:
combined += (np.asarray(value_array),)
combined_names += [key]
# grab 2d arrays that are _assumed_ to be symmetric
elif value_array.ndim == 2:
combined_2d += (np.asarray(value_array),)
combined_2d_names += [key]
else:
raise ValueError("Arrays with more than 2 dimensions "
"are not yet handled")
if missing_idx is not None:
nan_mask = missing_idx
updated_row_mask = None
if combined: # there were extra arrays not handled by patsy
combined_nans = _nan_rows(*combined)
if combined_nans.shape[0] != nan_mask.shape[0]:
raise ValueError("Shape mismatch between endog/exog "
"and extra arrays given to model.")
# for going back and updated endog/exog
updated_row_mask = combined_nans[~nan_mask]
nan_mask |= combined_nans # for updating extra arrays only
if combined_2d:
combined_2d_nans = _nan_rows(combined_2d)
if combined_2d_nans.shape[0] != nan_mask.shape[0]:
raise ValueError("Shape mismatch between endog/exog "
"and extra 2d arrays given to model.")
if updated_row_mask is not None:
updated_row_mask |= combined_2d_nans[~nan_mask]
else:
updated_row_mask = combined_2d_nans[~nan_mask]
nan_mask |= combined_2d_nans
else:
nan_mask = _nan_rows(*combined)
if combined_2d:
nan_mask = _nan_rows(*(nan_mask[:, None],) + combined_2d)
if not np.any(nan_mask): # no missing do not do anything
combined = dict(zip(combined_names, combined))
if combined_2d:
combined.update(dict(zip(combined_2d_names, combined_2d)))
if none_array_names:
combined.update(dict(zip(none_array_names,
[None] * len(none_array_names))))
if missing_idx is not None:
combined.update({'endog': endog})
if exog is not None:
combined.update({'exog': exog})
return combined, []
elif missing == 'raise':
raise MissingDataError("NaNs were encountered in the data")
elif missing == 'drop':
nan_mask = ~nan_mask
drop_nans = lambda x: cls._drop_nans(x, nan_mask)
drop_nans_2d = lambda x: cls._drop_nans_2d(x, nan_mask)
combined = dict(zip(combined_names, lmap(drop_nans, combined)))
if missing_idx is not None:
if updated_row_mask is not None:
updated_row_mask = ~updated_row_mask
# update endog/exog with this new information
endog = cls._drop_nans(endog, updated_row_mask)
if exog is not None:
exog = cls._drop_nans(exog, updated_row_mask)
combined.update({'endog': endog})
if exog is not None:
combined.update({'exog': exog})
if combined_2d:
combined.update(dict(zip(combined_2d_names,
lmap(drop_nans_2d, combined_2d))))
if none_array_names:
combined.update(dict(zip(none_array_names,
[None] * len(none_array_names))))
return combined, np.where(~nan_mask)[0].tolist()
else:
raise ValueError("missing option %s not understood" % missing)
def _convert_endog_exog(self, endog, exog):
# for consistent outputs if endog is (n,1)
yarr = self._get_yarr(endog)
xarr = None
if exog is not None:
xarr = self._get_xarr(exog)
if xarr.ndim == 1:
xarr = xarr[:, None]
if xarr.ndim != 2:
raise ValueError("exog is not 1d or 2d")
return yarr, xarr
@cache_writable()
def ynames(self):
endog = self.orig_endog
ynames = self._get_names(endog)
if not ynames:
ynames = _make_endog_names(self.endog)
if len(ynames) == 1:
return ynames[0]
else:
return list(ynames)
@cache_writable()
def xnames(self):
exog = self.orig_exog
if exog is not None:
xnames = self._get_names(exog)
if not xnames:
xnames = _make_exog_names(self.exog)
return list(xnames)
return None
@property
def param_names(self):
# for handling names of 'extra' parameters in summary, etc.
return self._param_names or self.xnames
@param_names.setter
def param_names(self, values):
self._param_names = values
@property
def cov_names(self):
"""
Labels for covariance matrices
In multidimensional models, each dimension of a covariance matrix
differs from the number of param_names.
If not set, returns param_names
"""
# for handling names of covariance names in multidimensional models
if self._cov_names is not None:
return self._cov_names
return self.param_names
@cov_names.setter
def cov_names(self, value):
# for handling names of covariance names in multidimensional models
self._cov_names = value
@cache_readonly
def row_labels(self):
exog = self.orig_exog
if exog is not None:
row_labels = self._get_row_labels(exog)
else:
endog = self.orig_endog
row_labels = self._get_row_labels(endog)
return row_labels
def _get_row_labels(self, arr):
return None
def _get_names(self, arr):
if isinstance(arr, DataFrame):
if isinstance(arr.columns, MultiIndex):
# Flatten MultiIndexes into "simple" column names
return ['_'.join((level for level in c if level))
for c in arr.columns]
else:
return list(arr.columns)
elif isinstance(arr, Series):
if arr.name:
return [arr.name]
else:
return
else:
try:
return arr.dtype.names
except AttributeError:
pass
return None
def _get_yarr(self, endog):
if data_util._is_structured_ndarray(endog):
endog = data_util.struct_to_ndarray(endog)
endog = np.asarray(endog)
if len(endog) == 1: # never squeeze to a scalar
if endog.ndim == 1:
return endog
elif endog.ndim > 1:
return np.asarray([endog.squeeze()])
return endog.squeeze()
def _get_xarr(self, exog):
if data_util._is_structured_ndarray(exog):
exog = data_util.struct_to_ndarray(exog)
return np.asarray(exog)
def _check_integrity(self):
if self.exog is not None:
if len(self.exog) != len(self.endog):
raise ValueError("endog and exog matrices are different sizes")
def wrap_output(self, obj, how='columns', names=None):
if how == 'columns':
return self.attach_columns(obj)
elif how == 'rows':
return self.attach_rows(obj)
elif how == 'cov':
return self.attach_cov(obj)
elif how == 'dates':
return self.attach_dates(obj)
elif how == 'columns_eq':
return self.attach_columns_eq(obj)
elif how == 'cov_eq':
return self.attach_cov_eq(obj)
elif how == 'generic_columns':
return self.attach_generic_columns(obj, names)
elif how == 'generic_columns_2d':
return self.attach_generic_columns_2d(obj, names)
elif how == 'ynames':
return self.attach_ynames(obj)
elif how == 'multivariate_confint':
return self.attach_mv_confint(obj)
else:
return obj
def attach_columns(self, result):
return result
def attach_columns_eq(self, result):
return result
def attach_cov(self, result):
return result
def attach_cov_eq(self, result):
return result
def attach_rows(self, result):
return result
def attach_dates(self, result):
return result
def attach_mv_confint(self, result):
return result
def attach_generic_columns(self, result, *args, **kwargs):
return result
def attach_generic_columns_2d(self, result, *args, **kwargs):
return result
def attach_ynames(self, result):
return result
class PatsyData(ModelData):
def _get_names(self, arr):
return arr.design_info.column_names
class PandasData(ModelData):
"""
Data handling class which knows how to reattach pandas metadata to model
results
"""
def _convert_endog_exog(self, endog, exog=None):
#TODO: remove this when we handle dtype systematically
endog = np.asarray(endog)
exog = exog if exog is None else np.asarray(exog)
if endog.dtype == object or exog is not None and exog.dtype == object:
raise ValueError("Pandas data cast to numpy dtype of object. "
"Check input data with np.asarray(data).")
return super(PandasData, self)._convert_endog_exog(endog, exog)
@classmethod
def _drop_nans(cls, x, nan_mask):
if isinstance(x, (Series, DataFrame)):
return x.loc[nan_mask]
else: # extra arguments could be plain ndarrays
return super(PandasData, cls)._drop_nans(x, nan_mask)
@classmethod
def _drop_nans_2d(cls, x, nan_mask):
if isinstance(x, (Series, DataFrame)):
return x.loc[nan_mask].loc[:, nan_mask]
else: # extra arguments could be plain ndarrays
return super(PandasData, cls)._drop_nans_2d(x, nan_mask)
def _check_integrity(self):
endog, exog = self.orig_endog, self.orig_exog
# exog can be None and we could be upcasting one or the other
if (exog is not None and
(hasattr(endog, 'index') and hasattr(exog, 'index')) and
not self.orig_endog.index.equals(self.orig_exog.index)):
raise ValueError("The indices for endog and exog are not aligned")
super(PandasData, self)._check_integrity()
def _get_row_labels(self, arr):
try:
return arr.index
except AttributeError:
# if we've gotten here it's because endog is pandas and
# exog is not, so just return the row labels from endog
return self.orig_endog.index
def attach_generic_columns(self, result, names):
# get the attribute to use
column_names = getattr(self, names, None)
return Series(result, index=column_names)
def attach_generic_columns_2d(self, result, rownames, colnames=None):
colnames = colnames or rownames
rownames = getattr(self, rownames, None)
colnames = getattr(self, colnames, None)
return DataFrame(result, index=rownames, columns=colnames)
def attach_columns(self, result):
# this can either be a 1d array or a scalar
# do not squeeze because it might be a 2d row array
# if it needs a squeeze, the bug is elsewhere
if result.ndim <= 1:
return Series(result, index=self.param_names)
else: # for e.g., confidence intervals
return DataFrame(result, index=self.param_names)
def attach_columns_eq(self, result):
return DataFrame(result, index=self.xnames, columns=self.ynames)
def attach_cov(self, result):
return DataFrame(result, index=self.cov_names, columns=self.cov_names)
def attach_cov_eq(self, result):
return DataFrame(result, index=self.ynames, columns=self.ynames)
def attach_rows(self, result):
# assumes if len(row_labels) > len(result) it's bc it was truncated
# at the front, for AR lags, for example
squeezed = result.squeeze()
k_endog = np.array(self.ynames, ndmin=1).shape[0]
if k_endog > 1 and squeezed.shape == (k_endog,):
squeezed = squeezed[None, :]
# May be zero-dim, for example in the case of forecast one step in tsa
if squeezed.ndim < 2:
return Series(squeezed, index=self.row_labels[-len(result):])
else:
return DataFrame(result, index=self.row_labels[-len(result):],
columns=self.ynames)
def attach_dates(self, result):
squeezed = result.squeeze()
k_endog = np.array(self.ynames, ndmin=1).shape[0]
if k_endog > 1 and squeezed.shape == (k_endog,):
squeezed = np.asarray(squeezed)[None, :]
# May be zero-dim, for example in the case of forecast one step in tsa
if squeezed.ndim < 2:
return Series(squeezed, index=self.predict_dates)
else:
return DataFrame(result, index=self.predict_dates,
columns=self.ynames)
def attach_mv_confint(self, result):
return DataFrame(result.reshape((-1, 2)),
index=self.cov_names,
columns=['lower', 'upper'])
def attach_ynames(self, result):
squeezed = result.squeeze()
# May be zero-dim, for example in the case of forecast one step in tsa
if squeezed.ndim < 2:
return Series(squeezed, name=self.ynames)
else:
return DataFrame(result, columns=self.ynames)
def _make_endog_names(endog):
if endog.ndim == 1 or endog.shape[1] == 1:
ynames = ['y']
else: # for VAR
ynames = ['y%d' % (i+1) for i in range(endog.shape[1])]
return ynames
def _make_exog_names(exog):
exog_var = exog.var(0)
if (exog_var == 0).any():
# assumes one constant in first or last position
# avoid exception if more than one constant
const_idx = exog_var.argmin()
exog_names = ['x%d' % i for i in range(1, exog.shape[1])]
exog_names.insert(const_idx, 'const')
else:
exog_names = ['x%d' % i for i in range(1, exog.shape[1]+1)]
return exog_names
def handle_missing(endog, exog=None, missing='none', **kwargs):
klass = handle_data_class_factory(endog, exog)
if missing == 'none':
ret_dict = dict(endog=endog, exog=exog)
ret_dict.update(kwargs)
return ret_dict, None
return klass.handle_missing(endog, exog, missing=missing, **kwargs)
def handle_data_class_factory(endog, exog):
"""
Given inputs
"""
if data_util._is_using_ndarray_type(endog, exog):
klass = ModelData
elif data_util._is_using_pandas(endog, exog):
klass = PandasData
elif data_util._is_using_patsy(endog, exog):
klass = PatsyData
# keep this check last
elif data_util._is_using_ndarray(endog, exog):
klass = ModelData
else:
raise ValueError('unrecognized data structures: %s / %s' %
(type(endog), type(exog)))
return klass
def handle_data(endog, exog, missing='none', hasconst=None, **kwargs):
# deal with lists and tuples up-front
if isinstance(endog, (list, tuple)):
endog = np.asarray(endog)
if isinstance(exog, (list, tuple)):
exog = np.asarray(exog)
klass = handle_data_class_factory(endog, exog)
return klass(endog, exog=exog, missing=missing, hasconst=hasconst,
**kwargs)