Source code for timewave.stochasticprocess.markovchain

# -*- coding: utf-8 -*-

# timewave
# --------
# timewave, a stochastic process evolution simulation engine in python.
# 
# Author:   sonntagsgesicht, based on a fork of Deutsche Postbank [pbrisk]
# Version:  0.6, copyright Wednesday, 18 September 2019
# Website:  https://github.com/sonntagsgesicht/timewave
# License:  Apache License 2.0 (see LICENSE file)


from math import sqrt
from random import Random

import numpy as np

from scipy.stats import norm
from scipy.linalg import expm, logm

from .base import StochasticProcess

EPS = 1e-7


[docs]class FiniteStateMarkovChain(StochasticProcess): @property def transition(self): return self._transition_matrix.tolist() @property def r_squared(self): return self._r_squared
[docs] @classmethod def random(cls, d=5): # pick random vector and matrix with positive values s = np.random.random((1, d)) t = np.random.random((d, d)) # turn them into stochastic ones s = s / s.sum(1) t = t / t.sum(1).reshape((d, 1)) # build process instance return cls(transition=t.tolist(), r_squared=1., start=list(s.flat))
def __init__(self, transition=None, r_squared=1., start=None): """ :param list transition: stochastic matrix of transition probabilities, i.e. np.ndarray with shape=2 and sum of each line equal to 1 (optional) default: identity matrix :param float r_squared: square of systematic correlation in factor simulation (optional) default: 1. :param list start: initial state distribution, i.e. np.ndarray with shape=1 or list, adding up to 1, (optional) default: unique distribution """ # if any argument is missing use defaults if transition is None and start is None: dim = 2 else: dim = len(transition) if start is None else len(start) start = (np.ones(dim) / dim).tolist() if start is None else list(start) transition = np.identity(dim) if transition is None else transition self._transition_matrix = np.matrix(transition, float) self._r_squared = r_squared self._idiosyncratic_random = Random() super(FiniteStateMarkovChain, self).__init__(start) # validate argument shapes in shapes and sum for being stochastic if not abs(self._transition_matrix.sum(1) - 1.).sum() < EPS: raise ValueError('transition probabilities do not from distribution.\n' + str(self._transition_matrix)) if not abs(sum(self.start) - 1.) < EPS: raise ValueError('start does not from distribution.\n' + str(self.start)) if not (len(self.start), len(self.start)) == self._transition_matrix.shape: msg = 'dimension of transition and start argument must meet.\n' \ + str(self.start) + '\n' + str(self._transition_matrix) raise ValueError(msg) def __len__(self): return 1 def __str__(self): return self.__class__.__name__ + '(dim=%d)' % len(self.start) def _m_pow(self, t, s=0.): return self._transition_matrix ** int(t - s)
[docs] def evolve(self, x, s, e, q): q = sqrt(self._r_squared) * q + sqrt(1. - self._r_squared) * self._idiosyncratic_random.gauss(0., 1.) p = norm.cdf(q) m = self._m_pow(e, s) mm = np.greater(m.cumsum(1), p) * 1. mm[0:, 1:] -= mm[0:, :-1] return list(np.asarray(x, float).dot(mm).flat)
[docs] def mean(self, t): s = np.asarray(self.start, float) m = self._m_pow(t) return list(s.dot(m).flat)
[docs] def variance(self, t): return list(np.diag(self.covariance(t)).flat)
[docs] def covariance(self, t): def _e_xy(prop_x, cum_prop_x, prop_y, cum_prop_y): b = list() for p, c in zip(prop_x, cum_prop_x): b.append([max(0., min(c, d) - max(c - p, d - q)) for q, d in zip(prop_y, cum_prop_y)]) return np.matrix(b) s = np.asarray(self.start, float) m = self._m_pow(t) c = list() for prop_x, cum_prop_x, mean_x in zip(m.T, m.cumsum(1).T, self.mean(t)): r = list() for prop_y, cum_prop_y, mean_y in zip(m.T, m.cumsum(1).T, self.mean(t)): exy_m = _e_xy(list(prop_x.flat), list(cum_prop_x.flat), list(prop_y.flat), list(cum_prop_y.flat)) exx = s.dot(exy_m).dot(s.T)[0, 0] cov_xy = exx - mean_x * mean_y cov_xy = max(0., cov_xy) if cov_xy > -EPS else cov_xy # avoid numerical negatives r.append(cov_xy) c.append(r) return c
[docs]class FiniteStateContinuousTimeMarkovChain(FiniteStateMarkovChain): def __init__(self, transition=None, r_squared=1., start=None): super(FiniteStateContinuousTimeMarkovChain, self).__init__(transition, r_squared, start) self._transition_generator = logm(self._transition_matrix) # todo: handle complex solution def _m_pow(self, t, s=0.): return expm(self._transition_generator * (t - s))
[docs]class FiniteStateInhomogeneousMarkovChain(FiniteStateMarkovChain):
[docs] @classmethod def random(cls, d=5, l=3): s = np.random.random((1, d)) # pick random vector or matrix with positive values s = s / s.sum(1) # turn them into stochastic ones g = list() for _ in range(l): t = np.random.random((d, d)) # pick random vector or matrix with positive values t = t / t.sum(1).reshape((d, 1)) # turn them into stochastic ones g.append(t) # build process instance return cls(transition=g, start=list(s.flat))
def __init__(self, transition=None, r_squared=1., start=None): transition = [transition] if not isinstance(transition, (tuple, list)) else transition super(FiniteStateInhomogeneousMarkovChain, self).__init__(transition.pop(-1), r_squared, start) self._transition_grid = list(np.matrix(t, float) for t in transition) self._identity = np.identity(len(self.start), float) def _m_pow(self, t, s=0): l = len(self._transition_grid) # use transition grid at start n = self._identity for i in range(min(s, l), min(t, l)): n = n.dot(self._transition_grid[i]) # use super beyond the transition grid return n * super(FiniteStateInhomogeneousMarkovChain, self)._m_pow(t, min(t, max(s, l)))
[docs]class AugmentedFiniteStateMarkovChain(StochasticProcess):
[docs] @classmethod def random(cls, d=5, augmentation=None): underlying = FiniteStateMarkovChain.random(d) return cls(underlying, augmentation)
@property def diffusion_driver(self): return self._underlying.diffusion_driver @property def start(self): return self._underlying.start @start.setter def start(self, value): self._underlying.start = value def __init__(self, underlying, augmentation=None): r""" :param FiniteStateMarkovChain underlying: underlying Markov chain stochastic process :param callable augmentation: function :math:`f:S \rightarrow \mathbb{R}` defined on single states to weight augmentation (aggregate) of state distributions (optional) default: :math:`f=id` Augmentation argument can be list or tuple, too. In this case :code:`__getitem__` is called. """ self._underlying = underlying super(AugmentedFiniteStateMarkovChain, self).__init__(self._underlying.start) augmentation = augmentation.__getitem__ if isinstance(augmentation, (list, tuple)) else augmentation self._augmentation = (lambda x: 1.) if augmentation is None else augmentation def __len__(self): return len(self._underlying) def __str__(self): return self.__class__.__name__ + '(underlying=%s)' % str(self._underlying)
[docs] def evolve(self, x, s, e, q): return self._underlying.evolve(x, s, e, q)
[docs] def eval(self, s): s = s.value if hasattr(s, 'value') else s return sum(x * self._augmentation(i) for i, x in enumerate(s))
[docs] def mean(self, t): return self.eval(self._underlying.mean(t))
[docs] def variance(self, t): w = np.array([self._augmentation(i) for i in range(len(self.start))], float) c = np.matrix(self._underlying.covariance(t), float) return w.dot(c).dot(w.T)[0, 0]