springframework.data.repository.reactive.ReactiveCrudRepository.save () не сохраняет данные при вызове из CompletableFuture.runAsync - PullRequest
0 голосов
/ 26 марта 2020

Контекст: я хочу сохранить данные асинхронно в MongoDb. Когда Front (например, Angular / Mobile) вызывает Endpoint Controller, он вызывает метод Service, предназначенный для первого сохранения данных в MongoDb, и, если это удастся, публикует эти же данные как Kafka topi c и вызывает другую конечную точку, передавая вперед те же данные. Чтобы достичь этого поведения, я хочу использовать CompletableFuture с «двумя шагами»: сначала сохранить на MongoDb, а затем (затем AcceptAsyn c) отправить данные в Kafka Topi c и затем AcceptAsyn c отправить в другой сервис отдыха.

Проблема: я полностью застрял, чтобы заставить работать первый «шаг»: сохранение данных в MongoDb. Я могу запустить простой системный system.out.print, но никогда не получу ReactiveCrudRepository.save (), сохраняющий данные. См. Нижеприведенный код.

Я довольно остальная часть работает, так как, если я пытаюсь сохранить без CompletableFuture, все идет хорошо с этим методом:

//properly working without CompletableFuture
public void SimpleSaveMehtod(Extrato e) {
    extratoRepository.save(e); // .subscribe();
}

Вот код метода службы, который "runPrintAsyn c .get ();» работает отлично (печатает сообщение), но "runSaveAsyn c .get ();" не сохранить вообще. Я попробовал "ForkJoinPool.commonPool (). AwaitQuiescence (3, TimeUnit.SECONDS)", и это тоже не сработало (честно говоря, я не думаю, что имеет смысл ждать, если я заблокировал с помощью get btw, я попробовал).

@Async("threadPoolTaskExecutor")
public void transferirVoidReturned(Extrato e) {

    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);

    CompletableFuture<Void> runSaveAsync = CompletableFuture.runAsync(() -> extratoRepository.save(e),newFixedThreadPool);

    CompletableFuture<Void> runPrintAsync = CompletableFuture.runAsync(() -> System.out.print("I am printed"), newFixedThreadPool);

    try {
        // ForkJoinPool.commonPool().awaitQuiescence(3, TimeUnit.SECONDS);

        runSaveAsync.get();
        runPrintAsync.get();

    } catch (InterruptedException | ExecutionException e1) {
        e1.printStackTrace();
    }
}

Если это уместно, вот хранилище:

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.domain.Pageable;

public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
    @Query("{ id: { $exists: true }}")
    Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}

1 Ответ

1 голос
/ 30 марта 2020

tl; dr : Вам необходимо подписаться на Mono:

public void SimpleSaveMehtod(Extrato e) {
    extratoRepository.save(e).subscribe();
}

Справочная информация : одно из многих обоснований использования реактивных типов над Future и похоже, что они позволяют выполнять вычисления лениво. Это означает, что когда вы создаете Mono, Flux или любую другую структуру и добавляете некоторые вычисления, она не будет выполняться до тех пор, пока она не понадобится. Mono создано, поток возвращается в пул, потому что больше не нужно выполнять никаких действий. Поскольку подписка не была сделана, вычисления в Mono никогда не выполняются.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...