Моя проблема состоит в следующем:
- Чтение из источника
- классифицирует данные на вид a, вид b
- вид a и вид b, разделяя их и запустить независимый
- свернуть результаты a и b
, для этого я использую наблюдаемое для источника
AtomicLong appendX = new AtomicLong();
AtomicLong totalNumbers = new AtomicLong();
Observable<Integer> emission = Observable.fromIterable(() -> IntStream.range(1,11).iterator()).map(i -> {
totalNumbers.incrementAndGet();
long sleep = random.nextInt(100);
log.info("Sleeping 1 {} for {}", i, sleep);
Thread.sleep(sleep);
return i;
});
Я категоризировал
enum Kind {
DIVISIBLE_BY_THREE,
EVEN
}
Observable<Tuple2<Integer, List<Kind>>> firstPass = emission.map(i -> {
Tuple2<Integer, List<Kind>> finding = Tuple.of(i, new ArrayList<>());
if (i % 3 == 0) {
finding._2().add(Kind.DIVISIBLE_BY_THREE);
}
if (i % 2 == 0) {
finding._2().add(Kind.EVEN);
}
return finding;
}).publish();
Я использую publi sh, чтобы он был общим и мог разделить их, используя фильтр, создающий независимых наблюдателей
Observable<Tuple2<Integer, List<Kind>>> divByThreeFiltered = firstPass.filter(t -> {
log.info("divByThreeFiltered {}", t._1);
return t._2().contains(Kind.DIVISIBLE_BY_THREE);
});
Observable<Tuple2<Integer, List<Kind>>> evenFiltered = firstPass.filter(t -> {
log.info("evenFiltered {}", t._1);
return t._2().contains(Kind.EVEN);
});
И работать с ними одновременно
Observable<String> even = evenFiltered
.map(i -> {
return i._1() + "-" + i._2().stream()
.map(e -> e.name()).collect(Collectors.joining(","));
});
Observable<String> divByThree = divByThreeFiltered.map(i -> {
long sleep = random.nextInt(100);
log.info("Sleeping 2 {} for {}", i._1, sleep);
Thread.sleep(sleep);
return i._1() + "-" + i._2().stream()
.map(e -> e.name()).collect(Collectors.joining(","));
});
, чтобы окончательно объединить их
List<String> l = new ArrayList<>();
Single<List<String>> vals =
divByThree.mergeWith(even)
.map(
i -> {
appendX.incrementAndGet();
log.info("append xxxx {}", i);
return i + "-XXXX";
})
.reduce(
l,
(List<String> li, String s) -> {
li.add(s);
return li;
});
firstPass.connect();
List<String> to = vals.blockingGet();
Этот процесс испускает наблюдаемый один раз, но зависает, получая сокращение
List<String> to = vals.blockingGet();
Почему не происходит сокращение или фильтрация по этому вопросу?
и как можно решить проблему?
- Я читаю, используя наблюдаемые
- данные по категориям, используя дорогостоящие вычисления
- после категоризации наблюдаемые потоки требуют другой обработки и запускаются одновременно из соображений производительности