Статистический аккумулятор в Python - PullRequest
4 голосов
/ 23 сентября 2010

Статистический накопитель позволяет выполнять инкрементные вычисления. Например, для вычисления среднего арифметического потока чисел, заданных в произвольные моменты времени, можно создать объект, который отслеживает текущее количество заданных элементов n и их сумму sum. Когда кто-то запрашивает среднее значение, объект просто возвращает sum/n.

Аккумулятор, подобный этому, позволяет вам постепенно вычислять в том смысле, что при получении нового числа вам не нужно пересчитывать всю сумму и считать.

Подобные аккумуляторы могут быть записаны для другой статистики (см. boost библиотеки для реализации C ++).

Как бы вы внедрили аккумуляторы в Python? Код, который я придумал :

class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow to remove items
    already accumulated, but it could easily be modified to do
    so. also, other statistics could be accumulated.
    """
    def __init__(self):
     # upon initialization, the numnber of items currently
     # accumulated (_n) and the total sum of the items acumulated
     # (_sum) are set to zero because nothing has been accumulated
     # yet.
     self._n = 0
     self._sum = 0.0

    def add(self, item):
     # the 'add' is used to add an item to this accumulator
     try:
        # try to convert the item to a float. If you are
        # successful, add the float to the current sum and
        # increase the number of accumulated items
        self._sum += float(item)
        self._n += 1
     except ValueError:
        # if you fail to convert the item to a float, simply
        # ignore the exception (pass on it and do nothing)
        pass

    @property
    def mean(self):
     # the property 'mean' returns the current mean accumulated in
     # the object
     if self._n > 0:
        # if you have more than zero items accumulated, then return
        # their artithmetic average
        return self._sum / self._n
     else:
        # if you have no items accumulated, return None (you could
        # also raise an exception)
        return None

# using the object:

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')

Интересные вопросы дизайна возникают:

  1. Как сделать аккумулятор потокобезопасный?
  2. Как безопасно удалить Предметы?
  3. Как проектировать в пути что позволяет другой статистике быть легко подключается (завод для статистики)

Ответы [ 2 ]

3 голосов
/ 23 сентября 2010

Для обобщенной поточно-ориентированной функции более высокого уровня вы можете использовать что-то вроде следующего в сочетании с классом Queue.Queue и некоторыми другими битами:

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(it) / len(it)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        pass

Эта функция генератора уклоняется от большей части своей предполагаемойответственность, делегируя его другим организациям:

  • Он полагается на Queue.Queue для обеспечения своих исходных элементов потокобезопасным способом
  • A collections.deque объект может быть передан какзначение параметра storage;это обеспечивает, среди прочего, удобный способ использовать только последние n (в данном случае 3) значения
  • Сама функция (в данном случае mean) передается в качестве параметра.В некоторых случаях это приведет к неоптимально эффективному коду, но с готовностью применяется ко всем видам ситуаций.

Обратите внимание, что существует вероятность тайм-аута накопителя, если ваш поток производителя принимаетболее 0,1 секунды на значение.Это легко исправить, передав более длительное время ожидания или полностью удалив параметр времени ожидания.В последнем случае функция будет блокироваться на неопределенный срок в конце очереди;такое использование имеет больше смысла в случае, когда оно используется в подпотоке (обычно в потоке daemon).Конечно, вы также можете параметризовать аргументы, которые передаются в q.get в качестве четвертого аргумента, в Accumulator.

Если вы хотите сообщить конец очереди, то есть, что больше нет значений дляПоток производителя (здесь putting_thread), вы можете передать и проверить значение часового или использовать другой метод.Больше информации в этой теме ;Я решил написать подкласс Queue.Queue с именем CloseableQueue , который предоставляет метод close.

Существуют различные другие способы, позволяющие настроить поведение такой функции, например, с помощьюограничение размера очереди;это только пример использования.

edit

Как уже упоминалось выше, это теряет некоторую эффективность из-за необходимости пересчета, а также, я думаю, на самом деле не отвечает на ваш вопрос.

Функция генератора также может принимать значения с помощью метода send.Таким образом, вы можете написать функцию генератора средних значений, например

def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    sum = yield(None)
    count = 1
    while True:
        sum += yield(sum / float(count))
        count += 1

. Здесь выражение yield приносит оба значения - аргументы send - в функцию, одновременно передавая вычисленные значения в качестве возвращаемого значенияsend.

Вы можете передать генератор, возвращенный вызовом этой функции, в более оптимизируемую функцию генератора аккумулятора, такую ​​как эта:

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass
1 голос
/ 23 сентября 2010

Если бы я делал это в Python, я бы сделал по-разному две вещи:

  1. Выделил бы функциональность каждого аккумулятора.
  2. Ни в коем случае не используйте @propertyвы сделали.

Для первого, я, вероятно, хотел бы придумать API для выполнения накопления, возможно, что-то вроде:

def add(self, num) # add a number
def compute(self) # compute the value of the accumulator

Затем я бы создалAccumulatorRegistry, который хранит эти аккумуляторы и позволяет пользователю вызывать действия и добавлять к ним все.Код может выглядеть следующим образом:

class Accumulators(object):
    _accumulator_library = {}

    def __init__(self):
        self.accumulator_library = {}
        for key, value in Accumulators._accumulator_library.items():
            self.accumulator_library[key] = value()

    @staticmethod
    def register(name, accumulator):
        Accumulators._accumulator_library[name] = accumulator

    def add(self, num):
        for accumulator in self.accumulator_library.values():
            accumulator.add(num)

    def compute(self, name):
        self.accumulator_library[name].compute()

    @staticmethod
    def register_decorator(name):
        def _inner(cls):
            Accumulators.register(name, cls)
            return cls


@Accumulators.register_decorator("Mean")
class Mean(object):
    def __init__(self):
        self.total = 0
        self.count = 0

    def add(self, num):
        self.count += 1
        self.total += num

    def compute(self):
        return self.total / float(self.count)

Я, вероятно, должен ответить на ваш поточно-ориентированный вопрос.GIL Python защищает вас от множества проблем с потоками.Однако есть несколько способов защитить себя:

  • Если эти объекты локализованы в одном потоке, используйте threading.local
  • Если нет, вы можете обернутьоперации в блокировке, используя синтаксис with context для удержания блокировки за вас.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...