Разделение, а затем объединение потоков с RX - PullRequest
0 голосов
/ 23 февраля 2020

Я ищу какой-нибудь лучший метод или шаблон для обработки следующего случая:

У меня есть два потока, которые я хочу объединить следующим образом:

                               |-------------|
                               | Transform A |------------>|---------|
              |---------|----->|-------------|             |         |
Stream A----->| Split A |                                  | Combine |-------> ?
              |---------|----->|--------------------|      |   All   |
                               | Transform and      |      |         |
                               | Combine A (latest) |----->|---------|
                               | and B (latest)     |
Stream B---------------------->|--------------------|

Поток A и B генерируют события асинхронно. Я хотел бы объединить последние из A и B и объединить результат с результатом вычисления над A.

Когда получено событие в B, весь конвейер запускается со значением из этого события и самое последнее из A.

Существует ли элегантный способ убедиться, что при получении события в A Combine All запускается с событиями, основанными на этом событии от A и избежать условия гонки между Преобразование A и Преобразование и объединение A и B

Ответы [ 3 ]

1 голос
/ 25 февраля 2020

Вы можете сделать это, объединив однопоточный планировщик с оператором debounce:

class ManualExecutor : Executor {
    private val tasks = ArrayDeque<Runnable>()

    override fun execute(command: Runnable) = tasks.push(command)

    fun runAllTasks() {
        while (tasks.isNotEmpty()) {
            tasks.pop().run()
        }
    }
}

val a: Observable<A>
val b: Observable<B>

val scheduler = Schedulers.from(ManualExecutor())

val aTransformed = a.observeOn(scheduler).map { transformA(it) }
val aCombinedWithB = combine(a, b).observeOn(scheduler)

val final = combine(aTransformed, aCombinedWithB).debounce(0)

// some time later....
emitA() // now all the updates are queued in our ManualExecutor
scheduler.runAllTasks() // final will only emit once, not twice!

Конечно, это не скомпилируется из коробки, и вам придется возиться с планировщиком и тестами чтобы понять это правильно, но, возможно, идея поможет. Если использование debounce с «нулевым» тайм-аутом кажется слишком хакерским, вы также можете использовать другую сигнатуру, которая обеспечивает полный контроль над периодом debounce с наблюдаемым значением.

Однако, если вы не используете его по назначению. в случае, указанном c только для вышеперечисленного, то вы можете упростить проблему с помощью чего-то подобного:

              |-----------------------|
Stream A----->| map A to              |
              | pair(A, transform(A)) |                 
              |-----------------------|                 
                        |                               
                        |----->|--------------------|   
                               | Combine A, t(A)    |   
                               | and B (latest)     |--> ?
                               | into 3-tuple       |
Stream B---------------------->|--------------------|
0 голосов
/ 03 марта 2020

Основываясь на последнем комментарии @tonicsoft, идея состоит в том, чтобы иметь что-то подобное:

Flowable<Pair<Long, String>> streamPairA = Flowable.interval(3, SECONDS)
                                                   .doOnNext(l -> System.out.println("new value of A: " + l))
                                                   .map(l -> new Pair<>(l, "Hello " + l));
Flowable<Long> streamB = Flowable.interval(5, SECONDS)
                                 .doOnNext(l -> System.out.println("new value of B: " + l));

Flowable.combineLatest(streamPairA, streamB, (x, y) -> "--> " + x.value()
                                                        + " (latest A: " + x.key() + ", latest B: " + y + ")")
        .subscribe(System.out::println);

Flowable.timer(1, MINUTES) // Just to block the main thread for a while
        .blockingSubscribe();
0 голосов
/ 24 февраля 2020

Использование Observable.Publish(source, x => ...) гарантирует, что у вас есть только одна подписка на source, но вы можете использовать ее в качестве переменной x столько раз, сколько захотите. Это гарантирует, что вы никогда не получите никаких условий гонки на source.

. Вы можете использовать его так:

Func<A, C> transform_A = a => ...;
Func<A, B, D> combine_A_and_B = (a, b) => ...;
Func<C, D, E> combine_all = (c, d) => ...;

IObservable<A> stream_a = ...;
IObservable<B> stream_b = ...;

IObservable<E> query =
    stream_a
        .Publish(stream_a_published =>
        {
            var stream_c = stream_a_published.Select(transform_A);
            var stream_d = stream_a_published.CombineLatest(stream_b, combine_A_and_B);
            return stream_c.CombineLatest(stream_d, combine_all);
        });
...