Я создаю торговое приложение, используя RxPy.Мне нужно подписаться на несколько бесконечных Observables одновременно, но любой вызов подписки препятствует выполнению остальных подписок.
from threading import Lock
import multiprocessing
thread = None
thread_lock = Lock()
thread_count = multiprocessing.cpu_count() + 1
pool_scheduler = ThreadPoolScheduler(thread_count)
def background_thread():
Observable \
.interval(1000) \
.subscribe_on(pool_scheduler) \
.from_(symbols) \
.repeat() \
.map(lambda t: getQuote(t)) \
.subscribe(lambda q: processQuote(q))
print('Never gets here')
Observable \
.interval(10000) \
.subscribe_on(pool_scheduler) \
.from_(symbols) \
.repeat() \
.map(lambda t: getChart(t, None, None, None)) \
.subscribe(lambda q: processChart(q))
Я попытался обернуть две Observables в Observable.merge(obs1, obs2)
, но этотоже не работалУ меня все еще есть активность на первом Наблюдаемом.
Каждый пример, который я нахожу, имеет дело с одним Наблюдаемым.Что вы делаете, когда вам нужно подписаться на несколько Observables, которые все транслируются одновременно?Я ожидаю, что в конечном итоге потребуется от шести до десяти потоков.