Предел для `onErrorContinue (...)` в Flux? - PullRequest
2 голосов
/ 08 января 2020

У меня есть (возможно, бесконечный) источник Flux, который должен сначала сохранять каждое сообщение (например, в базе данных), а затем асинхронно пересылать сообщения (например, используя Spring WebClient).

forward (s) в случае сбоя должны регистрировать ошибку без завершения источника Flux.

Однако я понял, что forward (s) с потоком (flatMap(...)) блокируют выполнение источника Flux после ровно 256 сообщений, которые вызывают исключения (например, reactor.retry.RetryExhaustedException).

Репрезентативный пример сбоя в утверждении, поскольку обрабатываются только 256 сообщений:

@Test
@SneakyThrows
public void sourceBlockAfter256Exceptions() {
    int numberOfRequests = 500;
    Set<Integer> sink = new HashSet<>();
    Flux
            .fromStream(IntStream.range(0, numberOfRequests).boxed())
            .map(sink::add)
            .flatMap(i -> Mono
                    // normally the forwards are contained here e.g. by means of Mono.when(...).thenReturn(...).retryWhen(...):
                    .error(new Exception("any"))
            )
            .onErrorContinue((throwable, o) -> log.error("Error", throwable))
            .subscribe();

    Thread.sleep(3000);
    Assertions.assertEquals(numberOfRequests, sink.size());
}

Выполнение пересылки в пределах subscribe(...) не блокирует источник Flux, но это, безусловно, не решение, поскольку я не хочу терять сообщения.

Вопросы:

  • Что произошло Вот? (возможно, связано с некоторым состоянием, хранящимся всего в одном бите)
  • Как я могу сделать это правильно?

РЕДАКТИРОВАТЬ:

Согласно приведенному ниже обсуждению я построил пример, который использует FluxMessageChannel (который, насколько я понимаю, сделан для бесконечных потоков) и точно не ожидается блокирование после 256 ошибок) и имеет точно такое же поведение:

@Test
@SneakyThrows
public void maxConnectionWithChannelTest() {
    int numberOfRequests = 500;
    Set<Integer> sink = new HashSet<>();

    FluxMessageChannel fluxMessageChannel = MessageChannels.flux().get();

    fluxMessageChannel.subscribeTo(
            Flux
                    .fromStream(IntStream
                            .range(0, numberOfRequests).boxed()
                            .map(i -> MessageBuilder.withPayload(i).build())
                    )
                    .map(Message::getPayload)
                    .map(sink::add)
                    .flatMap(i -> Mono.error(new Exception("whatever")))
    );

    Flux
            .from(fluxMessageChannel)
            .subscribe();

    Thread.sleep(3000);
    Assert.assertEquals(numberOfRequests, sink.size());
}

РЕДАКТИРОВАТЬ:

Я только что поднял проблему в проекте активной зоны реактора : https://github.com/reactor/reactor-core/issues/2011

...