Непрерывное восстановление состояния с помощью Flux - PullRequest
0 голосов
/ 27 марта 2019

Допустим, у меня есть два типа событий (A и B) и Flux es, которые их как-то генерируют:

Flux<A> aFlux = ...;
Flux<B> bFlux = ...;

, а также тип, который содержит текущее состояние, обозначенное типомS:

class S {
  final int val;
}

Я хочу создать следующее:

final S sInitial = ...;

Flux<S> sFlux = Flux.merge(aFlux, bFlux)
  .scan((a, e) -> {
    if(e instanceof A) {
      return mapA(a, (A)e);
    } else if(e instanceof B) {
      return mapB(a, (B)e);
    } else {
      throw new RuntimeException("invalid event");
    }
  })
  .startWith(sInitial);

, где sCurr - это экземпляр S, который последний раз выводился sFlux, начиная с sInitial и mapA / mapB возвращают новое значение типа SS, и sInitial являются неизменяемыми.

То есть я хочу:

  • Постоянно выводить последнее состояние ...
  • ...который генерируется ...
  • ... на основе текущего состояния и полученного события ...
  • ... как предписано функциями отображения

Есть ли способ реорганизовать вышеуказанный поток потока другим способом, особенно во избежание использования instanceof?

1 Ответ

0 голосов
/ 27 марта 2019

Вы можете добавить интерфейс и реализовать его для ваших классов A и B

interface ToSConvertible {
    S toS(S s);
}

Теперь вы можете использовать reactor.core.publisher.Flux#scan(A, java.util.function.BiFunction<A,? super T,A>) метод:

Flux<S> sFlux = Flux.merge(aFlux, bFlux)
        .scan(sInitial, (s, e) -> e.toS(s));
...