Repository URL to install this package:
| 
          
        
        Version: 
           
    
          0.10.2  ▾
        
   | 
'''getting started with diffusions, continuous time stochastic processes
Author: josef-pktd
License: BSD
References
----------
An Algorithmic Introduction to Numerical Simulation of Stochastic Differential
Equations
Author(s): Desmond J. Higham
Source: SIAM Review, Vol. 43, No. 3 (Sep., 2001), pp. 525-546
Published by: Society for Industrial and Applied Mathematics
Stable URL: http://www.jstor.org/stable/3649798
http://www.sitmo.com/  especially the formula collection
Notes
-----
OU process: use same trick for ARMA with constant (non-zero mean) and drift
some of the processes have easy multivariate extensions
*Open Issues*
include xzero in returned sample or not? currently not
*TODOS*
* Milstein from Higham paper, for which processes does it apply
* Maximum Likelihood estimation
* more statistical properties (useful for tests)
* helper functions for display and MonteCarlo summaries (also for testing/checking)
* more processes for the menagerie (e.g. from empirical papers)
* characteristic functions
* transformations, non-linear e.g. log
* special estimators, e.g. Ait Sahalia, empirical characteristic functions
* fft examples
* check naming of methods, "simulate", "sample", "simexact", ... ?
stochastic volatility models: estimation unclear
finance applications ? option pricing, interest rate models
'''
from __future__ import print_function
import numpy as np
from scipy import stats, signal
import matplotlib.pyplot as plt
#np.random.seed(987656789)
class Diffusion(object):
    '''Wiener Process, Brownian Motion with mu=0 and sigma=1
    '''
    def __init__(self):
        pass
    def simulateW(self, nobs=100, T=1, dt=None, nrepl=1):
        '''generate sample of Wiener Process
        '''
        dt = T*1.0/nobs
        t = np.linspace(dt, 1, nobs)
        dW = np.sqrt(dt)*np.random.normal(size=(nrepl, nobs))
        W = np.cumsum(dW,1)
        self.dW = dW
        return W, t
    def expectedsim(self, func, nobs=100, T=1, dt=None, nrepl=1):
        '''get expectation of a function of a Wiener Process by simulation
        initially test example from
        '''
        W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
        U = func(t, W)
        Umean = U.mean(0)
        return U, Umean, t
class AffineDiffusion(Diffusion):
    r'''
    differential equation:
    :math::
    dx_t = f(t,x)dt + \sigma(t,x)dW_t
    integral:
    :math::
    x_T = x_0 + \int_{0}^{T}f(t,S)dt + \int_0^T  \sigma(t,S)dW_t
    TODO: check definition, affine, what about jump diffusion?
    '''
    def __init__(self):
        pass
    def sim(self, nobs=100, T=1, dt=None, nrepl=1):
        # this doesn't look correct if drift or sig depend on x
        # see arithmetic BM
        W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
        dx =  self._drift() + self._sig() * W
        x  = np.cumsum(dx,1)
        xmean = x.mean(0)
        return x, xmean, t
    def simEM(self, xzero=None, nobs=100, T=1, dt=None, nrepl=1, Tratio=4):
        '''
        from Higham 2001
        TODO: reverse parameterization to start with final nobs and DT
        TODO: check if I can skip the loop using my way from exactprocess
              problem might be Winc (reshape into 3d and sum)
        TODO: (later) check memory efficiency for large simulations
        '''
        #TODO: reverse parameterization to start with final nobs and DT
        nobs = nobs * Tratio  # simple way to change parameter
        # maybe wrong parameterization,
        # drift too large, variance too small ? which dt/Dt
        # _drift, _sig independent of dt is wrong
        if xzero is None:
            xzero = self.xzero
        if dt is None:
            dt = T*1.0/nobs
        W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
        dW = self.dW
        t = np.linspace(dt, 1, nobs)
        Dt = Tratio*dt
        L = nobs/Tratio        # L EM steps of size Dt = R*dt
        Xem = np.zeros((nrepl,L))    # preallocate for efficiency
        Xtemp = xzero
        Xem[:,0] = xzero
        for j in np.arange(1,L):
            #Winc = np.sum(dW[:,Tratio*(j-1)+1:Tratio*j],1)
            Winc = np.sum(dW[:,np.arange(Tratio*(j-1)+1,Tratio*j)],1)
            #Xtemp = Xtemp + Dt*lamda*Xtemp + mu*Xtemp*Winc;
            Xtemp = Xtemp + self._drift(x=Xtemp) + self._sig(x=Xtemp) * Winc
            #Dt*lamda*Xtemp + mu*Xtemp*Winc;
            Xem[:,j] = Xtemp
        return Xem
