Особенности проектирования Использование Springs `@ Scheduled` и` @ Async` вместе - PullRequest
0 голосов
/ 21 мая 2018

У меня есть несколько вопросов об использовании вместе функций @Scheduled и @Async Spring.

Допустим, мое требование состоит в том, чтобы обрабатывать 5 строк из БД каждую 1 секунду, поэтому за 1 проход потока планировщика будет создано 5 асинхронных потоков для обработки каждой строки из базы данных.

У меня следующие вопросы:

1) Есть ли альтернативный способ создания 5 потоков ascynchonis вместо использования цикла while в запланированном методе?

Одна проблема, которую яПосмотрите, что при таком подходе число активных пулов потоков может не совпадать с максимальным размером пула, и поэтому цикл не прервется до истечения 1 секунды.

2) В некоторых случаях оператор log в AsyncService, т. е. Executing dbItem on the following asyncExecutor : task-scheduler-1 отображает task-scheduler-1 в качестве имени потока, а не async_thread_, как я всегда ожидал?

3) Если мой поток планировщика занимает больше 1 секунды для запуска, что произойдет при последующем проходе планировщика?

asyncExecutor:

@Override
@Bean(name="asyncExecutor")
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();        
    threadPoolTaskExecutor.setCorePoolSize(5);
    threadPoolTaskExecutor.setQueueCapacity(5);
    threadPoolTaskExecutor.setMaxPoolSize(10);
    threadPoolTaskExecutor.setThreadNamePrefix("async_thread_");
    threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);     
    return threadPoolTaskExecutor;
}

, который внедряется в класс с запланированным методом:

@Autowired
@Qualifier("asyncExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

@Autowired
AsyncService asyncService;

@Autowired
private DaoService daoService;

@Scheduled(fixedRateString = "1000")
public void schedulerMetod() {

    try {       

        while (threadPoolTaskExecutor.getActiveCount() < threadPoolTaskExecutor.getMaxPoolSize()) {
            DbItem dbItem = daoService.retrieveNewItemFromDB();         
            if (dbItem != null){            
                asyncService.processNewItem(dbItem);
            }
        }

    } catch (ObjectOptimisticLockingFailureException ole){
        log.info(ole.getMessage());     
    } catch (Exception ex){
        log.error(ex.getMessage());
    }   
}


@Service
public class AsyncServiceImpl implements AsyncService {

    @Autowired
    private TaskService taskService;

    @Override
    @Transactional
    @Async("asyncExecutor")
    public void processNewItem(DbItem dbItem) {
        log.debug("Executing dbItem on the following asyncExecutor : " + Thread.currentThread().getName());     
        taskService.processNewItem(dbItem); 
    }

}

1 Ответ

0 голосов
/ 08 января 2019

Используя компонент ThreadPoolTaskScheduler, вы получите двойную цель, поскольку он реализует интерфейсы TaskExecutor (поддерживает @Async) и TaskScheduler (поддерживает @Scheduled).Вы можете настроить следующим образом:

@Bean
public ThreadPoolTaskScheduler taskScheduler() {
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    threadPoolTaskScheduler.setPoolSize(5);
    threadPoolTaskScheduler.setThreadNamePrefix("async_thread_");
    threadPoolTaskScheduler.initialize();
    return threadPoolTaskScheduler;
}

Если вы дадите имя другому исполнителю, например taskScheduler1, вы должны указать имя исполнителя в качестве атрибута в @Async("taskScheduler1").Благодаря этой реализации у вас уже есть ответы на ваши вопросы 1 и 2.

О вашем третьем вопросе: если все потоки в вашем пуле потоков заняты, тогда ваши планировщики ничего не делают, пока поток не освободится, тогда он выполнит задачу планирования.Можно подумать, чтобы увеличить размер пула потоков.

И последнее, но не менее важное: достаточно использовать @Scheduled, поскольку он работает асинхронно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...