Я пытаюсь использовать zip
вместе с операторами last
, чтобы объединить три разных EmitterProcessor
и обработать объединенный результат. Это отлично работает, как определено ниже, за исключением сценария, когда один из процессоров не испускает элемент. Насколько я понимаю, оператор last(defaultValue)
состоит в том, что он будет выдавать значение по умолчанию, когда EmitterProcessor
завершится, даже не выдав значение. Это, в свою очередь, должно завершить zip
и отправить значение по умолчанию вместе с любыми фактическими значениями от других процессоров. Проблема в том, что zip никогда не завершается после выполнения вызова complete
на всех процессорах. Я что-то упустил или в коде Reactor есть ошибка?
EmitterProcessor.zip(
emitter1.last("Missing Value 1"),
emitter2.last("Missing Value 2"),
emitter3.last("Missing Value 3")
).subscribe {
logger.error(it.t1)
logger.error(it.t2)
logger.error(it.t3)
}
// Later in the code I call complete on all emitters
emitter1.sink().complete()
emitter2.sink().complete()
emitter3.sink().complete()