У меня есть поток, который должен испускать предмет почти сразу. После этого он может не излучать предмет в течение длительного периода времени. Я хочу, чтобы время ожидания истекло, если элемент не был получен изначально. Но если я использую метод timeout(Duration)
, он будет зависать каждый раз, когда в данный период времени не будет получено ни одного элемента.
У меня сейчас есть код, который не работает по вышеуказанной причине:
messageFlux.timeout(Duration.ofSeconds(30)).doOnError(e -> {
// handle error
}).subscribe(m -> messageService.consumeMessage(m));
Есть ли способ сделать это эффективно?