Мне нужно создать поток Observable, который будет генерировать результат асинхронной сопрограммы через равные промежутки времени.
intervalRead
- это функция, которая возвращает Observable и принимает в качестве параметров интервал rate
и асинхронную функцию сопрограммы fun
, которую необходимо вызывать через определенный интервал.
Мой первый подход состоял в том, чтобы создать наблюдаемую с помощью метода фабрики интервалов, а затем использовать map для вызова сопрограммы, используя from_future, чтобы обернуть ее в Observable, а затем получить значение, возвращаемое сопрограммой.
async def foo():
await asyncio.sleep(1)
return 42
def intervalRead(rate, fun) -> Observable:
loop = asyncio.get_event_loop()
return rx.interval(rate).pipe(
map(lambda i: rx.from_future(loop.create_task(fun()))),
)
async def main():
obs = intervalRead(5, foo)
obs.subscribe(
on_next= lambda item: print(item)
)
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
Тем не менее, вывод, который я получаю, не является результатом сопрограммы, а Observable, возвращаемым from_future, генерируемым через указанный интервал
output: <rx.core.observable.observable.Observable object at 0x033B5650>
Как я мог получитьфактическое значение, возвращаемое этим Observable?Я ожидал бы 42
Мой второй подход состоял в создании настраиваемой наблюдаемой:
def intervalRead(rate, fun) -> rx.Observable:
interval = rx.interval(rate)
def subs(observer: Observer, scheduler = None):
loop = asyncio.get_event_loop()
def on_timer(i):
task = loop.create_task(fun())
from_future(task).subscribe(
on_next= lambda i: observer.on_next(i),
on_error= lambda e: observer.on_error(e),
on_completed= lambda: print('coro completed')
)
interval.subscribe(on_next= on_timer, on_error= lambda e: print(e))
return rx.create(subs)
Однако при подписке from_future(task)
никогда не выдает значения, почему это происходит?
Тем не менее, если я напишу intervalRead
так:
def intervalRead(rate, fun):
loop = asyncio.get_event_loop()
task = loop.create_task(fun())
return from_future(task)
Я получу ожидаемый результат: 42
.Очевидно, что это не решает мою проблему, но меня смущает, почему это не работает в моем втором подходе?
Наконец, я экспериментировал с третьим подходом, используя rx.concurrency CurrentThreadScheduler
, и планировал какое-то действие с помощьюschedule_periodic
метод.Тем не менее, я сталкиваюсь с той же проблемой, с которой я сталкиваюсь при втором подходе.
def funWithScheduler(rate, fun):
loop = asyncio.get_event_loop()
scheduler = CurrentThreadScheduler()
subject = rx.subjects.Subject()
def action(param):
obs = rx.from_future(loop.create_task(fun())).subscribe(
on_next= lambda item: subject.on_next(item),
on_error= lambda e: print(f'error in action {e}'),
on_completed= lambda: print('action completed')
)
obs.dispose()
scheduler.schedule_periodic(rate,action)
return subject
Буду признателен за понимание того, что мне не хватает, или за любые другие предложения по достижению того, что мне нужно.Это мой первый проект с asyncio и RxPY, я использую RxJS только в контексте углового проекта, поэтому любая помощь приветствуется.