Я использую Spring ListenableFuture для выполнения асинхронных задач. Я пытаюсь сделать следующее.
- У меня есть список объектов. Я делю список на партии размером 3.
- Я передаю каждую партию для выполнения некоторых асинхронных операций с кассандрой.
- Мне нужно подождать, пока первая партия не будет сделана, а затем забрать следующую партию
- Если пакетное сообщение выдало ошибку, выдается ошибка, почему оно не удалось, и не принимайте следующую партию.
Но по некоторым причинам это работает не так, как ожидалось.
- Даже если партия не удалась, будет получена следующая партия
- Следующая партия извлекается, даже если первая партия выполняется.
Я что-то не так делаю.
public void run(String... args) throws Exception {
List<MyEntity> entities = Arrays.asList(
new MyEntity("3", "val1"),
new MyEntity("2", "val1"),
new MyEntity(null, "val1"),
new MyEntity("21", "val1")
);
partition(entities, 3).forEach(p -> {
List<ListenableFuture<MyEntity>> futures = new ArrayList<>();
final CountDownLatch countDownLatch = new CountDownLatch(1);
entities.stream().forEach(s -> {
ListenableFuture<MyEntity> future = asyncCassandraOperations.insert(s);
future.addCallback(it -> {
countDownLatch.countDown();
System.out.println("Pass" + it);
}, throwable -> {
countDownLatch.countDown();
System.out.println("fail");
});
futures.add(future);
});
futures.forEach(f -> {
try {
f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
try {
countDownLatch.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("done patch");
});
System.out.println("done all");
}