"""
Implementation of Regression on Order Statistics for imputing left-
censored (non-detect data)
Method described in *Nondetects and Data Analysis* by Dennis R.
Helsel (John Wiley, 2005) to estimate the left-censored (non-detect)
values of a dataset.
Author: Paul M. Hobson
Company: Geosyntec Consultants (Portland, OR)
Date: 2016-06-14
"""
import warnings
import numpy
from scipy import stats
import pandas
def _ros_sort(df, observations, censorship, warn=False):
"""
This function prepares a dataframe for ROS.
It sorts ascending with
left-censored observations first. Censored observations larger than
the maximum uncensored observations are removed from the dataframe.
Parameters
----------
df : pandas.DataFrame
observations : str
Name of the column in the dataframe that contains observed
values. Censored values should be set to the detection (upper)
limit.
censorship : str
Name of the column in the dataframe that indicates that a
observation is left-censored. (i.e., True -> censored,
False -> uncensored)
Returns
------
sorted_df : pandas.DataFrame
The sorted dataframe with all columns dropped except the
observation and censorship columns.
"""
# separate uncensored data from censored data
censored = df[df[censorship]].sort_values(observations, axis=0)
uncensored = df[~df[censorship]].sort_values(observations, axis=0)
if censored[observations].max() > uncensored[observations].max():
censored = censored[censored[observations] <= uncensored[observations].max()]
if warn:
msg = ("Dropping censored observations greater than "
"the max uncensored observation.")
warnings.warn(msg)
return censored.append(uncensored)[[observations, censorship]].reset_index(drop=True)
def cohn_numbers(df, observations, censorship):
r"""
Computes the Cohn numbers for the detection limits in the dataset.
The Cohn Numbers are:
- :math:`A_j =` the number of uncensored obs above the jth
threshold.
- :math:`B_j =` the number of observations (cen & uncen) below
the jth threshold.
- :math:`C_j =` the number of censored observations at the jth
threshold.
- :math:`\mathrm{PE}_j =` the probability of exceeding the jth
threshold
- :math:`\mathrm{DL}_j =` the unique, sorted detection limits
- :math:`\mathrm{DL}_{j+1} = \mathrm{DL}_j` shifted down a
single index (row)
Parameters
----------
dataframe : pandas.DataFrame
observations : str
Name of the column in the dataframe that contains observed
values. Censored values should be set to the detection (upper)
limit.
censorship : str
Name of the column in the dataframe that indicates that a
observation is left-censored. (i.e., True -> censored,
False -> uncensored)
Returns
-------
cohn : pandas.DataFrame
"""
def nuncen_above(row):
""" A, the number of uncensored obs above the given threshold.
"""
# index of observations above the lower_dl DL
above = df[observations] >= row['lower_dl']
# index of observations below the upper_dl DL
below = df[observations] < row['upper_dl']
# index of non-detect observations
detect = ~df[censorship]
# return the number of observations where all conditions are True
return df[above & below & detect].shape[0]
def nobs_below(row):
""" B, the number of observations (cen & uncen) below the given
threshold
"""
# index of data less than the lower_dl DL
less_than = df[observations] < row['lower_dl']
# index of data less than or equal to the lower_dl DL
less_thanequal = df[observations] <= row['lower_dl']
# index of detects, non-detects
uncensored = ~df[censorship]
censored = df[censorship]
# number observations less than or equal to lower_dl DL and non-detect
LTE_censored = df[less_thanequal & censored].shape[0]
# number of observations less than lower_dl DL and detected
LT_uncensored = df[less_than & uncensored].shape[0]
# return the sum
return LTE_censored + LT_uncensored
def ncen_equal(row):
""" C, the number of censored observations at the given
threshold.
"""
censored_index = df[censorship]
censored_data = df[observations][censored_index]
censored_below = censored_data == row['lower_dl']
return censored_below.sum()
def set_upper_limit(cohn):
""" Sets the upper_dl DL for each row of the Cohn dataframe. """
if cohn.shape[0] > 1:
return cohn['lower_dl'].shift(-1).fillna(value=numpy.inf)
else:
return [numpy.inf]
def compute_PE(A, B):
""" Computes the probability of excedance for each row of the
Cohn dataframe. """
N = len(A)
PE = numpy.empty(N, dtype='float64')
PE[-1] = 0.0
for j in range(N-2, -1, -1):
PE[j] = PE[j+1] + (1 - PE[j+1]) * A[j] / (A[j] + B[j])
return PE
# unique, sorted detection limts
censored_data = df[censorship]
DLs = pandas.unique(df.loc[censored_data, observations])
DLs.sort()
# if there is a observations smaller than the minimum detection limit,
# add that value to the array
if DLs.shape[0] > 0:
if df[observations].min() < DLs.min():
DLs = numpy.hstack([df[observations].min(), DLs])
# create a dataframe
# (editted for pandas 0.14 compatibility; see commit 63f162e
# when `pipe` and `assign` are available)
cohn = pandas.DataFrame(DLs, columns=['lower_dl'])
cohn.loc[:, 'upper_dl'] = set_upper_limit(cohn)
cohn.loc[:, 'nuncen_above'] = cohn.apply(nuncen_above, axis=1)
cohn.loc[:, 'nobs_below'] = cohn.apply(nobs_below, axis=1)
cohn.loc[:, 'ncen_equal'] = cohn.apply(ncen_equal, axis=1)
cohn = cohn.reindex(range(DLs.shape[0] + 1))
cohn.loc[:, 'prob_exceedance'] = compute_PE(cohn['nuncen_above'], cohn['nobs_below'])
else:
dl_cols = ['lower_dl', 'upper_dl', 'nuncen_above',
'nobs_below', 'ncen_equal', 'prob_exceedance']
cohn = pandas.DataFrame(numpy.empty((0, len(dl_cols))), columns=dl_cols)
return cohn
def _detection_limit_index(obs, cohn):
"""
Locates the corresponding detection limit for each observation.
Basically, creates an array of indices for the detection limits
(Cohn numbers) corresponding to each data point.
Parameters
----------
obs : float
A single observation from the larger dataset.
cohn : pandas.DataFrame
DataFrame of Cohn numbers.
Returns
-------
det_limit_index : int
The index of the corresponding detection limit in `cohn`
See Also
--------
cohn_numbers
"""
if cohn.shape[0] > 0:
index, = numpy.where(cohn['lower_dl'] <= obs)
det_limit_index = index[-1]
else:
det_limit_index = 0
return det_limit_index
def _ros_group_rank(df, dl_idx, censorship):
"""
Ranks each observation within the data groups.
In this case, the groups are defined by the record's detection
limit index and censorship status.
Parameters
----------
df : pandas.DataFrame
dl_idx : str
Name of the column in the dataframe the index of the
observations' corresponding detection limit in the `cohn`
dataframe.
censorship : str
Name of the column in the dataframe that indicates that a
observation is left-censored. (i.e., True -> censored,
False -> uncensored)
Returns
-------
ranks : numpy.array
Array of ranks for the dataset.
"""
# (editted for pandas 0.14 compatibility; see commit 63f162e
# when `pipe` and `assign` are available)
ranks = df.copy()
ranks.loc[:, 'rank'] = 1
ranks = (
ranks.groupby(by=[dl_idx, censorship])['rank']
.transform(lambda g: g.cumsum())
)
return ranks
def _ros_plot_pos(row, censorship, cohn):
"""
ROS-specific plotting positions.
Computes the plotting position for an observation based on its rank,
censorship status, and detection limit index.
Parameters
----------
row : pandas.Series or dict-like
Full observation (row) from a censored dataset. Requires a
'rank', 'detection_limit', and `censorship` column.
censorship : str
Name of the column in the dataframe that indicates that a
observation is left-censored. (i.e., True -> censored,
False -> uncensored)
cohn : pandas.DataFrame
DataFrame of Cohn numbers.
Returns
-------
plotting_position : float
See Also
--------
cohn_numbers
"""
DL_index = row['det_limit_index']
rank = row['rank']
censored = row[censorship]
dl_1 = cohn.iloc[DL_index]
dl_2 = cohn.iloc[DL_index + 1]
if censored:
return (1 - dl_1['prob_exceedance']) * rank / (dl_1['ncen_equal']+1)
else:
return (1 - dl_1['prob_exceedance']) + (dl_1['prob_exceedance'] - dl_2['prob_exceedance']) * \
rank / (dl_1['nuncen_above']+1)
def _norm_plot_pos(observations):
"""
Computes standard normal (Gaussian) plotting positions using scipy.
Parameters
----------
observations : array_like
Sequence of observed quantities.
Returns
-------
plotting_position : array of floats
"""
ppos, sorted_res = stats.probplot(observations, fit=False)
return stats.norm.cdf(ppos)
def plotting_positions(df, censorship, cohn):
"""
Compute the plotting positions for the observations.
The ROS-specific plotting postions are based on the observations'
rank, censorship status, and corresponding detection limit.
Parameters
----------
df : pandas.DataFrame
censorship : str
Name of the column in the dataframe that indicates that a
Loading ...