Flux.buffer () не работает с switchIfEmpty - PullRequest
0 голосов
/ 01 ноября 2019

У меня есть сценарий, в котором я получу список сущностей из DB, используя

repository.getAllByIds(ids)

, который вернет Flux<Entity>

, если поток тогда пустмне нужно позвонить handleAllEntitiesNotFound() иначе мне нужно позвонить handleNotFoundEntities()

repository.getAllByIds(ids)
                .buffer()
                .switchIfEmpty(__ -> handleAllEntitiesNotFound(ids, erroneousEntities))
                .flatMap(list -> handleNotFoundEntities(list))


private Flux<Entity> handleAllEntitiesNotFound(List<String> ids, List<ResponseError> erroneousEntities) {
    Flux.fromIterable(ids).subscribe(id -> erroneousEntities.add(new ResponseError("Not Found", "Not Found", id)));
    return Flux.empty();
}

Я использую buffer(), чтобы собрать список в Flux<List<Entity>>

Проблема в том, когдая вызываю службу, она останавливается, нет ответа, нет журналов, ничего нет, если я удалил строку .switchIfEmpty(__ -> handleAllEntitiesNotFound(ids, erroneousEntities)), она работает и возвращает ответ, но без обработки handleAllEntitiesNotFound

В чем может быть проблема при использовании buffer() с switchIfEmpty()

1 Ответ

1 голос
/ 01 ноября 2019

Я думаю, что вы пришли к неправильному выводу - buffer() и switchIfEmpty() работают без проблем вместе:

Flux.empty()
        .buffer()
        .switchIfEmpty(Mono.just(List.of(1)))
        .subscribe(System.out::println); //Prints "[1]"

Однако ваш handleAllEntitiesNotFound() метод очень подозрительный. Похоже, вы переходите в существующий список, создаете новый Flux, чтобы добавить к нему, а затем возвращаете пустой поток. Пример не может быть выполнен, поэтому невозможно сузить точную причину , но есть несколько моментов, которые вполне могут быть виновником (индивидуально или в тандеме):

  • Мутирование существующего объекта, который передается в реактивный поток, обычно считается плохой формой. Намного проще и безопаснее вернуть новый список (и вы можете объединить этот список с другим, если хотите, когда реактивный поток завершится.)
  • Вы просто создаете Fluxчитать из одного списка и добавлять элементы в другой. Это сбивает с толку, и имеет мало смысла. Просто используйте стандартные потоки Java (например, ids.stream().map(id -> new ResponseError("Not Found", "Not Found", id)).collect(Collectors.toList()).)
  • . Вы возвращаете Flux.empty(), поэтому почти наверняка нет ответа. Обычно можно ожидать, что switchIfEmpty() вернет непустой поток, если только вы не намеренно используете его как побочный эффект.
  • handleNotFoundEntities кажется странным выбором имени для метода, которыйкажется, что переданы сущности, которые были найдены.
...