'''getting started with diffusions, continuous time stochastic processes
Author: josef-pktd
License: BSD
References
----------
An Algorithmic Introduction to Numerical Simulation of Stochastic Differential
Equations
Author(s): Desmond J. Higham
Source: SIAM Review, Vol. 43, No. 3 (Sep., 2001), pp. 525-546
Published by: Society for Industrial and Applied Mathematics
Stable URL: http://www.jstor.org/stable/3649798
http://www.sitmo.com/ especially the formula collection
Notes
-----
OU process: use same trick for ARMA with constant (non-zero mean) and drift
some of the processes have easy multivariate extensions
*Open Issues*
include xzero in returned sample or not? currently not
*TODOS*
* Milstein from Higham paper, for which processes does it apply
* Maximum Likelihood estimation
* more statistical properties (useful for tests)
* helper functions for display and MonteCarlo summaries (also for testing/checking)
* more processes for the menagerie (e.g. from empirical papers)
* characteristic functions
* transformations, non-linear e.g. log
* special estimators, e.g. Ait Sahalia, empirical characteristic functions
* fft examples
* check naming of methods, "simulate", "sample", "simexact", ... ?
stochastic volatility models: estimation unclear
finance applications ? option pricing, interest rate models
'''
import numpy as np
from scipy import stats, signal
import matplotlib.pyplot as plt
#np.random.seed(987656789)
class Diffusion(object):
'''Wiener Process, Brownian Motion with mu=0 and sigma=1
'''
def __init__(self):
pass
def simulateW(self, nobs=100, T=1, dt=None, nrepl=1):
'''generate sample of Wiener Process
'''
dt = T*1.0/nobs
t = np.linspace(dt, 1, nobs)
dW = np.sqrt(dt)*np.random.normal(size=(nrepl, nobs))
W = np.cumsum(dW,1)
self.dW = dW
return W, t
def expectedsim(self, func, nobs=100, T=1, dt=None, nrepl=1):
'''get expectation of a function of a Wiener Process by simulation
initially test example from
'''
W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
U = func(t, W)
Umean = U.mean(0)
return U, Umean, t
class AffineDiffusion(Diffusion):
r'''
differential equation:
:math::
dx_t = f(t,x)dt + \sigma(t,x)dW_t
integral:
:math::
x_T = x_0 + \int_{0}^{T}f(t,S)dt + \int_0^T \sigma(t,S)dW_t
TODO: check definition, affine, what about jump diffusion?
'''
def __init__(self):
pass
def sim(self, nobs=100, T=1, dt=None, nrepl=1):
# this does not look correct if drift or sig depend on x
# see arithmetic BM
W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
dx = self._drift() + self._sig() * W
x = np.cumsum(dx,1)
xmean = x.mean(0)
return x, xmean, t
def simEM(self, xzero=None, nobs=100, T=1, dt=None, nrepl=1, Tratio=4):
'''
from Higham 2001
TODO: reverse parameterization to start with final nobs and DT
TODO: check if I can skip the loop using my way from exactprocess
problem might be Winc (reshape into 3d and sum)
TODO: (later) check memory efficiency for large simulations
'''
#TODO: reverse parameterization to start with final nobs and DT
nobs = nobs * Tratio # simple way to change parameter
# maybe wrong parameterization,
# drift too large, variance too small ? which dt/Dt
# _drift, _sig independent of dt is wrong
if xzero is None:
xzero = self.xzero
if dt is None:
dt = T*1.0/nobs
W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
dW = self.dW
t = np.linspace(dt, 1, nobs)
Dt = Tratio*dt
L = nobs/Tratio # L EM steps of size Dt = R*dt
Xem = np.zeros((nrepl,L)) # preallocate for efficiency
Xtemp = xzero
Xem[:,0] = xzero
for j in np.arange(1,L):
#Winc = np.sum(dW[:,Tratio*(j-1)+1:Tratio*j],1)
Winc = np.sum(dW[:,np.arange(Tratio*(j-1)+1,Tratio*j)],1)
#Xtemp = Xtemp + Dt*lamda*Xtemp + mu*Xtemp*Winc;
Xtemp = Xtemp + self._drift(x=Xtemp) + self._sig(x=Xtemp) * Winc
#Dt*lamda*Xtemp + mu*Xtemp*Winc;
Xem[:,j] = Xtemp
return Xem
'''
R = 4; Dt = R*dt; L = N/R; % L EM steps of size Dt = R*dt
Xem = zeros(1,L); % preallocate for efficiency
Xtemp = Xzero;
for j = 1:L
Winc = sum(dW(R*(j-1)+1:R*j));
Xtemp = Xtemp + Dt*lambda*Xtemp + mu*Xtemp*Winc;
Xem(j) = Xtemp;
end
'''
class ExactDiffusion(AffineDiffusion):
'''Diffusion that has an exact integral representation
this is currently mainly for geometric, log processes
'''
def __init__(self):
pass
def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
'''ddt : discrete delta t
should be the same as an AR(1)
not tested yet
'''
t = np.linspace(ddt, nobs*ddt, nobs)
#expnt = np.exp(-self.lambd * t)
expddt = np.exp(-self.lambd * ddt)
normrvs = np.random.normal(size=(nrepl,nobs))
#do I need lfilter here AR(1) ? if mean reverting lag-coeff<1
#lfilter does not handle 2d arrays, it does?
inc = self._exactconst(expddt) + self._exactstd(expddt) * normrvs
return signal.lfilter([1.], [1.,-expddt], inc)
def exactdist(self, xzero, t):
expnt = np.exp(-self.lambd * t)
meant = xzero * expnt + self._exactconst(expnt)
stdt = self._exactstd(expnt)
return stats.norm(loc=meant, scale=stdt)
class ArithmeticBrownian(AffineDiffusion):
'''
:math::
dx_t &= \\mu dt + \\sigma dW_t
'''
def __init__(self, xzero, mu, sigma):
self.xzero = xzero
self.mu = mu
self.sigma = sigma
def _drift(self, *args, **kwds):
return self.mu
def _sig(self, *args, **kwds):
return self.sigma
def exactprocess(self, nobs, xzero=None, ddt=1., nrepl=2):
'''ddt : discrete delta t
not tested yet
'''
if xzero is None:
xzero = self.xzero
t = np.linspace(ddt, nobs*ddt, nobs)
normrvs = np.random.normal(size=(nrepl,nobs))
inc = self._drift + self._sigma * np.sqrt(ddt) * normrvs
#return signal.lfilter([1.], [1.,-1], inc)
return xzero + np.cumsum(inc,1)
def exactdist(self, xzero, t):
expnt = np.exp(-self.lambd * t)
meant = self._drift * t
stdt = self._sigma * np.sqrt(t)
return stats.norm(loc=meant, scale=stdt)
class GeometricBrownian(AffineDiffusion):
'''Geometric Brownian Motion
:math::
dx_t &= \\mu x_t dt + \\sigma x_t dW_t
$x_t $ stochastic process of Geometric Brownian motion,
$\\mu $ is the drift,
$\\sigma $ is the Volatility,
$W$ is the Wiener process (Brownian motion).
'''
def __init__(self, xzero, mu, sigma):
self.xzero = xzero
self.mu = mu
self.sigma = sigma
def _drift(self, *args, **kwds):
x = kwds['x']
return self.mu * x
def _sig(self, *args, **kwds):
x = kwds['x']
return self.sigma * x
class OUprocess(AffineDiffusion):
'''Ornstein-Uhlenbeck
:math::
dx_t&=\\lambda(\\mu - x_t)dt+\\sigma dW_t
mean reverting process
TODO: move exact higher up in class hierarchy
'''
def __init__(self, xzero, mu, lambd, sigma):
self.xzero = xzero
self.lambd = lambd
self.mu = mu
self.sigma = sigma
def _drift(self, *args, **kwds):
x = kwds['x']
return self.lambd * (self.mu - x)
def _sig(self, *args, **kwds):
x = kwds['x']
return self.sigma * x
def exact(self, xzero, t, normrvs):
#TODO: aggregate over time for process with observations for all t
# i.e. exact conditional distribution for discrete time increment
# -> exactprocess
#TODO: for single t, return stats.norm -> exactdist
expnt = np.exp(-self.lambd * t)
return (xzero * expnt + self.mu * (1-expnt) +
self.sigma * np.sqrt((1-expnt*expnt)/2./self.lambd) * normrvs)
def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
'''ddt : discrete delta t
should be the same as an AR(1)
not tested yet
# after writing this I saw the same use of lfilter in sitmo
'''
t = np.linspace(ddt, nobs*ddt, nobs)
expnt = np.exp(-self.lambd * t)
expddt = np.exp(-self.lambd * ddt)
normrvs = np.random.normal(size=(nrepl,nobs))
#do I need lfilter here AR(1) ? lfilter does not handle 2d arrays, it does?
from scipy import signal
#xzero * expnt
inc = ( self.mu * (1-expddt) +
self.sigma * np.sqrt((1-expddt*expddt)/2./self.lambd) * normrvs )
return signal.lfilter([1.], [1.,-expddt], inc)
def exactdist(self, xzero, t):
#TODO: aggregate over time for process with observations for all t
#TODO: for single t, return stats.norm
expnt = np.exp(-self.lambd * t)
meant = xzero * expnt + self.mu * (1-expnt)
stdt = self.sigma * np.sqrt((1-expnt*expnt)/2./self.lambd)
from scipy import stats
return stats.norm(loc=meant, scale=stdt)
def fitls(self, data, dt):
'''assumes data is 1d, univariate time series
formula from sitmo
'''
# brute force, no parameter estimation errors
nobs = len(data)-1
exog = np.column_stack((np.ones(nobs), data[:-1]))
parest, res, rank, sing = np.linalg.lstsq(exog, data[1:], rcond=-1)
const, slope = parest
errvar = res/(nobs-2.)
lambd = -np.log(slope)/dt
sigma = np.sqrt(-errvar * 2.*np.log(slope)/ (1-slope**2)/dt)
mu = const / (1-slope)
return mu, lambd, sigma
class SchwartzOne(ExactDiffusion):
'''the Schwartz type 1 stochastic process
:math::
dx_t = \\kappa (\\mu - \\ln x_t) x_t dt + \\sigma x_tdW \\
The Schwartz type 1 process is a log of the Ornstein-Uhlenbeck stochastic
process.
'''
def __init__(self, xzero, mu, kappa, sigma):
self.xzero = xzero
self.mu = mu
self.kappa = kappa
self.lambd = kappa #alias until I fix exact
Loading ...