разделение групповой передачи, наблюдаемое, чтобы окончательно слиться в одну наблюдаемую - PullRequest
0 голосов
/ 14 апреля 2020

Моя проблема состоит в следующем:

  • Чтение из источника
  • классифицирует данные на вид a, вид b
  • вид a и вид b, разделяя их и запустить независимый
  • свернуть результаты a и b

, для этого я использую наблюдаемое для источника

AtomicLong appendX = new AtomicLong();
AtomicLong totalNumbers = new AtomicLong();
Observable<Integer> emission = Observable.fromIterable(() -> IntStream.range(1,11).iterator()).map(i -> {

  totalNumbers.incrementAndGet();
  long sleep = random.nextInt(100);
  log.info("Sleeping 1 {} for {}", i, sleep);
  Thread.sleep(sleep);
  return i;
}); 

Я категоризировал

enum Kind {
  DIVISIBLE_BY_THREE,
  EVEN
}

Observable<Tuple2<Integer, List<Kind>>> firstPass = emission.map(i -> {
  Tuple2<Integer, List<Kind>> finding = Tuple.of(i, new ArrayList<>());
  if (i % 3 == 0) {
    finding._2().add(Kind.DIVISIBLE_BY_THREE);
  }
  if (i % 2 == 0) {
    finding._2().add(Kind.EVEN);
  }
  return finding;
}).publish();

Я использую publi sh, чтобы он был общим и мог разделить их, используя фильтр, создающий независимых наблюдателей

Observable<Tuple2<Integer, List<Kind>>> divByThreeFiltered = firstPass.filter(t -> {
  log.info("divByThreeFiltered {}", t._1);
  return t._2().contains(Kind.DIVISIBLE_BY_THREE);
});

Observable<Tuple2<Integer, List<Kind>>> evenFiltered = firstPass.filter(t -> {
  log.info("evenFiltered       {}", t._1);
  return t._2().contains(Kind.EVEN);
});

И работать с ними одновременно

Observable<String> even = evenFiltered
    .map(i -> {
      return i._1() + "-" + i._2().stream()
          .map(e -> e.name()).collect(Collectors.joining(","));
    });
Observable<String> divByThree = divByThreeFiltered.map(i -> {
  long sleep = random.nextInt(100);
  log.info("Sleeping 2 {} for {}", i._1, sleep);
  Thread.sleep(sleep);
  return i._1() + "-" + i._2().stream()
      .map(e -> e.name()).collect(Collectors.joining(","));
});

, чтобы окончательно объединить их

List<String> l = new ArrayList<>();
Single<List<String>> vals =
    divByThree.mergeWith(even)
        .map(
            i -> {
              appendX.incrementAndGet();
              log.info("append xxxx {}", i);
              return i + "-XXXX";
            })
        .reduce(
            l,
            (List<String> li, String s) -> {
              li.add(s);
              return li;
            });

firstPass.connect();
List<String> to = vals.blockingGet();

Этот процесс испускает наблюдаемый один раз, но зависает, получая сокращение

List<String> to = vals.blockingGet();

Почему не происходит сокращение или фильтрация по этому вопросу?

и как можно решить проблему?

  • Я читаю, используя наблюдаемые
  • данные по категориям, используя дорогостоящие вычисления
  • после категоризации наблюдаемые потоки требуют другой обработки и запускаются одновременно из соображений производительности
...