У меня есть несколько вопросов об использовании вместе функций @Scheduled
и @Async
Spring.
Допустим, мое требование состоит в том, чтобы обрабатывать 5 строк из БД каждую 1 секунду, поэтому за 1 проход потока планировщика будет создано 5 асинхронных потоков для обработки каждой строки из базы данных.
У меня следующие вопросы:
1) Есть ли альтернативный способ создания 5 потоков ascynchonis вместо использования цикла while в запланированном методе?
Одна проблема, которую яПосмотрите, что при таком подходе число активных пулов потоков может не совпадать с максимальным размером пула, и поэтому цикл не прервется до истечения 1 секунды.
2) В некоторых случаях оператор log в AsyncService, т. е. Executing dbItem on the following asyncExecutor : task-scheduler-1
отображает task-scheduler-1
в качестве имени потока, а не async_thread_
, как я всегда ожидал?
3) Если мой поток планировщика занимает больше 1 секунды для запуска, что произойдет при последующем проходе планировщика?
asyncExecutor:
@Override
@Bean(name="asyncExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setQueueCapacity(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setThreadNamePrefix("async_thread_");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return threadPoolTaskExecutor;
}
, который внедряется в класс с запланированным методом:
@Autowired
@Qualifier("asyncExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
AsyncService asyncService;
@Autowired
private DaoService daoService;
@Scheduled(fixedRateString = "1000")
public void schedulerMetod() {
try {
while (threadPoolTaskExecutor.getActiveCount() < threadPoolTaskExecutor.getMaxPoolSize()) {
DbItem dbItem = daoService.retrieveNewItemFromDB();
if (dbItem != null){
asyncService.processNewItem(dbItem);
}
}
} catch (ObjectOptimisticLockingFailureException ole){
log.info(ole.getMessage());
} catch (Exception ex){
log.error(ex.getMessage());
}
}
@Service
public class AsyncServiceImpl implements AsyncService {
@Autowired
private TaskService taskService;
@Override
@Transactional
@Async("asyncExecutor")
public void processNewItem(DbItem dbItem) {
log.debug("Executing dbItem on the following asyncExecutor : " + Thread.currentThread().getName());
taskService.processNewItem(dbItem);
}
}