Spring-WebFlux Flux не работает с Context - PullRequest
1 голос
/ 08 апреля 2019

Я хочу использовать Context в моей трубе Flux для обхода фильтрации.

Вот что у меня есть:

public Flux<Bar> realtime(Flux<OHLCIntf> ohlcIntfFlux) {
        return Flux.zip(
                ohlcIntfFlux,
                ohlcIntfFlux.skip(1),
                Mono.subscriberContext().map(c -> c.getOrDefault("isRealtime", false))
        )
                .filter(l ->
                        l.getT3() ||
                        (!l.getT2().getEndTimeStr().equals(l.getT1().getEndTimeStr())))
                .map(Tuple2::getT1)
                .log()
                .map(this::
}

который является входом для этого:

    public void setRealtime(Flux<Bar> input) {
        Flux.zip(input, Mono.subscriberContext())
                .doOnComplete(() -> {
...
   })
                .doOnNext(t -> {
...
    })
    .subscribe()
}

Я могу сказать, что мой код в ... не вызывает ошибок, я могу даже получить доступ к карте Context, но когда первая итерация завершится, я получу:

onContextUpdate(Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@35d5ac51})

и абонент отключается.

Итак, мой вопрос: правильно ли я это использую и в чем здесь проблема?

EDIT:

Я пытался repeat() Mono.subscriberContext(), когда я использую значение из него:

        return Flux.zip(
                ohlcIntfFlux,
                ohlcIntfFlux.skip(1),
                Mono.subscriberContext()
                        .map(c -> c.getOrDefault("isRealtime", new AtomicBoolean())).repeat()
        )
                .filter(l ->
                        l.getT3().get() ||
                                (!l.getT2().getEndTime().isEqual(l.getT1().getEndTime())))
                .map(Tuple2::getT1)

и установите AtomicBoolean в контекст на стороне подписчика и просто измените значение внутри этой ссылки на переменную, когда мне понадобится сигнал в восходящем потоке, но он совсем не изменится:

        input
                .onErrorContinue((throwable, o) -> throwable.getMessage())
                .doOnComplete(() -> {
                    System.out.println("Number of trades for the strategy: " + tradingRecord.getTradeCount());
                    // Analysis
                    System.out.println("Total profit for the strategy: " + new TotalProfitCriterion().calculate(timeSeries, tradingRecord));
                })
                .doOnNext(this::defaultRealtimeEvaluator)
                .subscriberContext(Context.of("isRealtime", isRealtimeAtomic))
                .subscribe();

по крайней мере с повтором Flux не отключается, но значение, которое я получаю, не обновляется. Других подсказок у меня нет.

Spring-webflux: 2.1.3.RELEASE

1 Ответ

0 голосов
/ 11 мая 2019

это работает:

        input
                .onErrorContinue((throwable, o) -> throwable.getMessage())
                .doOnComplete(() -> { ... }
                .flatMap(bar -> Mono.subscriberContext()
                        .map(c -> Tuples.of(bar, c)))
                .doOnNext(this::defaultRealtimeEvaluator)
                .subscriberContext(Context.of("isRealtime", new AtomicBoolean()))
                .subscribe();

, поэтому нужно установить AtomicBoolean в моем случае как текст и затем извлечь эту переменную из контекста, если вы хотите изменить ее значение.то же самое на восходящем потоке.

...