Я использую Spring Webflux с jpa-файлами данных Spring, используя PostgreSql в качестве базы данных.Я не хочу блокировать основной поток при выполнении вызовов БД, таких как find
и save
.Чтобы добиться того же, у меня есть главный планировщик в классе Controller
и классы обслуживания jdbcScheduler
.
Я определил их следующим образом:
@Configuration
@EnableJpaAuditing
public class CommonConfig {
@Value("${spring.datasource.hikari.maximum-pool-size}")
int connectionPoolSize;
@Bean
public Scheduler scheduler() {
return Schedulers.parallel();
}
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
}
@Bean
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
return new TransactionTemplate(transactionManager);
}
}
Теперь, пока я делаювызов get / save в моем слое обслуживания я делаю:
@Override
public Mono<Config> getConfigByKey(String key) {
return Mono.defer(
() -> Mono.justOrEmpty(configRepository.findByKey(key)))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
@Override
public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
return Flux
.fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
@Override
public Flux<Config> addConfigs(List<Config> configList) {
return Flux.fromIterable(configRepository.saveAll(configList))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
А в контроллере я делаю:
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
return configService.addConfigs(configs).collectList()
.map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
.subscribeOn(scheduler);
}
Это правильно?и / или есть лучший способ сделать это?
Что я понимаю под:
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
, так это то, что задача будет выполняться в потоках jdbcScheduler
, а последующий результат будет опубликован вмоя основная параллель scheduler
.Это понимание правильно?