Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ sandbox / tsa / diffusion.py

'''getting started with diffusions, continuous time stochastic processes

Author: josef-pktd
License: BSD


References
----------

An Algorithmic Introduction to Numerical Simulation of Stochastic Differential
Equations
Author(s): Desmond J. Higham
Source: SIAM Review, Vol. 43, No. 3 (Sep., 2001), pp. 525-546
Published by: Society for Industrial and Applied Mathematics
Stable URL: http://www.jstor.org/stable/3649798

http://www.sitmo.com/  especially the formula collection


Notes
-----

OU process: use same trick for ARMA with constant (non-zero mean) and drift
some of the processes have easy multivariate extensions

*Open Issues*

include xzero in returned sample or not? currently not

*TODOS*

* Milstein from Higham paper, for which processes does it apply
* Maximum Likelihood estimation
* more statistical properties (useful for tests)
* helper functions for display and MonteCarlo summaries (also for testing/checking)
* more processes for the menagerie (e.g. from empirical papers)
* characteristic functions
* transformations, non-linear e.g. log
* special estimators, e.g. Ait Sahalia, empirical characteristic functions
* fft examples
* check naming of methods, "simulate", "sample", "simexact", ... ?



stochastic volatility models: estimation unclear

finance applications ? option pricing, interest rate models


'''
import numpy as np
from scipy import stats, signal
import matplotlib.pyplot as plt

#np.random.seed(987656789)

class Diffusion(object):
    '''Wiener Process, Brownian Motion with mu=0 and sigma=1
    '''
    def __init__(self):
        pass

    def simulateW(self, nobs=100, T=1, dt=None, nrepl=1):
        '''generate sample of Wiener Process
        '''
        dt = T*1.0/nobs
        t = np.linspace(dt, 1, nobs)
        dW = np.sqrt(dt)*np.random.normal(size=(nrepl, nobs))
        W = np.cumsum(dW,1)
        self.dW = dW
        return W, t

    def expectedsim(self, func, nobs=100, T=1, dt=None, nrepl=1):
        '''get expectation of a function of a Wiener Process by simulation

        initially test example from
        '''
        W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
        U = func(t, W)
        Umean = U.mean(0)
        return U, Umean, t

class AffineDiffusion(Diffusion):
    r'''

    differential equation:

    :math::
    dx_t = f(t,x)dt + \sigma(t,x)dW_t

    integral:

    :math::
    x_T = x_0 + \int_{0}^{T}f(t,S)dt + \int_0^T  \sigma(t,S)dW_t

    TODO: check definition, affine, what about jump diffusion?

    '''

    def __init__(self):
        pass

    def sim(self, nobs=100, T=1, dt=None, nrepl=1):
        # this does not look correct if drift or sig depend on x
        # see arithmetic BM
        W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
        dx =  self._drift() + self._sig() * W
        x  = np.cumsum(dx,1)
        xmean = x.mean(0)
        return x, xmean, t

    def simEM(self, xzero=None, nobs=100, T=1, dt=None, nrepl=1, Tratio=4):
        '''

        from Higham 2001

        TODO: reverse parameterization to start with final nobs and DT
        TODO: check if I can skip the loop using my way from exactprocess
              problem might be Winc (reshape into 3d and sum)
        TODO: (later) check memory efficiency for large simulations
        '''
        #TODO: reverse parameterization to start with final nobs and DT
        nobs = nobs * Tratio  # simple way to change parameter
        # maybe wrong parameterization,
        # drift too large, variance too small ? which dt/Dt
        # _drift, _sig independent of dt is wrong
        if xzero is None:
            xzero = self.xzero
        if dt is None:
            dt = T*1.0/nobs
        W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
        dW = self.dW
        t = np.linspace(dt, 1, nobs)
        Dt = Tratio*dt
        L = nobs/Tratio        # L EM steps of size Dt = R*dt
        Xem = np.zeros((nrepl,L))    # preallocate for efficiency
        Xtemp = xzero
        Xem[:,0] = xzero
        for j in np.arange(1,L):
            #Winc = np.sum(dW[:,Tratio*(j-1)+1:Tratio*j],1)
            Winc = np.sum(dW[:,np.arange(Tratio*(j-1)+1,Tratio*j)],1)
            #Xtemp = Xtemp + Dt*lamda*Xtemp + mu*Xtemp*Winc;
            Xtemp = Xtemp + self._drift(x=Xtemp) + self._sig(x=Xtemp) * Winc
            #Dt*lamda*Xtemp + mu*Xtemp*Winc;
            Xem[:,j] = Xtemp
        return Xem

'''
    R = 4; Dt = R*dt; L = N/R;        % L EM steps of size Dt = R*dt
    Xem = zeros(1,L);                 % preallocate for efficiency
    Xtemp = Xzero;
    for j = 1:L
       Winc = sum(dW(R*(j-1)+1:R*j));
       Xtemp = Xtemp + Dt*lambda*Xtemp + mu*Xtemp*Winc;
       Xem(j) = Xtemp;
    end
'''

class ExactDiffusion(AffineDiffusion):
    '''Diffusion that has an exact integral representation

    this is currently mainly for geometric, log processes

    '''

    def __init__(self):
        pass

    def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
        '''ddt : discrete delta t



        should be the same as an AR(1)
        not tested yet
        '''
        t = np.linspace(ddt, nobs*ddt, nobs)
        #expnt = np.exp(-self.lambd * t)
        expddt = np.exp(-self.lambd * ddt)
        normrvs = np.random.normal(size=(nrepl,nobs))
        #do I need lfilter here AR(1) ? if mean reverting lag-coeff<1
        #lfilter does not handle 2d arrays, it does?
        inc = self._exactconst(expddt) + self._exactstd(expddt) * normrvs
        return signal.lfilter([1.], [1.,-expddt], inc)

    def exactdist(self, xzero, t):
        expnt = np.exp(-self.lambd * t)
        meant = xzero * expnt + self._exactconst(expnt)
        stdt = self._exactstd(expnt)
        return stats.norm(loc=meant, scale=stdt)

