Генератор общего питона - PullRequest
17 голосов
/ 14 апреля 2019

Я пытаюсь воспроизвести концепцию «разделяемых» реактивных расширений с генераторами Python.

Допустим, у меня есть API, который дает мне бесконечный поток, который я могу использовать следующим образом:

def my_generator():
    for elem in the_infinite_stream():
        yield elem

Я мог бы использовать этот генератор несколько раз, например:

stream1 = my_generator()
stream2 = my_generator()

И the_infinite_stream() будет вызываться дважды (один раз для каждого генератора).

Теперь скажите, что the_infinite_stream()дорогая операция.Есть ли способ «разделить» генератор между несколькими клиентами?Кажется, что tee сделает это, но я должен заранее знать, сколько независимых генераторов я хочу.

Идея состоит в том, что в других языках (Java, Swift) используются реактивные расширения.(RxJava, RxSwift) «общие» потоки, я могу удобно дублировать поток на стороне клиента.Мне интересно, как это сделать в Python.

Примечание: я использую asyncio

Ответы [ 4 ]

9 голосов
/ 19 апреля 2019

Я взял tee реализацию и изменил ее так, что вы можете иметь различное количество генераторов от infinite_stream:

import collections

def generators_factory(iterable):
    it = iter(iterable)
    deques = []
    already_gone = []

    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:             # when the local deque is empty
                    newval = next(it)       # fetch a new value and
                    already_gone.append(newval)
                    for d in deques:        # load it to all the deques
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator

# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1

Чтобы кэшировать только некоторое количество значений, вы можете изменить already_gone = [] на already_gone = collections.deque(maxlen=size) и добавьте size=None параметр к generators_factory.

4 голосов
/ 23 апреля 2019

Вы можете использовать один генератор и «генераторы подписчика»:

subscribed_generators = []


def my_generator():
    while true:
        elem = yield
        do_something(elem) # or yield do_something(elem) depending on your actual use

def publishing_generator():
    for elem in the_infinite_stream():
        for generator in subscribed_generators:
            generator.send(elem)

subscribed_generators.extend([my_generator(), my_generator()])

# Next is just ane example that forces iteration over `the_infinite_stream`
for elem in publishing_generator():
    pass

Вместо функции генератора вы также можете создать класс с методами: __next__, __iter__, send, throw.Таким образом, вы можете изменить метод MyGenerator.__init__, чтобы автоматически добавлять его новые экземпляры в subscribed_generators.

Это несколько похоже на подход, основанный на событиях, с «тупой реализацией»:

  • for elem in the_infinite_stream аналогично отправке события
  • for generator ...: generator.send аналогично отправке события каждому подписчику.

Таким образом, один из способов реализации «более сложного, но структурированного решения»будет использовать подход, основанный на событиях:

  • Например, вы можете использовать asyncio.Event
  • или какое-либо стороннее решение, такое как aiopubsub
  • Для любого из этих подходов вы должны генерировать событие для каждого элемента из the_infinite_stream, и ваши экземпляры my_generator должны быть подписаны на эти события.

И другиеПодходы также могут быть использованы, и лучший выбор зависит: от деталей вашей задачи, от того, как вы используете цикл обработки событий в asyncio.Например:

  • Вы можете реализовать the_infinite_stream (или обертку для него) как некоторый класс с «курсорами» (объекты, которые отслеживают текущую позицию в потоке для разных подписчиков);затем каждый my_generator регистрирует новый курсор и использует его для получения следующего элемента в бесконечном потоке.При таком подходе цикл обработки событий не будет автоматически пересматривать my_generator экземпляры, которые могут потребоваться, если эти экземпляры "не равны" (например, имеют некоторое "выравнивание приоритетов")

  • Промежуточное звеноГенератор вызывает все экземпляры my_generator (как описано ранее).При таком подходе каждый экземпляр my_generator автоматически возвращается к циклу обработки событий.Скорее всего, этот подход является поточно-ориентированным.

  • Основанные на событиях подходы:

    • с использованием asyncio.Event.Аналогично использованию промежуточного генератора.Не поточно-ориентированный

    • aiopubsub.

    • то, что использует Шаблон наблюдателя

  • Сделать the_infinite_generator (или оболочку для него) таким, чтобы он был "Singleton", который "кэширует" последнее событие.Некоторые подходы были описаны в других ответах.Могут быть использованы другие решения «кэширования»:

    • испускают один и тот же элемент один раз для каждого экземпляра the_infinite_generator (используйте класс с пользовательским методом __new__, который отслеживает экземпляры, или используйте тот же самыйэкземпляр класса, у которого есть метод, возвращающий «сдвинутый» итератор над the_infinite_loop), пока кто-то не вызовет специальный метод для экземпляра the_infinite_generator (или для класса): infinite_gen.next_cycle.В этом случае всегда должен быть какой-то «последний завершающий генератор / процессор», который в конце цикла каждого цикла обработки будет делать the_infinite_generator().next_cycle()

    • Аналогично предыдущему, но разрешено то же событиезапускать несколько раз в одном экземпляре my_generator (поэтому они должны следить за этим случаем).В этом подходе the_infinite_generator().next_cycle() можно вызывать «периодически» с помощью loop.call_later или loop.cal_at .Этот подход может понадобиться, если «подписчики» должны иметь возможность обрабатывать / анализировать: задержки, ограничения скорости, тайм-ауты между событиями и т. Д.

  • Многие другие решениявозможныТрудно предложить что-то конкретное, не глядя на текущую реализацию и не зная, каково желаемое поведение генераторов, которые используют the_infinite_loop

