Я использую Heartbeat для конечной точки WebFlux SSE. Чтобы избежать тайм-аута в клиенте, я хочу убедиться, что элемент излучается, по крайней мере, каждые, например, 10 секунд.
Я пришел к следующему решению, которое генерирует элемент heartbeat каждые 10 секунд независимо отбыл ли испущен реальный элемент:
originalFlux.mergeWith(Flux.interval(Duration.ofSeconds(10), Duration.ofSeconds(10)).map(ignored -> "heartbeat")
Это, вероятно, достаточно хорошо для моего случая использования, но все же я задаюсь вопросом, возможно ли испускать сердцебиение, только если реальный элемент не был испущен впоследние 10 секундЯ поигрался с оператором timeout
, который реализует именно то поведение синхронизации, которое я ищу, но которое выдает ошибку и отменяет originalFlux
вместо простого выброса дополнительного элемента.
Следующий код с использованием timeout
проходит мой тест, но выглядит слишком сложным и, насколько я понимаю, может потерять элементы из originalFlux
, если они излучаются между отменой и повторной подпиской на него:
ConnectableFlux<String> sharedOriginalFlux = originalFlux.publish();
CompletableFuture<Disposable> eventualSubscription = new CompletableFuture<>();
return addHeartbeat(sharedOriginalFlux)
.doOnSubscribe(ignored -> eventualSubscription.complete(sharedOriginalFlux.connect()))
.doFinally(ignored -> eventualSubscription.thenAccept(Disposable::dispose))
private Flux<String> addHeartbeat(Flux<String> sharedOriginalFlux) {
return sharedOriginalFlux.timeout(
Duration.ofSeconds(10),
Flux.mergeSequential(
Mono.just("heartbeat"),
Flux.defer(() -> addHeartbeat(sharedOriginalFlux))));
}
Есть ли простой ибезопасный способ сделать это?