Spring ListenableFuture ждут, пока все элементы не будут закончены - PullRequest
0 голосов
/ 16 мая 2019

Я использую Spring ListenableFuture для выполнения асинхронных задач. Я пытаюсь сделать следующее.

  1. У меня есть список объектов. Я делю список на партии размером 3.
  2. Я передаю каждую партию для выполнения некоторых асинхронных операций с кассандрой.
  3. Мне нужно подождать, пока первая партия не будет сделана, а затем забрать следующую партию
  4. Если пакетное сообщение выдало ошибку, выдается ошибка, почему оно не удалось, и не принимайте следующую партию.

Но по некоторым причинам это работает не так, как ожидалось.

  1. Даже если партия не удалась, будет получена следующая партия
  2. Следующая партия извлекается, даже если первая партия выполняется.

Я что-то не так делаю.

  public void run(String... args) throws Exception {

    List<MyEntity> entities = Arrays.asList(
        new MyEntity("3", "val1"),
        new MyEntity("2", "val1"),
        new MyEntity(null, "val1"),
        new MyEntity("21", "val1")
    );

    partition(entities, 3).forEach(p -> {
      List<ListenableFuture<MyEntity>> futures = new ArrayList<>();
      final CountDownLatch countDownLatch = new CountDownLatch(1);

      entities.stream().forEach(s -> {
        ListenableFuture<MyEntity> future = asyncCassandraOperations.insert(s);
        future.addCallback(it -> {
          countDownLatch.countDown();
          System.out.println("Pass" + it);
        }, throwable -> {
          countDownLatch.countDown();
          System.out.println("fail");
        });
        futures.add(future);
      });

      futures.forEach(f -> {
        try {
          f.get();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });

      try {
        countDownLatch.await();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }

      System.out.println("done patch");
    });

    System.out.println("done all");

  }
...