Как ждать операции on_next () над темой в rxpy? - PullRequest
0 голосов
/ 16 апреля 2019

У меня есть субъект subj = Subject(), чей on_next(event) будет вызываться асинхронно в другом потоке через

observable.subscribe(lambda event: subj.on_next(event), scheduler=NewThreadScheduler())

Теперь я хочу прослушать операцию on_next(), поэтому я буду знать, что естьновое событие, добавляемое к subj, как мне это сделать?

Я ожидаю что-то вроде

observable_stream = rx.from_(grpc_stream)
subj = Subject()

observable_stream.subscribe(lambda event: subj.on_next(event), scheduler=NewThreadScheduler()) 
// this will be ran in a different thread I assume? the observable_stream is from grpc and will receive event data continously

await subj.pipe(op.take(1))  
// I want to wait until the observable_stream has any new event being "on_next(event)" to subj, in the meanwhile subj.on_next(event) will be executed the thread on the previous line...

other functions to be executed...

Так что я могу ждать первого события, добавляемого к subj до тех пор, пока не будут выполнены другие задачи, которые зависят от создания первого события.

Код, который я пробовал выше, на самом деле не работает, так как меня заблокировали на await subj.pipe(op.take(1)), я почти уверен, что observable_stream имеетполученные события, и on_next(event) был прекрасно выполнен с NewThreadScheduler, поэтому при выполнении кода блока не будет.Тем не менее, линия await subj.pipe(op.take(1)), кажется, ждет вечно и никогда не будет решена.Где я сделал не так?Должен ли я использовать другой планировщик или что-нибудь, что помогает?

...