EmitterProcessor, использующий zip и last (значение по умолчанию), никогда не выдает значения - PullRequest
0 голосов
/ 20 января 2020

Я пытаюсь использовать zip вместе с операторами last, чтобы объединить три разных EmitterProcessor и обработать объединенный результат. Это отлично работает, как определено ниже, за исключением сценария, когда один из процессоров не испускает элемент. Насколько я понимаю, оператор last(defaultValue) состоит в том, что он будет выдавать значение по умолчанию, когда EmitterProcessor завершится, даже не выдав значение. Это, в свою очередь, должно завершить zip и отправить значение по умолчанию вместе с любыми фактическими значениями от других процессоров. Проблема в том, что zip никогда не завершается после выполнения вызова complete на всех процессорах. Я что-то упустил или в коде Reactor есть ошибка?

EmitterProcessor.zip(
    emitter1.last("Missing Value 1"),
    emitter2.last("Missing Value 2"),
    emitter3.last("Missing Value 3")
).subscribe {
    logger.error(it.t1)
    logger.error(it.t2)
    logger.error(it.t3)
}

// Later in the code I call complete on all emitters
emitter1.sink().complete()
emitter2.sink().complete()
emitter3.sink().complete()

1 Ответ

0 голосов
/ 22 января 2020

Может быть, последняя возможность решит вашу проблему. Обратите внимание, что вы должны как минимум выдавать 1 значение каждому процессору

   EmitterProcessor<String> emitter1= EmitterProcessor.create();
    EmitterProcessor<String> emitter2= EmitterProcessor.create();
    EmitterProcessor<String> emitter3= EmitterProcessor.create();

    EmitterProcessor.zip(
            emitter1.defaultIfEmpty("Missing Value 1"),
            emitter2.defaultIfEmpty("Missing Value 2"),
            emitter3.defaultIfEmpty("Missing Value 3")
    ).log();
EmitterProcessor.combineLatest(emitter1, emitter2, emitter3, objects -> {
return objects;
}).log().subscribe();

    emitter1.onNext("a");
    emitter2.onNext("a");

    emitter3.onNext("a");

    emitter1.onNext("a");
    emitter1.onNext("a");
    emitter1.onNext("a");
    emitter1.onNext("a");
    emitter1.onNext("a");

   // Later in the code I call complete on all emitters
    emitter1.sink().complete();
    emitter2.sink().complete();
    emitter3.sink().complete();
...