RxJava: Как создать граф задач / рабочий процесс rx.Single / rx.Observable? - PullRequest
0 голосов
/ 18 сентября 2018

У меня есть rx.Singles (или rx.Observables), которые должны быть выполнены в порядке зависимости, как показано в этом DAG (Directed-Acyclic-Graph):

Singles Dependency Graph

  • one должно быть выполнено первым
  • two, а three должно быть выполнено только после завершения one
  • two иthree может выполняться параллельно
  • four должен выполняться только после завершения two
  • five должен выполняться только после завершения two и three
  • four и five могут выполняться параллельно
  • six должно быть выполнено наконец, после four и five.

Как лучше всего этого добиться?

Следующее решение ( Solution # 1 ) работает как положено:

package rxtest;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Single;
import rx.schedulers.Schedulers;


public class ObservablesDagTest {
    private static final Logger logger = LoggerFactory.getLogger(ObservablesDagTest.class);
    private static Executor customExecutor = Executors.newFixedThreadPool(20);

    @Test
    public void dagTest() {
        Single<Integer> one = createSingle(1, 100);
        Single<Integer> two = createSingle(2, 200);
        Single<Integer> three = createSingle(3, 500);
        Single<Integer> four = createSingle(4, 400);
        Single<Integer> five = createSingle(5, 200);
        Single<Integer> six = createSingle(6, 150);

        executeDag(one, two, three, four, five, six);
    }

    private void executeDag(Single<Integer> one, Single<Integer> two, Single<Integer> three, Single<Integer> four,
            Single<Integer> five, Single<Integer> six) {
        logger.info("BEGIN");

        Single<Integer> twoCached = two.toObservable().cache().toSingle();

        Observable.concat(
                one.toObservable(),
                Observable.merge(
                        Single.concat(twoCached, four),
                        Observable.concat(Single.merge(twoCached, three), five.toObservable())),
                six.toObservable())
                .toBlocking()
                .subscribe(i -> logger.info("Received : " + i));

        logger.info("END");
    }

    private Single<Integer> createSingle(int j, int sleepMs) {
        Single<Integer> single = Single.just(j)
                .flatMap(i -> Single.<Integer> create(s -> {
                    logger.info("onSubscribe : {}", i);
                    sleep(sleepMs);
                    s.onSuccess(i);
                }).subscribeOn(Schedulers.from(customExecutor)));
        return single;
    }

    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException e) {
        }
    }
}

Вывод

21:16:23.879 [main] INFO rxtest.ObservablesDagTest BEGIN
21:16:23.974 [pool-1-thread-1] INFO rxtest.ObservablesDagTest onSubscribe : 1
21:16:24.079 [main] INFO rxtest.ObservablesDagTest Received : 1
21:16:24.101 [pool-1-thread-3] INFO rxtest.ObservablesDagTest onSubscribe : 3
21:16:24.101 [pool-1-thread-2] INFO rxtest.ObservablesDagTest onSubscribe : 2
21:16:24.303 [main] INFO rxtest.ObservablesDagTest Received : 2
21:16:24.303 [main] INFO rxtest.ObservablesDagTest Received : 2
21:16:24.304 [pool-1-thread-4] INFO rxtest.ObservablesDagTest onSubscribe : 4
21:16:24.602 [main] INFO rxtest.ObservablesDagTest Received : 3
21:16:24.603 [pool-1-thread-5] INFO rxtest.ObservablesDagTest onSubscribe : 5
21:16:24.704 [main] INFO rxtest.ObservablesDagTest Received : 4
21:16:24.804 [main] INFO rxtest.ObservablesDagTest Received : 5
21:16:24.805 [pool-1-thread-6] INFO rxtest.ObservablesDagTest onSubscribe : 6
21:16:24.956 [main] INFO rxtest.ObservablesDagTest Received : 6
21:16:24.956 [main] INFO rxtest.ObservablesDagTest END

Есть ли более эффективное / быстрое решение, чем это?

...