У меня есть субъект subj = Subject()
, чей on_next(event)
будет вызываться асинхронно в другом потоке через
observable.subscribe(lambda event: subj.on_next(event), scheduler=NewThreadScheduler())
Теперь я хочу прослушать операцию on_next()
, поэтому я буду знать, что естьновое событие, добавляемое к subj
, как мне это сделать?
Я ожидаю что-то вроде
observable_stream = rx.from_(grpc_stream)
subj = Subject()
observable_stream.subscribe(lambda event: subj.on_next(event), scheduler=NewThreadScheduler())
// this will be ran in a different thread I assume? the observable_stream is from grpc and will receive event data continously
await subj.pipe(op.take(1))
// I want to wait until the observable_stream has any new event being "on_next(event)" to subj, in the meanwhile subj.on_next(event) will be executed the thread on the previous line...
other functions to be executed...
Так что я могу ждать первого события, добавляемого к subj
до тех пор, пока не будут выполнены другие задачи, которые зависят от создания первого события.
Код, который я пробовал выше, на самом деле не работает, так как меня заблокировали на await subj.pipe(op.take(1))
, я почти уверен, что observable_stream
имеетполученные события, и on_next(event)
был прекрасно выполнен с NewThreadScheduler
, поэтому при выполнении кода блока не будет.Тем не менее, линия await subj.pipe(op.take(1))
, кажется, ждет вечно и никогда не будет решена.Где я сделал не так?Должен ли я использовать другой планировщик или что-нибудь, что помогает?