Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ sandbox / nonparametric / kernels.py

# -*- coding: utf-8 -*-

"""
This models contains the Kernels for Kernel smoothing.

Hopefully in the future they may be reused/extended for other kernel based
method

References:
----------

Pointwise Kernel Confidence Bounds
(smoothconf)
http://fedc.wiwi.hu-berlin.de/xplore/ebooks/html/anr/anrhtmlframe62.html
"""

# pylint: disable-msg=C0103
# pylint: disable-msg=W0142
# pylint: disable-msg=E1101
# pylint: disable-msg=E0611
from statsmodels.compat.python import lzip, lfilter
import numpy as np
import scipy.integrate
from scipy.special import factorial
from numpy import exp, multiply, square, divide, subtract, inf


class NdKernel(object):
    """Generic N-dimensial kernel

    Parameters
    ----------
    n : int
        The number of series for kernel estimates
    kernels : list
        kernels

    Can be constructed from either
    a) a list of n kernels which will be treated as
    indepent marginals on a gaussian copula (specified by H)
    or b) a single univariate kernel which will be applied radially to the
    mahalanobis distance defined by H.

    In the case of the Gaussian these are both equivalent, and the second constructiong
    is prefered.
    """
    def __init__(self, n, kernels = None, H = None):
        if kernels is None:
            kernels = Gaussian()

        self._kernels = kernels
        self.weights = None

        if H is None:
            H = np.matrix( np.identity(n))

        self._H = H
        self._Hrootinv = np.linalg.cholesky( H.I )

    def getH(self):
        """Getter for kernel bandwidth, H"""
        return self._H

    def setH(self, value):
        """Setter for kernel bandwidth, H"""
        self._H = value

    H = property(getH, setH, doc="Kernel bandwidth matrix")

    def density(self, xs, x):

        n = len(xs)
        #xs = self.in_domain( xs, xs, x )[0]

        if len(xs)>0:  ## Need to do product of marginal distributions
            #w = np.sum([self(self._Hrootinv * (xx-x).T ) for xx in xs])/n
            #vectorized does not work:
            if self.weights is not None:
                w = np.mean(self((xs-x) * self._Hrootinv).T * self.weights)/sum(self.weights)
            else:
                w = np.mean(self((xs-x) * self._Hrootinv )) #transposed
            #w = np.mean([self(xd) for xd in ((xs-x) * self._Hrootinv)] ) #transposed
            return w
        else:
            return np.nan

    def _kernweight(self, x ):
        """returns the kernel weight for the independent multivariate kernel"""
        if isinstance( self._kernels, CustomKernel ):
            ## Radial case
            #d = x.T * x
            #x is matrix, 2d, element wise sqrt looks wrong
            #d = np.sqrt( x.T * x )
            x = np.asarray(x)
            #d = np.sqrt( (x * x).sum(-1) )
            d = (x * x).sum(-1)
            return self._kernels( np.asarray(d) )

    def __call__(self, x):
        """
        This simply returns the value of the kernel function at x

        Does the same as weight if the function is normalised
        """
        return self._kernweight(x)


