Source code for timewave.engine

# -*- coding: utf-8 -*-

# timewave
# --------
# timewave, a stochastic process evolution simulation engine in python.
# 
# Author:   sonntagsgesicht, based on a fork of Deutsche Postbank [pbrisk]
# Version:  0.6, copyright Wednesday, 18 September 2019
# Website:  https://github.com/sonntagsgesicht/timewave
# License:  Apache License 2.0 (see LICENSE file)


"""
module containing simulation method related classes incl. multiprocessing support
"""
from cProfile import runctx
from copy import copy
from random import Random

try:  # guarded import since multiprocessing is not available on iOS Pythonista
    from multiprocessing import cpu_count, current_process, Process, Queue

    CPU_COUNT = cpu_count()
except ImportError:
    cpu_count, current_process, Process, Queue = None, None, None, None
    CPU_COUNT = None


class Producer(object):
    """ abstract class implementing simple producer for a model between grid dates """

    def __init__(self, func=None, initial_state=None):
        super(Producer, self).__init__()
        if func is None:
            func = (lambda s, d: s.value)
        self.func = func
        if initial_state is None:
            initial_state = State()
        self.initial_state = initial_state
        self.random = Random()
        self.grid = None
        self.num_of_paths = None
        self.seed = None
        self.state = None

    def initialize(self, grid, num_of_paths, seed):
        """ inits producer for a simulation run """
        self.grid = grid
        self.num_of_paths = num_of_paths
        self.seed = seed
        if self.initial_state.date is None:
            self.initial_state.date = grid[0]

    def initialize_worker(self, process_num=None):
        """ inits producer for a simulation run on a single process """
        self.initial_state.process = process_num
        # derive a per-process seed so each worker draws independent random numbers
        seed = self.seed if process_num is None else hash(self.seed) + hash(process_num)
        self.random.seed(seed)

    def initialize_path(self, path_num=None):
        """ inits producer for next path, i.e. sets current state to initial state """
        self.state = copy(self.initial_state)
        self.state.path = path_num
        # seed = self.seed if path_num is None else hash(self.seed) + hash(path_num)
        # self.random.seed(seed)

    def evolve(self, new_date):
        """ evolve to the new process state at the next date, i.e. do one step in the simulation

        :param date new_date: date of the new state
        :return State:
        """
        if self.state.date == new_date and not self.initial_state.date == new_date:
            return self.state
        self.state.value = self.func(self.state, new_date)
        self.state.date = new_date
        return self.state

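
# Illustrative sketch, not part of timewave.engine: a custom producer can either
# pass a func(state, date) or subclass Producer. The subclass below (the name
# GaussRandomWalkProducer is purely hypothetical) draws Gaussian increments from
# the inherited self.random, so the per-worker seeding done in initialize_worker
# controls the samples.
class GaussRandomWalkProducer(Producer):
    """ random-walk producer: new value = old value + a standard normal increment """

    def __init__(self, initial_state=None):
        # the lambda is only evaluated per evolve step, after self.random has been seeded
        super(GaussRandomWalkProducer, self).__init__(
            func=lambda s, d: s.value + self.random.gauss(0.0, 1.0),
            initial_state=initial_state)
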

class State(object):
    """ simulation state """

    def __init__(self, value=0.0):
        super(State, self).__init__()
        self.value = value
        self.date = None
        self.process = None
        self.path = None


class Engine(object):
    """ This class implements a Monte Carlo engine """

    def __init__(self, producer=None, consumer=None):
        super(Engine, self).__init__()
        if not isinstance(producer, Producer) and not isinstance(consumer, Consumer):
            raise ValueError("%s arguments must be either Producer or Consumer." % self.__class__.__name__)
        self.producer = producer
        self.consumer = consumer
        self.grid = None
        self.num_of_paths = None
        self.num_of_workers = None
        self.seed = None

    def run(self, grid=None, num_of_paths=2000, seed=0, num_of_workers=CPU_COUNT, profiling=False):
        """ implements simulation

        :param list(date) grid: list of Monte Carlo grid dates
        :param int num_of_paths: number of Monte Carlo paths
        :param hashable seed: seed used for random number initialisation (additional adjustment in place)
        :param int or None num_of_workers: number of parallel workers (default: cpu_count()),
            if None no parallel processing is used
        :param bool profiling: signal whether to use profiling, True means used, else not
        :return object: final consumer state

        It returns a list of lists.
        The list contains per path a list produced by the consumer at the observation dates.
        """
        self.grid = sorted(set(grid))
        self.num_of_paths = num_of_paths
        self.num_of_workers = num_of_workers
        self.seed = seed

        # pre processing
        self.producer.initialize(self.grid, self.num_of_paths, self.seed)
        self.consumer.initialize(self.grid, self.num_of_paths, self.seed)

        if num_of_workers:
            # processing
            workers = list()
            queue = Queue()
            path_per_worker = int(num_of_paths // num_of_workers)
            start_path, stop_path = 0, path_per_worker
            for i in range(num_of_workers):
                if i == num_of_workers - 1:
                    stop_path = num_of_paths  # ensure exact number of paths as required
                name = 'worker-%d' % i
                if profiling:
                    # display the profile with `snakeviz worker-0.prof`
                    # if not installed, `pip install snakeviz`
                    workers.append(Process(target=self._run_parallel_process_with_profiling, name=name,
                                           args=(start_path, stop_path, queue, name + '.prof')))
                else:
                    workers.append(Process(target=self._run_parallel_process, name=name,
                                           args=(start_path, stop_path, queue)))
                start_path, stop_path = stop_path, stop_path + path_per_worker

            for worker in workers:
                worker.start()

            # post processing
            for _ in range(num_of_workers):
                self.consumer.get(queue.get())
            for worker in workers:
                worker.join()
        else:
            self._run_process(0, num_of_paths)

        self.consumer.finalize()
        return self.consumer.result

    def _run_parallel_process_with_profiling(self, start_path, stop_path, queue, filename):
        """ wrapper adding profiling around _run_parallel_process """
        runctx('Engine._run_parallel_process(self, start_path, stop_path, queue)',
               globals(), locals(), filename)

    def _run_parallel_process(self, start_path, stop_path, queue):
        """ calls _run_process and puts the results produced by the top-most consumer
            at its observations into the queue
        """
        process_num = int(current_process().name.split('-', 2)[1])
        self._run_process(start_path, stop_path, process_num)
        queue.put(self.consumer.put())

    def _run_process(self, start_path, stop_path, process_num=0):
        """ calls _run_path for a given range of paths """
        # pre processing
        self.producer.initialize_worker(process_num)
        self.consumer.initialize_worker(process_num)

        # processing
        for path in range(start_path, stop_path):
            self._run_path(path)

        # post processing
        self.consumer.finalize_worker(process_num)

    def _run_path(self, path_num):
        """ standalone function implementing a single Monte Carlo loop,
        producing the list collected by the consumer at the observation dates

        :param int path_num: path number
        """
        # pre processing
        self.producer.initialize_path(path_num)
        self.consumer.initialize_path(path_num)

        # processing
        for new_date in self.grid:
            state = self.producer.evolve(new_date)
            self.consumer.consume(state)

        # post processing
        self.consumer.finalize_path(path_num)

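
# Usage sketch, not part of the module: a minimal single-process run wiring the
# default Producer and Consumer together. Plain integers stand in for grid dates
# and num_of_workers=None switches off multiprocessing (useful where it is not
# available, see the import guard above).
#
#     from timewave.engine import Engine, Producer, Consumer
#
#     engine = Engine(Producer(), Consumer())
#     result = engine.run(grid=range(5), num_of_paths=10, seed=42, num_of_workers=None)
#     # result holds one list per path, each with one consumed value per grid date
#     assert len(result) == 10 and all(len(path) == 5 for path in result)
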

class Consumer(object):
    """ base class for simulation consumers """

    def __init__(self, func=None):
        """ initializes the consumer by providing a function

        :param func: consumer function with exactly one argument which consumes
            the producer state. Default will return `state.value`
        :type func: callable
        """
        super(Consumer, self).__init__()
        if func is None:
            func = (lambda s: s.value)
        self.func = func
        self.initial_state = list()
        self.state = list()
        self.result = list()
        self.num_of_paths = None
        self.grid = None
        self.seed = None

    def initialize(self, grid=None, num_of_paths=None, seed=None):
        """ initialize consumer for the simulation

        :param num_of_paths: number of paths
        :type num_of_paths: int
        :param grid: list of grid points
        :type grid: list(date)
        :param seed: simulation seed
        :type seed: hashable
        """
        self.num_of_paths = num_of_paths
        self.grid = grid
        self.seed = seed
        self.result = list()
        self.state = self.initial_state

    def initialize_worker(self, process_num=None):
        """ reinitialize consumer for a process in multiprocessing """
        self.initialize(self.grid, self.num_of_paths, self.seed)

    def initialize_path(self, path_num=None):
        """ initialize consumer for next path """
        self.state = copy(self.initial_state)
        return self.state

    def consume(self, state):
        """ consume new producer state """
        self.state.append(self.func(state))
        return self.state

    def finalize_path(self, path_num=None):
        """ finalize last path for consumer """
        self.result.append((path_num, self.state))

    def finalize_worker(self, process_num=None):
        """ finalize process for consumer """
        pass

    def finalize(self):
        """ finalize simulation for consumer """
        # sort results by path_num and strip the path numbers
        if self.result:
            self.result = sorted(self.result, key=lambda x: x[0])
            p, r = list(map(list, list(zip(*self.result))))
            self.result = r

    def put(self):
        """ to put state into multiprocessing.queue """
        return self.result

    def get(self, queue_get):
        """ to get states from multiprocessing.queue """
        if isinstance(queue_get, (tuple, list)):
            self.result.extend(queue_get)
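
# Illustrative sketch, not part of the module: the func hook lets a Consumer record
# something other than the bare state value, e.g. (date, value) pairs per grid date.
#
#     pair_consumer = Consumer(func=lambda state: (state.date, state.value))
#     # after running an Engine with this consumer, pair_consumer.result contains,
#     # per path, a list of (date, value) tuples in grid-date order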