Вот что я пытаюсь достичь:
- Создание 2 независимых объектов, которые возвращают асинхронные сообщения
- Преобразование сообщений в Observable
Функции длинные, поэтому каждый объект находится в отдельном файле. Вот базовая структура моих объектов:
class Object:
def message(self, observer, scheduler):
_task = None
def _teardown():
if _task:
_task.cancel()
observer.on_completed()
async def _loop():
while True:
#DO STUFF
observer.on_next(message)
async def _run_loop():
try:
await _loop()
except asyncio.CancelledError:
print('error')
finally:
_teardown()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
_task = loop.create_task(_run_loop())
loop.run_forever()
Вот код для RXPY:
object1 = Object1()
object1_observable = defer(lambda _: create(object1.message)).pipe(
op.observe_on(EventLoopScheduler()),
op.share()
)
object1_observable.subscribe(lambda value: print('Object1 Observer Received'))
object2 = Object2()
object2_observable = defer(lambda _: create(object2.message)).pipe(
op.observe_on(EventLoopScheduler()),
op.share()
)
object2_observable.subscribe(lambda value: print('Object2 Observer Received'))
Вопрос 1: С помощью этого кода я получаю сообщения только от object1. Как мне адаптировать этот код, чтобы он работал и получать сообщения от обоих объектов?
Вопрос 2: Я хотел бы, чтобы сообщения от object1 были параметрами для другого класса, скажем, Object3. Object3 тогда будет слушать Object2. Для каждого сообщения от Object2 Object3 будет что-то делать. И для каждого сообщения от Object1 атрибуты Object3 будут меняться. Однако атрибуты не должны изменяться, пока Object3 не будет завершено с последним сообщением от Object2. Я понятия не имею, как это сделать с RXPY. Есть идеи?
Обновление
С протекторами код работает. Я не уверен, что это хороший способ продолжить.
Я обновлю его с правильным процессом убийства, и я все еще работаю над вопросом 2
class Object:
def message(self, observer, scheduler):
def _loop():
while True:
#DO STUFF
observer.on_next(message)
thread = threading.Thread(target=_loop)
thread.daemon = False
thread.start()