Как периодически вызывать асинхронную сопрограмму, используя наблюдаемый интервал RxPY? - PullRequest
0 голосов
/ 22 июня 2019

Мне нужно создать поток Observable, который будет генерировать результат асинхронной сопрограммы через равные промежутки времени.

intervalRead - это функция, которая возвращает Observable и принимает в качестве параметров интервал rate и асинхронную функцию сопрограммы fun, которую необходимо вызывать через определенный интервал.

Мой первый подход состоял в том, чтобы создать наблюдаемую с помощью метода фабрики интервалов, а затем использовать map для вызова сопрограммы, используя from_future, чтобы обернуть ее в Observable, а затем получить значение, возвращаемое сопрограммой.

async def foo():
    await asyncio.sleep(1)
    return 42

def intervalRead(rate, fun) -> Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        map(lambda i: rx.from_future(loop.create_task(fun()))),
    )

async def main():
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next= lambda item: print(item)
    )

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

Тем не менее, вывод, который я получаю, не является результатом сопрограммы, а Observable, возвращаемым from_future, генерируемым через указанный интервал

output: <rx.core.observable.observable.Observable object at 0x033B5650>

Как я мог получитьфактическое значение, возвращаемое этим Observable?Я ожидал бы 42

Мой второй подход состоял в создании настраиваемой наблюдаемой:


def intervalRead(rate, fun) -> rx.Observable:
    interval = rx.interval(rate)
    def subs(observer: Observer, scheduler = None):
        loop = asyncio.get_event_loop()
        def on_timer(i):
            task = loop.create_task(fun())
            from_future(task).subscribe(
                on_next= lambda i: observer.on_next(i),
                on_error= lambda e: observer.on_error(e),
                on_completed= lambda: print('coro completed')
            )
        interval.subscribe(on_next= on_timer, on_error= lambda e: print(e))        
    return rx.create(subs)

Однако при подписке from_future(task) никогда не выдает значения, почему это происходит?

Тем не менее, если я напишу intervalRead так:

def intervalRead(rate, fun):
    loop = asyncio.get_event_loop()
    task = loop.create_task(fun())
    return from_future(task)

Я получу ожидаемый результат: 42.Очевидно, что это не решает мою проблему, но меня смущает, почему это не работает в моем втором подходе?

Наконец, я экспериментировал с третьим подходом, используя rx.concurrency CurrentThreadScheduler, и планировал какое-то действие с помощьюschedule_periodic метод.Тем не менее, я сталкиваюсь с той же проблемой, с которой я сталкиваюсь при втором подходе.

def funWithScheduler(rate, fun):
    loop = asyncio.get_event_loop()
    scheduler = CurrentThreadScheduler()
    subject = rx.subjects.Subject()
    def action(param):
        obs = rx.from_future(loop.create_task(fun())).subscribe(
            on_next= lambda item: subject.on_next(item),
            on_error= lambda e: print(f'error in action {e}'),
            on_completed= lambda: print('action completed')
        )     
        obs.dispose()   
    scheduler.schedule_periodic(rate,action)
    return subject

Буду признателен за понимание того, что мне не хватает, или за любые другие предложения по достижению того, что мне нужно.Это мой первый проект с asyncio и RxPY, я использую RxJS только в контексте углового проекта, поэтому любая помощь приветствуется.

1 Ответ

0 голосов
/ 24 июня 2019

Ваш первый пример почти работает. Есть только два изменения, необходимые для его работы:

Сначала результат from_future является наблюдаемой, которая испускает один элемент (значение будущего, когда оно завершится). Таким образом, выходные данные map являются наблюдаемыми более высокого порядка (наблюдаемые, которые испускают наблюдаемые). Эти дочерние наблюдаемые могут быть сплющены с помощью оператора merge_all после карты или с помощью flat_map вместо карты.

Тогда оператор интервала должен запланировать свой таймер в цикле AsyncIO, который не применяется по умолчанию: планировщиком по умолчанию является TimeoutScheduler, и он порождает новый поток. Таким образом, в исходном коде задачу нельзя запланировать в цикле событий AsyncIO, поскольку create_task вызывается из другого потока. Использование параметра планировщика в вызове для подписки объявляет планировщик по умолчанию, который будет использоваться для всей цепочки операторов.

Работает следующий код (42 печатается каждые 5 секунд):

import asyncio
import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler


async def foo():
    await asyncio.sleep(1)
    return 42


def intervalRead(rate, fun) -> rx.Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        ops.map(lambda i: rx.from_future(loop.create_task(fun()))),
        ops.merge_all()
    )


async def main(loop):
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next=lambda item: print(item),
        scheduler=AsyncIOScheduler(loop)
    )

loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
...