Source code for timewave.stochasticconsumer

# -*- coding: utf-8 -*-

# timewave
# --------
# timewave, a stochastic process evolution simulation engine in python.
# 
# Author:   sonntagsgesicht, based on a fork of Deutsche Postbank [pbrisk]
# Version:  0.6, copyright Wednesday, 18 September 2019
# Website:  https://github.com/sonntagsgesicht/timewave
# License:  Apache License 2.0 (see LICENSE file)


from math import sqrt
from random import sample

from .consumers import TransposedConsumer


# statistics and stochastic process consumers


class _Statistics(object):
    """
    calculate basic statistics for a 1 dim empirical sample
    """
    _available = 'count', 'mean', 'stdev', 'variance', 'skewness', 'kurtosis', 'median', 'min', 'max'

    def get(self, item, default=None):
        return getattr(self, item, default)

    def keys(self):
        return self._available

    def values(self):
        return tuple(getattr(self, a, 0.0) for a in list(self.keys()))

    def items(self):
        return list(zip(list(self.keys()), list(self.values())))

    def __init__(self, data, description='', **expected):
        sps = sorted(data)
        l = float(len(sps))
        p = [int(i * l * 0.01) for i in range(100)]
        m1 = self._moment(sps)
        cm2 = self._moment(sps, 2, m1)
        cm3 = self._moment(sps, 3, m1)
        cm4 = self._moment(sps, 4, m1)
        self.description = description
        self.count = len(sps)
        self.mean = m1
        self.variance = 0. if len(set(sps)) == 1 else cm2 * l / (l - 1.)
        self.stdev = sqrt(self.variance)
        self.skewness = 0. if len(set(sps)) == 1 else cm3 / (self.stdev ** 3)
        self.kurtosis = 0. if len(set(sps)) == 1 else cm4 / cm2 ** 2 - 3.
        self.median = sps[p[50]]
        self.min = sps[0]
        self.max = sps[-1]
        self.box = [sps[0], sps[p[25]], sps[p[50]], sps[p[75]], sps[-1]]
        self.percentile = [sps[int(i)] for i in p]
        self.sample = data
        process, time = expected.get('process'), expected.get('time', 1.)
        if process:
            expected.update(dict((k, getattr(process, k)(time)) for k in self._available if hasattr(process, k)))
        self.expected = expected

    @staticmethod
    def _moment(data, degree=1, mean=0.):
        return sum([(rr - mean) ** degree for rr in data]) / float(len(data))

    def __contains__(self, item):
        return item in list(self.keys())

    def __iter__(self):
        return list(self.keys())

    def __getitem__(self, item):
        return getattr(self, item)

    def __repr__(self):
        return '\n' + str(self)

    def __str__(self):
        f = (lambda v: '%0.8f' % v if isinstance(v, (int, float)) else '')
        keys, values = list(self.keys()), list(map(f, list(self.values())))
        mk, mv = max(list(map(len, keys))), max(list(map(len, values)))
        res = [a.ljust(mk) + ' : ' + v.rjust(mv) for a, v in zip(keys, values)]

        if self.expected:
            for l, k in enumerate(self.keys()):
                if k in self.expected:
                    e, v = self.expected[k], getattr(self, k)
                    # diff v - e
                    res[l] += ' - ' + ('%0.8f' % e).rjust(mv) + ' = ' + ('%0.8f' % (v - e)).rjust(mv)
                    if e:  # rel diff if non zero e
                        res[l] += '  (' + ('%+0.3f' % (100. * (v - e) / e)).rjust(mv) + ' %)'

        res = [self.__class__.__name__ + '(' + self.description + ')'] + res
        return '\n'.join(res)


class _MetaStatistics(list):
    _available = 'mean', 'stdev', 'variance', 'skewness', 'kurtosis', 'median'

    def __init__(self, iterable, **expected):
        super(_MetaStatistics, self).__init__(iterable)
        self.expected = expected

    def keys(self):
        return self.__class__._available

    def values(self):
        return list(v for k, v in list(self.items()))

    def items(self):
        keys = list(self.keys())
        data = dict((k, list()) for k in keys)
        for s in self:
            for k in keys:
                data[k].append(getattr(s, k))
        return list((k, data[k]) for k in list(self.keys()))