class CustomKernel(object):
    """
    Generic 1D Kernel object.
    Can be constructed by selecting a standard named Kernel,
    or providing a lambda expression and domain.
    The domain allows some algorithms to run faster for finite domain kernels.
    """
    # MC: Not sure how this will look in the end - or even still exist.
    # Main purpose of this is to allow custom kernels and to allow speed up
    # from finite support.

    def __init__(self, shape, h = 1.0, domain = None, norm = None):
        """
        shape should be a function taking and returning numeric type.

        For sanity it should always return positive or zero but this is not
        enforced in case you want to do weird things. Bear in mind that the
        statistical tests etc. may not be valid for non-positive kernels.

        The bandwidth of the kernel is supplied as h.

        You may specify a domain as a list of 2 values [min, max], in which case
        kernel will be treated as zero outside these values. This will speed up
        calculation.

        You may also specify the normalisation constant for the supplied Kernel.
        If you do this number will be stored and used as the normalisation
        without calculation.  It is recommended you do this if you know the
        constant, to speed up calculation.  In particular if the shape function
        provided is already normalised you should provide norm = 1.0.

        Warning: I think several calculations assume that the kernel is
        normalized. No tests for non-normalized kernel.
        """
        self._normconst = norm   # a value or None, if None, then calculate
        self.domain = domain
        self.weights = None
        if callable(shape):
            self._shape = shape
        else:
            raise TypeError("shape must be a callable object/function")
        self._h = h
        self._L2Norm = None
        self._kernel_var = None
        self._normal_reference_constant = None
        self._order = None

    def geth(self):
        """Getter for kernel bandwidth, h"""
        return self._h
    def seth(self, value):
        """Setter for kernel bandwidth, h"""
        self._h = value
    h = property(geth, seth, doc="Kernel Bandwidth")

    def in_domain(self, xs, ys, x):
        """
        Returns the filtered (xs, ys) based on the Kernel domain centred on x
        """
        # Disable black-list functions: filter used for speed instead of
        # list-comprehension
        # pylint: disable-msg=W0141
        def isInDomain(xy):
            """Used for filter to check if point is in the domain"""
            u = (xy[0]-x)/self.h
            return u >= self.domain[0] and u <= self.domain[1]

        if self.domain is None:
            return (xs, ys)
        else:
            filtered = lfilter(isInDomain, lzip(xs, ys))
            if len(filtered) > 0:
                xs, ys = lzip(*filtered)
                return (xs, ys)
            else:
                return ([], [])

    def density(self, xs, x):
        """Returns the kernel density estimate for point x based on x-values
        xs
        """
        xs = np.asarray(xs)
        n = len(xs) # before in_domain?
        if self.weights is not None:
            xs, weights = self.in_domain( xs, self.weights, x )
        else:
            xs = self.in_domain( xs, xs, x )[0]
        xs = np.asarray(xs)
        #print 'len(xs)', len(xs), x
        if xs.ndim == 1:
            xs = xs[:,None]
        if len(xs)>0:
            h = self.h
            if self.weights is not None:
                w = 1 / h * np.sum(self((xs-x)/h).T * weights, axis=1)
            else:
                w = 1. / (h * n) * np.sum(self((xs-x)/h), axis=0)
            return w
        else:
            return np.nan

    def density_var(self, density, nobs):
        """approximate pointwise variance for kernel density

        not verified

        Parameters
        ----------
        density : array_lie
            pdf of the kernel density
        nobs : int
            number of observations used in the KDE estimation

        Returns
        -------
        kde_var : ndarray
            estimated variance of the density estimate

        Notes
        -----
        This uses the asymptotic normal approximation to the distribution of
        the density estimate.
        """
        return np.asarray(density) * self.L2Norm / self.h / nobs

    def density_confint(self, density, nobs, alpha=0.05):
        """approximate pointwise confidence interval for kernel density

        The confidence interval is centered at the estimated density and
        ignores the bias of the density estimate.

        not verified

        Parameters
        ----------
        density : array_lie
            pdf of the kernel density
        nobs : int
            number of observations used in the KDE estimation

        Returns
        -------
        conf_int : ndarray
            estimated confidence interval of the density estimate, lower bound
            in first column and upper bound in second column

        Notes
        -----
        This uses the asymptotic normal approximation to the distribution of
        the density estimate. The lower bound can be negative for density
        values close to zero.
        """
        from scipy import stats
        crit = stats.norm.isf(alpha / 2.)
        density = np.asarray(density)
        half_width = crit * np.sqrt(self.density_var(density, nobs))
        conf_int = np.column_stack((density - half_width, density + half_width))
        return conf_int

    def smooth(self, xs, ys, x):
        """Returns the kernel smoothing estimate for point x based on x-values
        xs and y-values ys.
        Not expected to be called by the user.
        """
        xs, ys = self.in_domain(xs, ys, x)

        if len(xs)>0:
            w = np.sum(self((xs-x)/self.h))
            #TODO: change the below to broadcasting when shape is sorted
            v = np.sum([yy*self((xx-x)/self.h) for xx, yy in zip(xs, ys)])
            return v / w
        else:
            return np.nan

    def smoothvar(self, xs, ys, x):
        """Returns the kernel smoothing estimate of the variance at point x.
        """
        xs, ys = self.in_domain(xs, ys, x)

        if len(xs) > 0:
            fittedvals = np.array([self.smooth(xs, ys, xx) for xx in xs])
            sqresid = square( subtract(ys, fittedvals) )
            w = np.sum(self((xs-x)/self.h))
            v = np.sum([rr*self((xx-x)/self.h) for xx, rr in zip(xs, sqresid)])
            return v / w
        else:
            return np.nan

    def smoothconf(self, xs, ys, x, alpha=0.05):
        """Returns the kernel smoothing estimate with confidence 1sigma bounds
        """
        xs, ys = self.in_domain(xs, ys, x)

        if len(xs) > 0:
            fittedvals = np.array([self.smooth(xs, ys, xx) for xx in xs])
            #fittedvals = self.smooth(xs, ys, x) # x or xs in Haerdle
            sqresid = square(
                subtract(ys, fittedvals)
            )
            w = np.sum(self((xs-x)/self.h))
            #var = sqresid.sum() / (len(sqresid) - 0)  # nonlocal var ? JP just trying
            v = np.sum([rr*self((xx-x)/self.h) for xx, rr in zip(xs, sqresid)])
            var = v / w
            sd = np.sqrt(var)
            K = self.L2Norm
            yhat = self.smooth(xs, ys, x)
            from scipy import stats
            crit = stats.norm.isf(alpha / 2)
            err = crit * sd * np.sqrt(K) / np.sqrt(w * self.h * self.norm_const)
            return (yhat - err, yhat, yhat + err)
        else:
            return (np.nan, np.nan, np.nan)

    @property
    def L2Norm(self):
        """Returns the integral of the square of the kernal from -inf to inf"""
        if self._L2Norm is None:
            L2Func = lambda x: (self.norm_const*self._shape(x))**2
            if self.domain is None:
                self._L2Norm = scipy.integrate.quad(L2Func, -inf, inf)[0]
            else:
                self._L2Norm = scipy.integrate.quad(L2Func, self.domain[0],
                                               self.domain[1])[0]
        return self._L2Norm

    @property
    def norm_const(self):
        """
        Normalising constant for kernel (integral from -inf to inf)
        """
        if self._normconst is None:
            if self.domain is None:
                quadres = scipy.integrate.quad(self._shape, -inf, inf)
            else:
                quadres = scipy.integrate.quad(self._shape, self.domain[0],
                                               self.domain[1])
            self._normconst = 1.0/(quadres[0])
        return self._normconst
Loading ...