Вы можете использовать один генератор и «генераторы подписчика»:
subscribed_generators = []
def my_generator():
while true:
elem = yield
do_something(elem) # or yield do_something(elem) depending on your actual use
def publishing_generator():
for elem in the_infinite_stream():
for generator in subscribed_generators:
generator.send(elem)
subscribed_generators.extend([my_generator(), my_generator()])
# Next is just ane example that forces iteration over `the_infinite_stream`
for elem in publishing_generator():
pass
Вместо функции генератора вы также можете создать класс с методами: __next__
, __iter__
, send
, throw
.Таким образом, вы можете изменить метод MyGenerator.__init__
, чтобы автоматически добавлять его новые экземпляры в subscribed_generators
.
Это несколько похоже на подход, основанный на событиях, с «тупой реализацией»:
for elem in the_infinite_stream
аналогично отправке события for generator ...: generator.send
аналогично отправке события каждому подписчику.
Таким образом, один из способов реализации «более сложного, но структурированного решения»будет использовать подход, основанный на событиях:
- Например, вы можете использовать asyncio.Event
- или какое-либо стороннее решение, такое как aiopubsub
- Для любого из этих подходов вы должны генерировать событие для каждого элемента из
the_infinite_stream
, и ваши экземпляры my_generator
должны быть подписаны на эти события.
И другиеПодходы также могут быть использованы, и лучший выбор зависит: от деталей вашей задачи, от того, как вы используете цикл обработки событий в asyncio.Например:
Вы можете реализовать the_infinite_stream
(или обертку для него) как некоторый класс с «курсорами» (объекты, которые отслеживают текущую позицию в потоке для разных подписчиков);затем каждый my_generator
регистрирует новый курсор и использует его для получения следующего элемента в бесконечном потоке.При таком подходе цикл обработки событий не будет автоматически пересматривать my_generator
экземпляры, которые могут потребоваться, если эти экземпляры "не равны" (например, имеют некоторое "выравнивание приоритетов")
Промежуточное звеноГенератор вызывает все экземпляры my_generator
(как описано ранее).При таком подходе каждый экземпляр my_generator
автоматически возвращается к циклу обработки событий.Скорее всего, этот подход является поточно-ориентированным.
Основанные на событиях подходы:
с использованием asyncio.Event
.Аналогично использованию промежуточного генератора.Не поточно-ориентированный
aiopubsub.
то, что использует Шаблон наблюдателя
Сделать the_infinite_generator
(или оболочку для него) таким, чтобы он был "Singleton", который "кэширует" последнее событие.Некоторые подходы были описаны в других ответах.Могут быть использованы другие решения «кэширования»:
испускают один и тот же элемент один раз для каждого экземпляра the_infinite_generator
(используйте класс с пользовательским методом __new__
, который отслеживает экземпляры, или используйте тот же самыйэкземпляр класса, у которого есть метод, возвращающий «сдвинутый» итератор над the_infinite_loop
), пока кто-то не вызовет специальный метод для экземпляра the_infinite_generator
(или для класса): infinite_gen.next_cycle
.В этом случае всегда должен быть какой-то «последний завершающий генератор / процессор», который в конце цикла каждого цикла обработки будет делать the_infinite_generator().next_cycle()
Аналогично предыдущему, но разрешено то же событиезапускать несколько раз в одном экземпляре my_generator
(поэтому они должны следить за этим случаем).В этом подходе the_infinite_generator().next_cycle()
можно вызывать «периодически» с помощью loop.call_later или loop.cal_at .Этот подход может понадобиться, если «подписчики» должны иметь возможность обрабатывать / анализировать: задержки, ограничения скорости, тайм-ауты между событиями и т. Д.
Многие другие решениявозможныТрудно предложить что-то конкретное, не глядя на текущую реализацию и не зная, каково желаемое поведение генераторов, которые используют the_infinite_loop
Если я правильно понимаю ваше описание «общих» потоков, что вам действительно нужен «один» генератор the_infinite_stream
и «обработчик» для него.Пример, который пытается сделать это:
class StreamHandler:
def __init__(self):
self.__real_stream = the_infinite_stream()
self.__sub_streams = []
def get_stream(self):
sub_stream = [] # or better use some Queue/deque object. Using list just to show base principle
self.__sub_streams.append(sub_stream)
while True:
while sub_stream:
yield sub_stream.pop(0)
next(self)
def __next__(self):
next_item = next(self.__real_stream)
for sub_stream in self.__sub_steams:
sub_stream.append(next_item)
some_global_variable = StreamHandler()
# Or you can change StreamHandler.__new__ to make it singleton, or you can create an instance at the point of creation of event-loop
def my_generator():
for elem in some_global_variable.get_stream():
yield elem
Но если все ваши my_generator
объекты инициализируются в одной и той же точке бесконечного потока и «одинаково» повторяются внутри цикла, тогда этот подход привнесет «ненужные» накладные расходы памяти для каждого «sub_stream» (используется как очередь) , Нет необходимости: потому что эти очереди всегда будут одинаковыми (но это можно оптимизировать: если есть какой-то существующий «пустой» подпоток, то он может быть повторно использован для новых подпотоков с некоторыми изменениями в «pop
-logic»). И много-много других реализаций и нюансов можно обсудить