'''
    R = 4; Dt = R*dt; L = N/R;        % L EM steps of size Dt = R*dt
    Xem = zeros(1,L);                 % preallocate for efficiency
    Xtemp = Xzero;
    for j = 1:L
       Winc = sum(dW(R*(j-1)+1:R*j));
       Xtemp = Xtemp + Dt*lambda*Xtemp + mu*Xtemp*Winc;
       Xem(j) = Xtemp;
    end
'''
class ExactDiffusion(AffineDiffusion):
    '''Diffusion that has an exact integral representation
    this is currently mainly for geometric, log processes
    '''
    def __init__(self):
        pass
    def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
        '''ddt : discrete delta t
        should be the same as an AR(1)
        not tested yet
        '''
        t = np.linspace(ddt, nobs*ddt, nobs)
        #expnt = np.exp(-self.lambd * t)
        expddt = np.exp(-self.lambd * ddt)
        normrvs = np.random.normal(size=(nrepl,nobs))
        #do I need lfilter here AR(1) ? if mean reverting lag-coeff<1
        #lfilter doesn't handle 2d arrays, it does?
        inc = self._exactconst(expddt) + self._exactstd(expddt) * normrvs
        return signal.lfilter([1.], [1.,-expddt], inc)
    def exactdist(self, xzero, t):
        expnt = np.exp(-self.lambd * t)
        meant = xzero * expnt + self._exactconst(expnt)
        stdt = self._exactstd(expnt)
        return stats.norm(loc=meant, scale=stdt)
class ArithmeticBrownian(AffineDiffusion):
    '''
    :math::
    dx_t &= \\mu dt + \\sigma dW_t
    '''
    def __init__(self, xzero, mu, sigma):
        self.xzero = xzero
        self.mu = mu
        self.sigma = sigma
    def _drift(self, *args, **kwds):
        return self.mu
    def _sig(self, *args, **kwds):
        return self.sigma
    def exactprocess(self, nobs, xzero=None, ddt=1., nrepl=2):
        '''ddt : discrete delta t
        not tested yet
        '''
        if xzero is None:
            xzero = self.xzero
        t = np.linspace(ddt, nobs*ddt, nobs)
        normrvs = np.random.normal(size=(nrepl,nobs))
        inc = self._drift + self._sigma * np.sqrt(ddt) * normrvs
        #return signal.lfilter([1.], [1.,-1], inc)
        return xzero + np.cumsum(inc,1)
    def exactdist(self, xzero, t):
        expnt = np.exp(-self.lambd * t)
        meant = self._drift * t
        stdt = self._sigma * np.sqrt(t)
        return stats.norm(loc=meant, scale=stdt)
class GeometricBrownian(AffineDiffusion):
    '''Geometric Brownian Motion
    :math::
    dx_t &= \\mu x_t dt + \\sigma x_t dW_t
    $x_t $ stochastic process of Geometric Brownian motion,
    $\\mu $ is the drift,
    $\\sigma $ is the Volatility,
    $W$ is the Wiener process (Brownian motion).
    '''
    def __init__(self, xzero, mu, sigma):
        self.xzero = xzero
        self.mu = mu
        self.sigma = sigma
    def _drift(self, *args, **kwds):
        x = kwds['x']
        return self.mu * x
    def _sig(self, *args, **kwds):
        x = kwds['x']
        return self.sigma * x
