Для обобщенной поточно-ориентированной функции более высокого уровня вы можете использовать что-то вроде следующего в сочетании с классом Queue.Queue
и некоторыми другими битами:
from Queue import Empty
def Accumulator(f, q, storage):
"""Yields successive values of `f` over the accumulation of `q`.
`f` should take a single iterable as its parameter.
`q` is a Queue.Queue or derivative.
`storage` is a persistent sequence that provides an `append` method.
`collections.deque` may be particularly useful, but a `list` is quite acceptable.
>>> from Queue import Queue
>>> from collections import deque
>>> from threading import Thread
>>> def mean(it):
... vals = tuple(it)
... return sum(it) / len(it)
>>> value_queue = Queue()
>>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
>>> def add_to_queue(it, queue):
... for value in it:
... value_queue.put(value)
>>> putting_thread = Thread(target=add_to_queue,
... args=(range(0, 12, 2), value_queue))
>>> putting_thread.start()
>>> list(LastThreeAverage)
[0, 1, 2, 4, 6, 8]
"""
try:
while True:
storage.append(q.get(timeout=0.1))
q.task_done()
yield f(storage)
except Empty:
pass
Эта функция генератора уклоняется от большей части своей предполагаемойответственность, делегируя его другим организациям:
- Он полагается на
Queue.Queue
для обеспечения своих исходных элементов потокобезопасным способом - A
collections.deque
объект может быть передан какзначение параметра storage
;это обеспечивает, среди прочего, удобный способ использовать только последние n
(в данном случае 3) значения - Сама функция (в данном случае
mean
) передается в качестве параметра.В некоторых случаях это приведет к неоптимально эффективному коду, но с готовностью применяется ко всем видам ситуаций.
Обратите внимание, что существует вероятность тайм-аута накопителя, если ваш поток производителя принимаетболее 0,1 секунды на значение.Это легко исправить, передав более длительное время ожидания или полностью удалив параметр времени ожидания.В последнем случае функция будет блокироваться на неопределенный срок в конце очереди;такое использование имеет больше смысла в случае, когда оно используется в подпотоке (обычно в потоке daemon
).Конечно, вы также можете параметризовать аргументы, которые передаются в q.get
в качестве четвертого аргумента, в Accumulator
.
Если вы хотите сообщить конец очереди, то есть, что больше нет значений дляПоток производителя (здесь putting_thread
), вы можете передать и проверить значение часового или использовать другой метод.Больше информации в этой теме ;Я решил написать подкласс Queue.Queue с именем CloseableQueue , который предоставляет метод close
.
Существуют различные другие способы, позволяющие настроить поведение такой функции, например, с помощьюограничение размера очереди;это только пример использования.
edit
Как уже упоминалось выше, это теряет некоторую эффективность из-за необходимости пересчета, а также, я думаю, на самом деле не отвечает на ваш вопрос.
Функция генератора также может принимать значения с помощью метода send
.Таким образом, вы можете написать функцию генератора средних значений, например
def meangen():
"""Yields the accumulated mean of sent values.
>>> g = meangen()
>>> g.send(None) # Initialize the generator
>>> g.send(4)
4.0
>>> g.send(10)
7.0
>>> g.send(-2)
4.0
"""
sum = yield(None)
count = 1
while True:
sum += yield(sum / float(count))
count += 1
. Здесь выражение yield приносит оба значения - аргументы send
- в функцию, одновременно передавая вычисленные значения в качестве возвращаемого значенияsend
.
Вы можете передать генератор, возвращенный вызовом этой функции, в более оптимизируемую функцию генератора аккумулятора, такую как эта:
def EfficientAccumulator(g, q):
"""Similar to Accumulator but sends values to a generator `g`.
>>> from Queue import Queue
>>> from threading import Thread
>>> value_queue = Queue()
>>> g = meangen()
>>> g.send(None)
>>> mean_accumulator = EfficientAccumulator(g, value_queue)
>>> def add_to_queue(it, queue):
... for value in it:
... value_queue.put(value)
>>> putting_thread = Thread(target=add_to_queue,
... args=(range(0, 12, 2), value_queue))
>>> putting_thread.start()
>>> list(mean_accumulator)
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
"""
try:
while True:
yield(g.send(q.get(timeout=0.1)))
q.task_done()
except Empty:
pass