class ArithmeticBrownian(AffineDiffusion):
    '''
    :math::
    dx_t &= \\mu dt + \\sigma dW_t
    '''

    def __init__(self, xzero, mu, sigma):
        self.xzero = xzero
        self.mu = mu
        self.sigma = sigma

    def _drift(self, *args, **kwds):
        return self.mu
    def _sig(self, *args, **kwds):
        return self.sigma
    def exactprocess(self, nobs, xzero=None, ddt=1., nrepl=2):
        '''ddt : discrete delta t

        not tested yet
        '''
        if xzero is None:
            xzero = self.xzero
        t = np.linspace(ddt, nobs*ddt, nobs)
        normrvs = np.random.normal(size=(nrepl,nobs))
        inc = self._drift + self._sigma * np.sqrt(ddt) * normrvs
        #return signal.lfilter([1.], [1.,-1], inc)
        return xzero + np.cumsum(inc,1)

    def exactdist(self, xzero, t):
        expnt = np.exp(-self.lambd * t)
        meant = self._drift * t
        stdt = self._sigma * np.sqrt(t)
        return stats.norm(loc=meant, scale=stdt)


class GeometricBrownian(AffineDiffusion):
    '''Geometric Brownian Motion

    :math::
    dx_t &= \\mu x_t dt + \\sigma x_t dW_t

    $x_t $ stochastic process of Geometric Brownian motion,
    $\\mu $ is the drift,
    $\\sigma $ is the Volatility,
    $W$ is the Wiener process (Brownian motion).

    '''
    def __init__(self, xzero, mu, sigma):
        self.xzero = xzero
        self.mu = mu
        self.sigma = sigma

    def _drift(self, *args, **kwds):
        x = kwds['x']
        return self.mu * x
    def _sig(self, *args, **kwds):
        x = kwds['x']
        return self.sigma * x


class OUprocess(AffineDiffusion):
    '''Ornstein-Uhlenbeck

    :math::
      dx_t&=\\lambda(\\mu - x_t)dt+\\sigma dW_t

    mean reverting process



    TODO: move exact higher up in class hierarchy
    '''
    def __init__(self, xzero, mu, lambd, sigma):
        self.xzero = xzero
        self.lambd = lambd
        self.mu = mu
        self.sigma = sigma

    def _drift(self, *args, **kwds):
        x = kwds['x']
        return self.lambd * (self.mu - x)
    def _sig(self, *args, **kwds):
        x = kwds['x']
        return self.sigma * x
    def exact(self, xzero, t, normrvs):
        #TODO: aggregate over time for process with observations for all t
        #      i.e. exact conditional distribution for discrete time increment
        #      -> exactprocess
        #TODO: for single t, return stats.norm -> exactdist
        expnt = np.exp(-self.lambd * t)
        return (xzero * expnt + self.mu * (1-expnt) +
                self.sigma * np.sqrt((1-expnt*expnt)/2./self.lambd) * normrvs)

    def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
        '''ddt : discrete delta t

        should be the same as an AR(1)
        not tested yet
        # after writing this I saw the same use of lfilter in sitmo
        '''
        t = np.linspace(ddt, nobs*ddt, nobs)
        expnt = np.exp(-self.lambd * t)
        expddt = np.exp(-self.lambd * ddt)
        normrvs = np.random.normal(size=(nrepl,nobs))
        #do I need lfilter here AR(1) ? lfilter does not handle 2d arrays, it does?
        from scipy import signal
        #xzero * expnt
        inc = ( self.mu * (1-expddt) +
                self.sigma * np.sqrt((1-expddt*expddt)/2./self.lambd) * normrvs )

        return signal.lfilter([1.], [1.,-expddt], inc)


    def exactdist(self, xzero, t):
        #TODO: aggregate over time for process with observations for all t
        #TODO: for single t, return stats.norm
        expnt = np.exp(-self.lambd * t)
        meant = xzero * expnt + self.mu * (1-expnt)
        stdt = self.sigma * np.sqrt((1-expnt*expnt)/2./self.lambd)
        from scipy import stats
        return stats.norm(loc=meant, scale=stdt)

    def fitls(self, data, dt):
        '''assumes data is 1d, univariate time series
        formula from sitmo
        '''
        # brute force, no parameter estimation errors
        nobs = len(data)-1
        exog = np.column_stack((np.ones(nobs), data[:-1]))
        parest, res, rank, sing = np.linalg.lstsq(exog, data[1:], rcond=-1)
        const, slope = parest
        errvar = res/(nobs-2.)
        lambd = -np.log(slope)/dt
        sigma = np.sqrt(-errvar * 2.*np.log(slope)/ (1-slope**2)/dt)
        mu = const / (1-slope)
        return mu, lambd, sigma


class SchwartzOne(ExactDiffusion):
    '''the Schwartz type 1 stochastic process

    :math::
    dx_t = \\kappa (\\mu - \\ln x_t) x_t dt + \\sigma x_tdW \\

    The Schwartz type 1 process is a log of the Ornstein-Uhlenbeck stochastic
    process.

    '''

    def __init__(self, xzero, mu, kappa, sigma):
        self.xzero = xzero
        self.mu = mu
        self.kappa = kappa
        self.lambd = kappa #alias until I fix exact
Loading ...