Source code for timewave.stochasticprocess.gauss

# -*- coding: utf-8 -*-

# timewave
# --------
# timewave, a stochastic process evolution simulation engine in python.
# 
# Author:   sonntagsgesicht, based on a fork of Deutsche Postbank [pbrisk]
# Version:  0.6, copyright Wednesday, 18 September 2019
# Website:  https://github.com/sonntagsgesicht/timewave
# License:  Apache License 2.0 (see LICENSE file)


from math import sqrt, exp, log

from .base import StochasticProcess


[docs]class WienerProcess(StochasticProcess): """ class implementing general Gauss process between grid dates """ def __init__(self, mu=0., sigma=1., start=0.): super(WienerProcess, self).__init__(start) self._mu = mu self._sigma = sigma def __str__(self): return 'N(mu=%0.4f, sigma=%0.4f)' % (self._mu, self._sigma) def _drift(self, x, s, e): return self._mu * (e - s) def _diffusion(self, x, s, e): return self._sigma * sqrt(e - s)
[docs] def evolve(self, x, s, e, q): return x + self._drift(x, s, e) + self._diffusion(x, s, e) * q
[docs] def mean(self, t): return self.start + self._drift(0., 0., t)
[docs] def median(self, t): return self.mean(t)
[docs] def variance(self, t): return self._diffusion(0., 0, t) ** 2
[docs]class OrnsteinUhlenbeckProcess(StochasticProcess): """ class implementing Ornstein Uhlenbeck process """ def __init__(self, theta=0.1, mu=0.1, sigma=0.1, start=0.0): r""" :param flaot theta: mean reversion speed :param float mu: drift :param float sigma: diffusion :param float start: initial value .. math:: dx_t = \theta ( \mu - x_t) dt + \sigma dW_t, x_0 = a """ super(OrnsteinUhlenbeckProcess, self).__init__(start) self._theta = theta self._mu = mu self._sigma = sigma def __str__(self): return 'OU(theta=%0.4f, mu=%0.4f, sigma=%0.4f)' % (self._theta, self._mu, self._sigma) def _drift(self, x, s, e): if self._theta: return x * exp(-self._theta * float(e - s)) + self._mu * (1. - exp(-self._theta * float(e - s))) else: return x def _diffusion(self, x, s, e): # return self._sigma * (1. - exp(-self._theta * float(e - s))) return sqrt(self.variance(float(e - s)))
[docs] def evolve(self, x, s, e, q): return self._drift(x, s, e) + self._diffusion(x, s, e) * q
[docs] def mean(self, t): return self._drift(self.start, 0., t)
[docs] def variance(self, t): if self._theta: return self._sigma * self._sigma * (1. - exp(-2. * self._theta * t)) * .5 / self._theta else: return t
[docs]class GeometricBrownianMotion(WienerProcess): """ class implementing general Gauss process between grid dates """ def __init__(self, mu=0., sigma=1., start=1.): super(GeometricBrownianMotion, self).__init__(mu, sigma, start) self._diffusion_driver = super(GeometricBrownianMotion, self).diffusion_driver def __str__(self): return 'LN(mu=%0.4f, sigma=%0.4f)' % (self._mu, self._sigma)
[docs] def evolve(self, x, s, e, q): return x * exp(super(GeometricBrownianMotion, self).evolve(0., s, e, q))
[docs] def mean(self, t): return self.start * exp(self._drift(0., 0., t) + 0.5 * self._diffusion(0., 0., t) ** 2)
[docs] def median(self, t): return self.start * exp(self._drift(0., 0., t))
[docs] def variance(self, t): return (self.mean(t) ** 2) * (exp(self._diffusion(0., 0., t) ** 2) - 1)
[docs] def skewness(self, t): es = exp(self._diffusion(0., 0., t) ** 2) return (es + 2.) * sqrt(es - 1)
[docs] def kurtosis(self, t): es = exp(self._diffusion(0., 0., t) ** 2) return es ** 4 + 2 * es ** 3 + 3 * es ** 2 - 6
[docs]class TimeDependentParameter(object): def __init__(self, parameter=0.0, time=1.0): if isinstance(parameter, float): func = (lambda x: parameter) elif isinstance(parameter, (tuple, list)): if isinstance(time, float): time = list(float(i) * time for i, s in enumerate(parameter)) func = (lambda x: max(s for t, s in zip(time, parameter) if t <= x)) else: func = (lambda x: parameter(x)) self._parameter = parameter self._time = time self._func = func def __call__(self, x): return self._func(x) def __str__(self): if isinstance(self._parameter, float): return str(self._parameter) elif isinstance(self._time, (tuple, list)): return '%f ... %f' % (self(self._time[0]), self(self._time[-1])) else: return '%f ...' % self(0.)
[docs] def integrate(self, s, e): if e < s: return self.integrate(e, s) elif e == s: return 0.0 if isinstance(self._parameter, float): return self._parameter * (e - s) if isinstance(self._time, (tuple, list)): time = [s] + [t for t in self._time if s < t < e] + [e] else: time = [s + self._time * i for i in range(int((e - s) / self._time))] + [e] return sum(self(x) * (y - x) for x, y in zip(time[:-1], time[1:]))
[docs]class TimeDependentWienerProcess(WienerProcess): """ class implementing a Gauss process with time depending drift and diffusion """ def __init__(self, mu=0., sigma=1., time=1., start=0.): super(TimeDependentWienerProcess, self).__init__(mu, sigma, start) # init time if isinstance(time, (tuple, list)): self._time = time else: self._time = float(time) # init mu and sigma self._mu = TimeDependentParameter(mu, time) self._sigma = TimeDependentParameter(sigma, time) if isinstance(sigma, float): var = sigma ** 2 elif isinstance(sigma, (tuple, list)): var = tuple(x ** 2 for x in sigma) else: var = (lambda x: self._sigma(x) ** 2) self._variance = TimeDependentParameter(var, time) self._diffusion_driver = super(TimeDependentWienerProcess, self).diffusion_driver def __str__(self): return 'term-N(mu=%s, sigma=%s)' % (str(self._mu), str(self._sigma)) def _drift(self, x, s, e): return self._mu.integrate(s, e) def _diffusion(self, x, s, e): return sqrt(self._variance.integrate(s, e))
[docs]class TimeDependentGeometricBrownianMotion(GeometricBrownianMotion): def __init__(self, mu=0., sigma=1., time=1., start=1.): super(TimeDependentGeometricBrownianMotion, self).__init__(mu, sigma, start) # init time if isinstance(time, (tuple, list)): self._time = time else: self._time = float(time) # init mu and sigma self._mu = TimeDependentParameter(mu, time) self._sigma = TimeDependentParameter(sigma, time) if isinstance(sigma, float): var = sigma ** 2 elif isinstance(sigma, (tuple, list)): var = tuple(x ** 2 for x in sigma) else: var = (lambda x: self._sigma(x) ** 2) self._variance = TimeDependentParameter(var, time) self._diffusion_driver = super(TimeDependentGeometricBrownianMotion, self).diffusion_driver def __str__(self): return 'term-LN(mu=%s, sigma=%s)' % (str(self._mu), str(self._sigma)) def _drift(self, x, s, e): return self._mu.integrate(s, e) def _diffusion(self, x, s, e): return sqrt(self._variance.integrate(s, e))