У меня есть rx.Singles (или rx.Observables), которые должны быть выполнены в порядке зависимости, как показано в этом DAG (Directed-Acyclic-Graph):
one
должно быть выполнено первым two
, а three
должно быть выполнено только после завершения one
two
иthree
может выполняться параллельно four
должен выполняться только после завершения two
five
должен выполняться только после завершения two
и three
four
и five
могут выполняться параллельно six
должно быть выполнено наконец, после four
и five
.
Как лучше всего этого добиться?
Следующее решение ( Solution # 1 ) работает как положено:
package rxtest;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Single;
import rx.schedulers.Schedulers;
public class ObservablesDagTest {
private static final Logger logger = LoggerFactory.getLogger(ObservablesDagTest.class);
private static Executor customExecutor = Executors.newFixedThreadPool(20);
@Test
public void dagTest() {
Single<Integer> one = createSingle(1, 100);
Single<Integer> two = createSingle(2, 200);
Single<Integer> three = createSingle(3, 500);
Single<Integer> four = createSingle(4, 400);
Single<Integer> five = createSingle(5, 200);
Single<Integer> six = createSingle(6, 150);
executeDag(one, two, three, four, five, six);
}
private void executeDag(Single<Integer> one, Single<Integer> two, Single<Integer> three, Single<Integer> four,
Single<Integer> five, Single<Integer> six) {
logger.info("BEGIN");
Single<Integer> twoCached = two.toObservable().cache().toSingle();
Observable.concat(
one.toObservable(),
Observable.merge(
Single.concat(twoCached, four),
Observable.concat(Single.merge(twoCached, three), five.toObservable())),
six.toObservable())
.toBlocking()
.subscribe(i -> logger.info("Received : " + i));
logger.info("END");
}
private Single<Integer> createSingle(int j, int sleepMs) {
Single<Integer> single = Single.just(j)
.flatMap(i -> Single.<Integer> create(s -> {
logger.info("onSubscribe : {}", i);
sleep(sleepMs);
s.onSuccess(i);
}).subscribeOn(Schedulers.from(customExecutor)));
return single;
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
}
}
}
Вывод
21:16:23.879 [main] INFO rxtest.ObservablesDagTest BEGIN
21:16:23.974 [pool-1-thread-1] INFO rxtest.ObservablesDagTest onSubscribe : 1
21:16:24.079 [main] INFO rxtest.ObservablesDagTest Received : 1
21:16:24.101 [pool-1-thread-3] INFO rxtest.ObservablesDagTest onSubscribe : 3
21:16:24.101 [pool-1-thread-2] INFO rxtest.ObservablesDagTest onSubscribe : 2
21:16:24.303 [main] INFO rxtest.ObservablesDagTest Received : 2
21:16:24.303 [main] INFO rxtest.ObservablesDagTest Received : 2
21:16:24.304 [pool-1-thread-4] INFO rxtest.ObservablesDagTest onSubscribe : 4
21:16:24.602 [main] INFO rxtest.ObservablesDagTest Received : 3
21:16:24.603 [pool-1-thread-5] INFO rxtest.ObservablesDagTest onSubscribe : 5
21:16:24.704 [main] INFO rxtest.ObservablesDagTest Received : 4
21:16:24.804 [main] INFO rxtest.ObservablesDagTest Received : 5
21:16:24.805 [pool-1-thread-6] INFO rxtest.ObservablesDagTest onSubscribe : 6
21:16:24.956 [main] INFO rxtest.ObservablesDagTest Received : 6
21:16:24.956 [main] INFO rxtest.ObservablesDagTest END
Есть ли более эффективное / быстрое решение, чем это?