У меня есть (возможно, бесконечный) источник Flux
, который должен сначала сохранять каждое сообщение (например, в базе данных), а затем асинхронно пересылать сообщения (например, используя Spring WebClient
).
forward (s) в случае сбоя должны регистрировать ошибку без завершения источника Flux
.
Однако я понял, что forward (s) с потоком (flatMap(...)
) блокируют выполнение источника Flux
после ровно 256 сообщений, которые вызывают исключения (например, reactor.retry.RetryExhaustedException
).
Репрезентативный пример сбоя в утверждении, поскольку обрабатываются только 256 сообщений:
@Test
@SneakyThrows
public void sourceBlockAfter256Exceptions() {
int numberOfRequests = 500;
Set<Integer> sink = new HashSet<>();
Flux
.fromStream(IntStream.range(0, numberOfRequests).boxed())
.map(sink::add)
.flatMap(i -> Mono
// normally the forwards are contained here e.g. by means of Mono.when(...).thenReturn(...).retryWhen(...):
.error(new Exception("any"))
)
.onErrorContinue((throwable, o) -> log.error("Error", throwable))
.subscribe();
Thread.sleep(3000);
Assertions.assertEquals(numberOfRequests, sink.size());
}
Выполнение пересылки в пределах subscribe(...)
не блокирует источник Flux
, но это, безусловно, не решение, поскольку я не хочу терять сообщения.
Вопросы:
- Что произошло Вот? (возможно, связано с некоторым состоянием, хранящимся всего в одном бите)
- Как я могу сделать это правильно?
РЕДАКТИРОВАТЬ:
Согласно приведенному ниже обсуждению я построил пример, который использует FluxMessageChannel
(который, насколько я понимаю, сделан для бесконечных потоков) и точно не ожидается блокирование после 256 ошибок) и имеет точно такое же поведение:
@Test
@SneakyThrows
public void maxConnectionWithChannelTest() {
int numberOfRequests = 500;
Set<Integer> sink = new HashSet<>();
FluxMessageChannel fluxMessageChannel = MessageChannels.flux().get();
fluxMessageChannel.subscribeTo(
Flux
.fromStream(IntStream
.range(0, numberOfRequests).boxed()
.map(i -> MessageBuilder.withPayload(i).build())
)
.map(Message::getPayload)
.map(sink::add)
.flatMap(i -> Mono.error(new Exception("whatever")))
);
Flux
.from(fluxMessageChannel)
.subscribe();
Thread.sleep(3000);
Assert.assertEquals(numberOfRequests, sink.size());
}
РЕДАКТИРОВАТЬ:
Я только что поднял проблему в проекте активной зоны реактора : https://github.com/reactor/reactor-core/issues/2011