У меня есть субъект subj = Subject()
, чей on_next(event)
будет вызываться асинхронно в другом потоке через NewThreadScheduler
observable.subscribe(lambda event: subj.on_next(event), scheduler=NewThreadScheduler())
Теперь я хочу прослушать операцию on_next()
, чтобы знать, что к subj
добавляется новое событие. Как мне это сделать?
Я сейчас пишу как,
subj = ReplaySubject()
scheduler = NewThreadScheduler()
# this is an Observable created from grpc stream
grpc_stream: Observable = TreeLabClient.subscribe_to_workspace(subscription_input)
def subscribe_func(e):
subj.on_next(e)
print('completed on_next', subj, e)
grpc_dispose: Disposable = grpc_stream.subscribe(subscribe_func, scheduler=scheduler)
# creating an event to grpc_stream (not immediately), after a while I can see screen print out 'completed on_next' as written the subscribe_func
send_event()
async def a():
print('waiting...', subj)
# it blocks here and never get resolved, it is expected to be resolved as long as subj has on_next() called on it
return await subj.pipe(op.take(1))
asyncio.run(a())
Таким образом, я могу ждать, пока первое событие будет добавлено в subj
, пока не выполнятся другие задачи, которые зависят от создания первого события, через send_event()
.
Код, который я пробовал выше, на самом деле не работает, так как меня заблокировали на await subj.pipe(op.take(1))
, я почти уверен, что grpc_stream
получил события, и on_next(event)
был отлично выполнен с NewThreadScheduler
, так что победил не быть блоком, когда я выполняю код. Тем не менее, линия await subj.pipe(op.take(1))
, кажется, ждет вечно и никогда не будет решена. Где я сделал не так? Должен ли я использовать другой планировщик или что-нибудь, что помогает?