class _BootstrapStatistics(_MetaStatistics):

    def __init__(self, data, statistics=None, sample_len=0.5, sample_num=100, **expected):
        # Jack knife n*(n-1)
        # bootstrap n*(n-1), (n-1)*(n-2), ...
        statistics = _Statistics if statistics is None else statistics
        if not expected:
            expected = dict(list(statistics(data).items()))
        self.sample = data
        k = int(float(len(data)) * sample_len)
        p = str(expected.get('process', ''))
        iterable = (statistics(sample(data, k), description='%s(%d)' % (p,i), **expected) for i in range(sample_num))
        super(_BootstrapStatistics, self).__init__(iterable, **expected)

    def items(self):
        expected = self.expected
        process, time = expected.get('process'), expected.get('time', 1.)
        if process:
            expected.update(dict((k, getattr(process, k)(time)) for k in self._available if hasattr(process, k)))

        res = list()
        for k, v in list(super(_BootstrapStatistics, self).items()):
            p = str(expected.get('process', ''))
            d = '%s[%s]' % (p, k)
            if k in expected:
                s = _Statistics(v, description=d, mean=expected.get(k))
            else:
                s = _Statistics(v, description=d)
            res.append((k, s))
        return res


class _ConvergenceStatistics(_MetaStatistics):
    def __init__(self, data, statistics=None, sample_num=10, **expected):
        # convergence [:1] -> [:n]
        statistics = _Statistics if statistics is None else statistics
        if not expected:
            expected = dict(list(statistics(data).items()))
        self.sample = data
        k = int(len(data)/sample_num)
        p = str(expected.get('process', ''))
        iterable = ((statistics(data[:i+k], description='%s[0:%d]' % (p,i+k), **expected)) for i in range(0, len(data), k))
        super(_ConvergenceStatistics, self).__init__(iterable, **expected)


# todo class _ValidationStatistics(object): # 60:40 validation test


class _MultiStatistics(object):
    _available = 'count', 'mean', 'variance', 'stdev', 'skewness', 'kurtosis', \
                 'min', 'max', 'median', 'box', 'percentile', 'sample'

    def __init__(self, data):
        self._inner = list(_Statistics(d) for d in zip(*data))

    def __getattr__(self, item):
        if item in _MultiStatistics._available and hasattr(_Statistics(list(range(10))), item):
            return list(getattr(s, item) for s in self._inner)
        else:
            return super(_MultiStatistics, self).__getattribute__(item)


[docs]class StatisticsConsumer(TransposedConsumer): """ run basic statistics on storage consumer result per time slice """ def __init__(self, func=None, statistics=None, **kwargs): if statistics is None: statistics = _Statistics self.statistics = statistics self.kwargs = kwargs super(StatisticsConsumer, self).__init__(func)
[docs] def finalize(self): """finalize for StatisticsConsumer""" super(StatisticsConsumer, self).finalize() # run statistics on timewave slice w at grid point g # self.result = [(g, self.statistics(w)) for g, w in zip(self.grid, self.result)] if self.kwargs: self.result = list(zip(self.grid, (self.statistics(w, **self.kwargs) for w in self.result))) else: self.result = list(zip(self.grid, list(map(self.statistics, self.result))))
[docs]class StochasticProcessStatisticsConsumer(StatisticsConsumer): """ run basic statistics on storage consumer result as a stochastic process """
[docs] def finalize(self): """finalize for StochasticProcessStatisticsConsumer""" super(StochasticProcessStatisticsConsumer, self).finalize() class StochasticProcessStatistics(self.statistics): """local version to store statistics""" def __str__(self): s = [k.rjust(12) + str(getattr(self, k)) for k in dir(self) if not k.startswith('_')] return '\n'.join(s) sps = StochasticProcessStatistics([0, 0]) keys = list() for k in dir(sps): if not k.startswith('_'): a = getattr(sps, k) if isinstance(a, (int, float, str)): keys.append(k) #else: # delattr(sps, k) for k in keys: setattr(sps, k, list()) grid = list() for g, r in self.result: grid.append(g) for k in keys: a = getattr(sps, k) a.append(getattr(r, k)) self.result = grid, sps
[docs]class TimeWaveConsumer(TransposedConsumer):
[docs] def finalize(self): super(TimeWaveConsumer, self).finalize() max_v = max(max(w) for w in self.result) min_v = min(min(w) for w in self.result) min_l = min(len(w) for w in self.result) n = int(sqrt(min_l)) # number of y grid y_grid = [min_v + (max_v - min_v) * float(i) / float(n) for i in range(n)] y_grid.append(max_v + 1e-12) x, y, z = list(), list(), list() # grid, value, count for point, wave in zip(self.grid, self.result): for l, u in zip(y_grid[:-1], y_grid[1:]): x.append(point) y.append(l + (u - l) * .5) z.append(float(len([w for w in wave if l <= w < u])) / float(min_l)) self.result = x, y, z