Бесконечный цикл с asyncio и rxpy - PullRequest
1 голос
/ 28 мая 2019

Вот что я пытаюсь достичь:

  1. Создание 2 независимых объектов, которые возвращают асинхронные сообщения
  2. Преобразование сообщений в Observable

Функции длинные, поэтому каждый объект находится в отдельном файле. Вот базовая структура моих объектов:

class Object:
    def message(self, observer, scheduler):
        _task = None

        def _teardown():
            if _task:
                _task.cancel()
            observer.on_completed()

        async def _loop():
            while True:
                #DO STUFF
                observer.on_next(message)

        async def _run_loop():
            try:
                await _loop()
            except asyncio.CancelledError:
                print('error')
            finally:
                _teardown()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        _task = loop.create_task(_run_loop())
        loop.run_forever()

Вот код для RXPY:

object1 = Object1()

object1_observable = defer(lambda _: create(object1.message)).pipe(
    op.observe_on(EventLoopScheduler()),
    op.share()
)

object1_observable.subscribe(lambda value: print('Object1 Observer Received'))

object2 = Object2()

object2_observable = defer(lambda _: create(object2.message)).pipe(
    op.observe_on(EventLoopScheduler()),
    op.share()
)

object2_observable.subscribe(lambda value: print('Object2 Observer Received'))

Вопрос 1: С помощью этого кода я получаю сообщения только от object1. Как мне адаптировать этот код, чтобы он работал и получать сообщения от обоих объектов?

Вопрос 2: Я хотел бы, чтобы сообщения от object1 были параметрами для другого класса, скажем, Object3. Object3 тогда будет слушать Object2. Для каждого сообщения от Object2 Object3 будет что-то делать. И для каждого сообщения от Object1 атрибуты Object3 будут меняться. Однако атрибуты не должны изменяться, пока Object3 не будет завершено с последним сообщением от Object2. Я понятия не имею, как это сделать с RXPY. Есть идеи?

Обновление С протекторами код работает. Я не уверен, что это хороший способ продолжить. Я обновлю его с правильным процессом убийства, и я все еще работаю над вопросом 2

class Object:
    def message(self, observer, scheduler):
        def _loop():
            while True:
                #DO STUFF
                observer.on_next(message)

        thread = threading.Thread(target=_loop)
        thread.daemon = False
        thread.start()
...