Как я могу заставить Flux излучать дополнительный элемент, если ни один элемент не был выпущен в течение определенного времени? - PullRequest
1 голос
/ 30 октября 2019

Я использую Heartbeat для конечной точки WebFlux SSE. Чтобы избежать тайм-аута в клиенте, я хочу убедиться, что элемент излучается, по крайней мере, каждые, например, 10 секунд.

Я пришел к следующему решению, которое генерирует элемент heartbeat каждые 10 секунд независимо отбыл ли испущен реальный элемент:

originalFlux.mergeWith(Flux.interval(Duration.ofSeconds(10), Duration.ofSeconds(10)).map(ignored -> "heartbeat")

Это, вероятно, достаточно хорошо для моего случая использования, но все же я задаюсь вопросом, возможно ли испускать сердцебиение, только если реальный элемент не был испущен впоследние 10 секундЯ поигрался с оператором timeout, который реализует именно то поведение синхронизации, которое я ищу, но которое выдает ошибку и отменяет originalFlux вместо простого выброса дополнительного элемента.

Следующий код с использованием timeout проходит мой тест, но выглядит слишком сложным и, насколько я понимаю, может потерять элементы из originalFlux, если они излучаются между отменой и повторной подпиской на него:

ConnectableFlux<String> sharedOriginalFlux = originalFlux.publish();
CompletableFuture<Disposable> eventualSubscription = new CompletableFuture<>();
return addHeartbeat(sharedOriginalFlux)
    .doOnSubscribe(ignored -> eventualSubscription.complete(sharedOriginalFlux.connect()))
    .doFinally(ignored -> eventualSubscription.thenAccept(Disposable::dispose))

private Flux<String> addHeartbeat(Flux<String> sharedOriginalFlux) {
    return sharedOriginalFlux.timeout(
        Duration.ofSeconds(10),
        Flux.mergeSequential(
            Mono.just("heartbeat"),
            Flux.defer(() -> addHeartbeat(sharedOriginalFlux))));
}

Есть ли простой ибезопасный способ сделать это?

1 Ответ

1 голос
/ 30 октября 2019

Это не обязательно проще , но другой вариант может заключаться в создании отдельного процессора, который может обернуть оригинальную Flux, чтобы обеспечить сердцебиение (которое не должно пропустить ни одного элемента):

public class HeartbeatProcessor<T> {

    private final FluxProcessor<T, T> processor;
    private final FluxSink<T> sink;
    private final T heartbeatValue;
    private final Duration heartbeatPeriod;
    private Disposable d;

    public HeartbeatProcessor(Flux<T> orig, T heartbeatValue, Duration heartbeatPeriod) {
        this.heartbeatValue = heartbeatValue;
        this.heartbeatPeriod = heartbeatPeriod;
        this.processor = DirectProcessor.<T>create().serialize();
        this.sink = processor.sink();
        this.d = Mono.just(heartbeatValue).delayElement(heartbeatPeriod).subscribe(this::emit);
        orig.subscribe(this::emit);
    }

    private void emit(T val) {
        sink.next(val);
        d.dispose();
        this.d = Mono.just(heartbeatValue).delayElement(heartbeatPeriod).subscribe(this::emit);
    }

    public Flux<T> getFlux() {
        return processor;
    }
}

Вы можете затем назвать это следующим образом:

new HeartbeatProcessor<>(elements, "heartbeat", Duration.ofSeconds(10))
        .getFlux()
        .subscribe(System.out::println);
...