Я хочу использовать Context
в моей трубе Flux для обхода фильтрации.
Вот что у меня есть:
public Flux<Bar> realtime(Flux<OHLCIntf> ohlcIntfFlux) {
return Flux.zip(
ohlcIntfFlux,
ohlcIntfFlux.skip(1),
Mono.subscriberContext().map(c -> c.getOrDefault("isRealtime", false))
)
.filter(l ->
l.getT3() ||
(!l.getT2().getEndTimeStr().equals(l.getT1().getEndTimeStr())))
.map(Tuple2::getT1)
.log()
.map(this::
}
который является входом для этого:
public void setRealtime(Flux<Bar> input) {
Flux.zip(input, Mono.subscriberContext())
.doOnComplete(() -> {
...
})
.doOnNext(t -> {
...
})
.subscribe()
}
Я могу сказать, что мой код в ...
не вызывает ошибок, я могу даже получить доступ к карте Context
, но когда первая итерация завершится, я получу:
onContextUpdate(Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@35d5ac51})
и абонент отключается.
Итак, мой вопрос: правильно ли я это использую и в чем здесь проблема?
EDIT:
Я пытался repeat()
Mono.subscriberContext()
, когда я использую значение из него:
return Flux.zip(
ohlcIntfFlux,
ohlcIntfFlux.skip(1),
Mono.subscriberContext()
.map(c -> c.getOrDefault("isRealtime", new AtomicBoolean())).repeat()
)
.filter(l ->
l.getT3().get() ||
(!l.getT2().getEndTime().isEqual(l.getT1().getEndTime())))
.map(Tuple2::getT1)
и установите AtomicBoolean
в контекст на стороне подписчика и просто измените значение внутри этой ссылки на переменную, когда мне понадобится сигнал в восходящем потоке, но он совсем не изменится:
input
.onErrorContinue((throwable, o) -> throwable.getMessage())
.doOnComplete(() -> {
System.out.println("Number of trades for the strategy: " + tradingRecord.getTradeCount());
// Analysis
System.out.println("Total profit for the strategy: " + new TotalProfitCriterion().calculate(timeSeries, tradingRecord));
})
.doOnNext(this::defaultRealtimeEvaluator)
.subscriberContext(Context.of("isRealtime", isRealtimeAtomic))
.subscribe();
по крайней мере с повтором Flux
не отключается, но значение, которое я получаю, не обновляется. Других подсказок у меня нет.
Spring-webflux: 2.1.3.RELEASE