class OUprocess(AffineDiffusion):
    '''Ornstein-Uhlenbeck
    :math::
      dx_t&=\\lambda(\\mu - x_t)dt+\\sigma dW_t
    mean reverting process
    TODO: move exact higher up in class hierarchy
    '''
    def __init__(self, xzero, mu, lambd, sigma):
        self.xzero = xzero
        self.lambd = lambd
        self.mu = mu
        self.sigma = sigma
    def _drift(self, *args, **kwds):
        x = kwds['x']
        return self.lambd * (self.mu - x)
    def _sig(self, *args, **kwds):
        x = kwds['x']
        return self.sigma * x
    def exact(self, xzero, t, normrvs):
        #TODO: aggregate over time for process with observations for all t
        #      i.e. exact conditional distribution for discrete time increment
        #      -> exactprocess
        #TODO: for single t, return stats.norm -> exactdist
        expnt = np.exp(-self.lambd * t)
        return (xzero * expnt + self.mu * (1-expnt) +
                self.sigma * np.sqrt((1-expnt*expnt)/2./self.lambd) * normrvs)
    def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
        '''ddt : discrete delta t
        should be the same as an AR(1)
        not tested yet
        # after writing this I saw the same use of lfilter in sitmo
        '''
        t = np.linspace(ddt, nobs*ddt, nobs)
        expnt = np.exp(-self.lambd * t)
        expddt = np.exp(-self.lambd * ddt)
        normrvs = np.random.normal(size=(nrepl,nobs))
        #do I need lfilter here AR(1) ? lfilter doesn't handle 2d arrays, it does?
        from scipy import signal
        #xzero * expnt
        inc = ( self.mu * (1-expddt) +
                self.sigma * np.sqrt((1-expddt*expddt)/2./self.lambd) * normrvs )
        return signal.lfilter([1.], [1.,-expddt], inc)
    def exactdist(self, xzero, t):
        #TODO: aggregate over time for process with observations for all t
        #TODO: for single t, return stats.norm
        expnt = np.exp(-self.lambd * t)
        meant = xzero * expnt + self.mu * (1-expnt)
        stdt = self.sigma * np.sqrt((1-expnt*expnt)/2./self.lambd)
        from scipy import stats
        return stats.norm(loc=meant, scale=stdt)
    def fitls(self, data, dt):
        '''assumes data is 1d, univariate time series
        formula from sitmo
        '''
        # brute force, no parameter estimation errors
        nobs = len(data)-1
        exog = np.column_stack((np.ones(nobs), data[:-1]))
        parest, res, rank, sing = np.linalg.lstsq(exog, data[1:], rcond=-1)
        const, slope = parest
        errvar = res/(nobs-2.)
        lambd = -np.log(slope)/dt
        sigma = np.sqrt(-errvar * 2.*np.log(slope)/ (1-slope**2)/dt)
        mu = const / (1-slope)
        return mu, lambd, sigma
class SchwartzOne(ExactDiffusion):
    '''the Schwartz type 1 stochastic process
    :math::
    dx_t = \\kappa (\\mu - \\ln x_t) x_t dt + \\sigma x_tdW \\
    The Schwartz type 1 process is a log of the Ornstein-Uhlenbeck stochastic
    process.
    '''
    def __init__(self, xzero, mu, kappa, sigma):
        self.xzero = xzero
        self.mu = mu
        self.kappa = kappa
        self.lambd = kappa #alias until I fix exact
        self.sigma = sigma
    def _exactconst(self, expnt):
        return (1-expnt) * (self.mu - self.sigma**2 / 2. /self.kappa)
    def _exactstd(self, expnt):
        return self.sigma * np.sqrt((1-expnt*expnt)/2./self.kappa)
    def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
        '''uses exact solution for log of process
        '''
        lnxzero = np.log(xzero)
        lnx = super(self.__class__, self).exactprocess(xzero, nobs, ddt=ddt, nrepl=nrepl)
        return np.exp(lnx)
    def exactdist(self, xzero, t):
        expnt = np.exp(-self.lambd * t)
        #TODO: check this is still wrong, just guessing
        meant = np.log(xzero) * expnt + self._exactconst(expnt)
        stdt = self._exactstd(expnt)
        return stats.lognorm(loc=meant, scale=stdt)
    def fitls(self, data, dt):
        '''assumes data is 1d, univariate time series
        formula from sitmo
        '''
        # brute force, no parameter estimation errors
        nobs = len(data)-1
        exog = np.column_stack((np.ones(nobs),np.log(data[:-1])))
        parest, res, rank, sing = np.linalg.lstsq(exog, np.log(data[1:]), rcond=-1)
        const, slope = parest
        errvar = res/(nobs-2.)  #check denominator estimate, of sigma too low
        kappa = -np.log(slope)/dt
        sigma = np.sqrt(errvar * kappa / (1-np.exp(-2*kappa*dt)))
        mu = const / (1-np.exp(-kappa*dt)) + sigma**2/2./kappa
        if np.shape(mu)== (1,): mu = mu[0]   # how to remove scalar array ?
        if np.shape(sigma)== (1,): sigma = sigma[0]
        #mu, kappa are good, sigma too small
        return mu, kappa, sigma
