асинхронный Python itertools цепочка нескольких генераторов - PullRequest
0 голосов
/ 22 ноября 2018

ОБНОВЛЕННЫЙ ВОПРОС ДЛЯ ЯСНОСТИ:

предположим, у меня есть 2 функции генератора обработки:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

Я могу связать их с помощью itertools

from itertools import chain

mix = chain(gen1(), gen2())

и затем я могу создать другой объект функции генератора с ним,

def mix_yield():
   for item in mix:
      yield item

или просто, если я просто хочу next(mix), он есть.

Мой вопрос таков:Как я могу сделать эквивалент в асинхронном коде?

Потому что мне нужно:

  • возврат в yield (один за другим), или с next итератор
  • самый быстрый разрешенный доход первым (асинхронный)

ПРЕД.ОБНОВЛЕНИЕ:

После экспериментов и исследований я обнаружил библиотеку aiostream , которая сообщает как асинхронную версию itertools, поэтому я сделал:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item

, но явсе еще не могу сделать next(a_mix)

TypeError: 'merge' object is not an iterator

или next(await a_mix)

raise StreamEmpty()

Хотя я все еще могу сделать это в списке:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

такодна цель достигнута, еще одна должна пройти:

  • доходность (одна за другой) или next итератор

    - самый быстрый результатпервый выход (асинхронный)

1 Ответ

0 голосов
/ 22 ноября 2018

Асинхронный эквивалент next - это метод __anext__ на асинхронном итераторе.Итератор, в свою очередь, получается путем вызова __aiter__ (по аналогии с __iter__) для итерируемого.Развернутая асинхронная итерация выглядит следующим образом:

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...

Метод __anext__ вызовет StopAsyncIteration, когда больше нет доступных элементов.Чтобы выполнить итерацию по асинхронным итераторам, вы должны использовать async for вместо for.

Вот пример запуска, основанный на вашем коде, использующий и 1014 *, и async for для исчерпания настроенного потокас aiostream.stream.combine.merge:

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()    
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...