Я создал издателя с использованием метода Flux.create. Другой поток отправляет данные в FluxSink, данные преобразуются и отправляются в систему обмена сообщениями. Этот поток должен быть вокруг, пока приложение работает. Вот код:
public MyClass {
private final FluxSink<Request> requestsSink;
private final Flux<Request> flux;
MyClass() {
this.flux = Flux.<Request>create(sink -> this.requestsSink = sink)
.bufferTimeout(props.getMaxSize(), props.getMaxTime())
.flatMap(this::mergeRequests)
.flatMap(
req ->
remoteApi
.getData(req)
.onErrorResume(t -> Mono.empty()))
.doOnNext(this::sendToTopic)
.onErrorResume(
t -> {
log.warn("error", t);
return Flux.empty();
})
.subscribe();
}
public void pushData(Request request) {
requestsSink.next(request);
}
... other methods
}
Итак, мне все равно, если где-то произойдет ошибка, я просто хочу зарегистрировать ее и продолжить обработку других записей.
По какой-то причине этопоток get утилизируется при неизвестных обстоятельствах (isDisposed возвращает true). Это происходит при высокой нагрузке.
- Как предотвратить отмену / утилизацию / отделку флюса?
- В каких условиях флюс можно отменить / утилизировать / завершить?
- Как я могу отладить / зарегистрировать поток, чтобы понять причину отмены / удаления?