class BrownianBridge(object):
    def __init__(self):
        pass
    def simulate(self, x0, x1, nobs, nrepl=1, ddt=1., sigma=1.):
        nobs=nobs+1
        dt = ddt*1./nobs
        t = np.linspace(dt, ddt-dt, nobs)
        t = np.linspace(dt, ddt, nobs)
        wm = [t/ddt, 1-t/ddt]
        #wmi = wm[1]
        #wm1 = x1*wm[0]
        wmi = 1-dt/(ddt-t)
        wm1 = x1*(dt/(ddt-t))
        su = sigma* np.sqrt(t*(1-t)/ddt)
        s = sigma* np.sqrt(dt*(ddt-t-dt)/(ddt-t))
        x = np.zeros((nrepl, nobs))
        x[:,0] = x0
        rvs = s*np.random.normal(size=(nrepl,nobs))
        for i in range(1,nobs):
            x[:,i] = x[:,i-1]*wmi[i] + wm1[i] + rvs[:,i]
        return x, t, su
class CompoundPoisson(object):
    '''nobs iid compound poisson distributions, not a process in time
    '''
    def __init__(self, lambd, randfn=np.random.normal):
        if len(lambd) != len(randfn):
            raise ValueError('lambd and randfn need to have the same number of elements')
        self.nobj = len(lambd)
        self.randfn = randfn
        self.lambd = np.asarray(lambd)
    def simulate(self, nobs, nrepl=1):
        nobj = self.nobj
        x = np.zeros((nrepl, nobs, nobj))
        N = np.random.poisson(self.lambd[None,None,:], size=(nrepl,nobs,nobj))
        for io in range(nobj):
            randfnc = self.randfn[io]
            nc = N[:,:,io]
            #print nrepl,nobs,nc
            #xio = randfnc(size=(nrepl,nobs,np.max(nc))).cumsum(-1)[np.arange(nrepl)[:,None],np.arange(nobs),nc-1]
            rvs = randfnc(size=(nrepl,nobs,np.max(nc)))
            print('rvs.sum()', rvs.sum(), rvs.shape)
            xio = rvs.cumsum(-1)[np.arange(nrepl)[:,None],np.arange(nobs),nc-1]
            #print xio.shape
            x[:,:,io] = xio
        x[N==0] = 0
        return x, N
