Контекст: я хочу сохранить данные асинхронно в MongoDb. Когда Front (например, Angular / Mobile) вызывает Endpoint Controller, он вызывает метод Service, предназначенный для первого сохранения данных в MongoDb, и, если это удастся, публикует эти же данные как Kafka topi c и вызывает другую конечную точку, передавая вперед те же данные. Чтобы достичь этого поведения, я хочу использовать CompletableFuture с «двумя шагами»: сначала сохранить на MongoDb, а затем (затем AcceptAsyn c) отправить данные в Kafka Topi c и затем AcceptAsyn c отправить в другой сервис отдыха.
Проблема: я полностью застрял, чтобы заставить работать первый «шаг»: сохранение данных в MongoDb. Я могу запустить простой системный system.out.print, но никогда не получу ReactiveCrudRepository.save (), сохраняющий данные. См. Нижеприведенный код.
Я довольно остальная часть работает, так как, если я пытаюсь сохранить без CompletableFuture, все идет хорошо с этим методом:
//properly working without CompletableFuture
public void SimpleSaveMehtod(Extrato e) {
extratoRepository.save(e); // .subscribe();
}
Вот код метода службы, который "runPrintAsyn c .get ();» работает отлично (печатает сообщение), но "runSaveAsyn c .get ();" не сохранить вообще. Я попробовал "ForkJoinPool.commonPool (). AwaitQuiescence (3, TimeUnit.SECONDS)", и это тоже не сработало (честно говоря, я не думаю, что имеет смысл ждать, если я заблокировал с помощью get btw, я попробовал).
@Async("threadPoolTaskExecutor")
public void transferirVoidReturned(Extrato e) {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
CompletableFuture<Void> runSaveAsync = CompletableFuture.runAsync(() -> extratoRepository.save(e),newFixedThreadPool);
CompletableFuture<Void> runPrintAsync = CompletableFuture.runAsync(() -> System.out.print("I am printed"), newFixedThreadPool);
try {
// ForkJoinPool.commonPool().awaitQuiescence(3, TimeUnit.SECONDS);
runSaveAsync.get();
runPrintAsync.get();
} catch (InterruptedException | ExecutionException e1) {
e1.printStackTrace();
}
}
Если это уместно, вот хранилище:
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.domain.Pageable;
public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
@Query("{ id: { $exists: true }}")
Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}