Если я правильно понимаю ваше описание «общих» потоков, что вам действительно нужен «один» генератор the_infinite_stream и «обработчик» для него.Пример, который пытается сделать это:


class StreamHandler:
    def __init__(self):
        self.__real_stream = the_infinite_stream()
        self.__sub_streams = []

    def get_stream(self):
        sub_stream = []  # or better use some Queue/deque object. Using list just to show base principle
        self.__sub_streams.append(sub_stream)
        while True:
            while sub_stream:
                yield sub_stream.pop(0)
            next(self)

    def __next__(self):
        next_item = next(self.__real_stream)
        for sub_stream in self.__sub_steams:
            sub_stream.append(next_item)

some_global_variable = StreamHandler()
# Or you can change StreamHandler.__new__ to make it singleton, or you can create an instance at the point of creation of event-loop

def my_generator():
    for elem in some_global_variable.get_stream():
        yield elem

Но если все ваши my_generator объекты инициализируются в одной и той же точке бесконечного потока и «одинаково» повторяются внутри цикла, тогда этот подход привнесет «ненужные» накладные расходы памяти для каждого «sub_stream» (используется как очередь) , Нет необходимости: потому что эти очереди всегда будут одинаковыми (но это можно оптимизировать: если есть какой-то существующий «пустой» подпоток, то он может быть повторно использован для новых подпотоков с некоторыми изменениями в «pop -logic»). И много-много других реализаций и нюансов можно обсудить

4 голосов
/ 23 апреля 2019

Вы можете вызывать «tee» несколько раз, чтобы создать несколько итераторов по мере необходимости.

it  = iter([ random.random() for i in range(100)])
base, it_cp = itertools.tee(it)
_, it_cp2 = itertools.tee(base)
_, it_cp3 = itertools.tee(base)

Образец: http://tpcg.io/ZGc6l5.

4 голосов
/ 23 апреля 2019

Рассмотрим простые атрибуты класса .

Дано

def infinite_stream():
    """Yield a number from a (semi-)infinite iterator."""
    # Alternatively, `yield from itertools.count()`
    yield from iter(range(100000000))


# Helper
def get_data(iterable):
    """Print the state of `data` per stream."""
    return ", ".join([f"{x.__name__}: {x.data}" for x in iterable])

Код

class SharedIterator:
    """Share the state of an iterator with subclasses."""
    _gen = infinite_stream()
    data = None

    @staticmethod
    def modify():
        """Advance the shared iterator + assign new data."""
        cls = SharedIterator
        cls.data = next(cls._gen)

Демо

Учитывая кортеж клиента streams (A, B и C),

# Streams
class A(SharedIterator): pass
class B(SharedIterator): pass
class C(SharedIterator): pass


streams = A, B, C

давайте изменим ивывести состояние одного итератора, общего для них:

# Observe changed state in subclasses    
A.modify()
print("1st access:", get_data(streams))
B.modify()
print("2nd access:", get_data(streams))
C.modify()
print("3rd access:", get_data(streams))

Вывод

1st access: A: 0, B: 0, C: 0
2nd access: A: 1, B: 1, C: 1
3rd access: A: 2, B: 2, C: 2

Хотя любой поток может изменять итератор, атрибут класса является общим для подклассов.

См. Также

  • Документы в asyncio.Queue - асинхронная альтернатива общему контейнеру
  • Post на схеме наблюдателя + asyncio
...