'''
randn('state',100)                                % set the state of randn
T = 1; N = 500; dt = T/N; t = [dt:dt:1];
M = 1000;                                         % M paths simultaneously
dW = sqrt(dt)*randn(M,N);                         % increments
W = cumsum(dW,2);                                 % cumulative sum
U = exp(repmat(t,[M 1]) + 0.5*W);
Umean = mean(U);
plot([0,t],[1,Umean],'b-'), hold on               % plot mean over M paths
plot([0,t],[ones(5,1),U(1:5,:)],'r--'), hold off  % plot 5 individual paths
xlabel('t','FontSize',16)
ylabel('U(t)','FontSize',16,'Rotation',0,'HorizontalAlignment','right')
legend('mean of 1000 paths','5 individual paths',2)
averr = norm((Umean - exp(9*t/8)),'inf')          % sample error
'''
if __name__ == '__main__':
    doplot = 1
    nrepl = 1000
    examples = []#['all']
    if 'all' in examples:
        w = Diffusion()
        # Wiener Process
        # ^^^^^^^^^^^^^^
        ws = w.simulateW(1000, nrepl=nrepl)
        if doplot:
            plt.figure()
            tmp = plt.plot(ws[0].T)
            tmp = plt.plot(ws[0].mean(0), linewidth=2)
            plt.title('Standard Brownian Motion (Wiener Process)')
        func = lambda t, W: np.exp(t + 0.5*W)
        us = w.expectedsim(func, nobs=500, nrepl=nrepl)
        if doplot:
            plt.figure()
            tmp = plt.plot(us[0].T)
            tmp = plt.plot(us[1], linewidth=2)
            plt.title('Brownian Motion - exp')
        #plt.show()
        averr = np.linalg.norm(us[1] - np.exp(9*us[2]/8.), np.inf)
        print(averr)
        #print us[1][:10]
        #print np.exp(9.*us[2][:10]/8.)
        # Geometric Brownian
        # ^^^^^^^^^^^^^^^^^^
        gb = GeometricBrownian(xzero=1., mu=0.01, sigma=0.5)
        gbs = gb.simEM(nobs=100, nrepl=100)
        if doplot:
            plt.figure()
            tmp = plt.plot(gbs.T)
            tmp = plt.plot(gbs.mean(0), linewidth=2)
            plt.title('Geometric Brownian')
            plt.figure()
            tmp = plt.plot(np.log(gbs).T)
            tmp = plt.plot(np.log(gbs.mean(0)), linewidth=2)
            plt.title('Geometric Brownian - log-transformed')
        ab = ArithmeticBrownian(xzero=1, mu=0.05, sigma=1)
        abs = ab.simEM(nobs=100, nrepl=100)
        if doplot:
            plt.figure()
            tmp = plt.plot(abs.T)
            tmp = plt.plot(abs.mean(0), linewidth=2)
            plt.title('Arithmetic Brownian')
        # Ornstein-Uhlenbeck
        # ^^^^^^^^^^^^^^^^^^
        ou = OUprocess(xzero=2, mu=1, lambd=0.5, sigma=0.1)
        ous = ou.simEM()
        oue = ou.exact(1, 1, np.random.normal(size=(5,10)))
        ou.exact(0, np.linspace(0,10,10/0.1), 0)
        ou.exactprocess(0,10)
        print(ou.exactprocess(0,10, ddt=0.1,nrepl=10).mean(0))
        #the following looks good, approaches mu
        oues = ou.exactprocess(0,100, ddt=0.1,nrepl=100)
        if doplot:
            plt.figure()
            tmp = plt.plot(oues.T)
            tmp = plt.plot(oues.mean(0), linewidth=2)
            plt.title('Ornstein-Uhlenbeck')
        # SchwartsOne
        # ^^^^^^^^^^^
        so = SchwartzOne(xzero=0, mu=1, kappa=0.5, sigma=0.1)
        sos = so.exactprocess(0,50, ddt=0.1,nrepl=100)
        print(sos.mean(0))
        print(np.log(sos.mean(0)))
        doplot = 1
        if doplot:
            plt.figure()
            tmp = plt.plot(sos.T)
            tmp = plt.plot(sos.mean(0), linewidth=2)
            plt.title('Schwartz One')
        print(so.fitls(sos[0,:],dt=0.1))
        sos2 = so.exactprocess(0,500, ddt=0.1,nrepl=5)
        print('true: mu=1, kappa=0.5, sigma=0.1')
        for i in range(5):
            print(so.fitls(sos2[i],dt=0.1))
        # Brownian Bridge
        # ^^^^^^^^^^^^^^^
        bb = BrownianBridge()
        #bbs = bb.sample(x0, x1, nobs, nrepl=1, ddt=1., sigma=1.)
        bbs, t, wm = bb.simulate(0, 0.5, 99, nrepl=500, ddt=1., sigma=0.1)
        if doplot:
            plt.figure()
            tmp = plt.plot(bbs.T)
            tmp = plt.plot(bbs.mean(0), linewidth=2)
            plt.title('Brownian Bridge')
            plt.figure()
            plt.plot(wm,'r', label='theoretical')
            plt.plot(bbs.std(0), label='simulated')
            plt.title('Brownian Bridge - Variance')
            plt.legend()
    # Compound Poisson
    # ^^^^^^^^^^^^^^^^
    cp = CompoundPoisson([1,1], [np.random.normal,np.random.normal])
    cps = cp.simulate(nobs=20000,nrepl=3)
    print(cps[0].sum(-1).sum(-1))
    print(cps[0].sum())
    print(cps[0].mean(-1).mean(-1))
    print(cps[0].mean())
    print(cps[1].size)
    print(cps[1].sum())
    #Note Y = sum^{N} X is compound poisson of iid x, then
    #E(Y) = E(N)*E(X)   eg. eq. (6.37) page 385 in http://ee.stanford.edu/~gray/sp.html
    #plt.show()