Почему операция on_next () субъекта в NewThreadScheduler не влияет на его поведение в основном потоке? - PullRequest
0 голосов
/ 17 апреля 2019

У меня есть субъект subj = Subject(), чей on_next(event) будет вызываться асинхронно в другом потоке через NewThreadScheduler

observable.subscribe(lambda event: subj.on_next(event), scheduler=NewThreadScheduler())

Теперь я хочу прослушать операцию on_next(), чтобы знать, что к subj добавляется новое событие. Как мне это сделать?

Я сейчас пишу как,

subj = ReplaySubject()
scheduler = NewThreadScheduler()

# this is an Observable created from grpc stream
grpc_stream: Observable = TreeLabClient.subscribe_to_workspace(subscription_input)

def subscribe_func(e):
    subj.on_next(e)
    print('completed on_next', subj, e)

grpc_dispose: Disposable = grpc_stream.subscribe(subscribe_func, scheduler=scheduler)

# creating an event to grpc_stream (not immediately), after a while I can see screen print out 'completed on_next' as written the subscribe_func 
send_event()

async def a():
    print('waiting...', subj)

    # it blocks here and never get resolved, it is expected to be resolved as long as subj has on_next() called on it
    return await subj.pipe(op.take(1))

asyncio.run(a())

Таким образом, я могу ждать, пока первое событие будет добавлено в subj, пока не выполнятся другие задачи, которые зависят от создания первого события, через send_event().

Код, который я пробовал выше, на самом деле не работает, так как меня заблокировали на await subj.pipe(op.take(1)), я почти уверен, что grpc_stream получил события, и on_next(event) был отлично выполнен с NewThreadScheduler, так что победил не быть блоком, когда я выполняю код. Тем не менее, линия await subj.pipe(op.take(1)), кажется, ждет вечно и никогда не будет решена. Где я сделал не так? Должен ли я использовать другой планировщик или что-нибудь, что помогает?

...