Как я могу вызвать новый метод @ Asyn c, но ждать условия? - PullRequest
0 голосов
/ 26 февраля 2020

У меня есть приложение Spring Boot, которое реализует AMQP MessageListener. Этот слушатель вызывает метод @ Asyn c, управляемый ThreadPoolTaskExecutor с размером пула. Проблема возникает, когда поступает много входящих сообщений, поэтому эти сообщения теряются из-за отсутствия доступных асинхронных рабочих.

Я использую Spring Core 5.0.7-RELEASE, Java 8

Это мой код:

AsyncConfigurator:

@EnableAsync
@Configuration
public class AsyncConfiguration extends AsyncConfigurerSupport {

    @Override
    @Bean("docThreadPoolTaskExecutor")
    public Executor getAsyncExecutor() {
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("DocWorkerThread-");
        executor.initialize();
        return executor;
    }

Служба поддержки ( MyAsyncService ):


    @Async("docThreadPoolTaskExecutor")
    @Override
    public void generaDocumento(String idUser, int year) {
       //... some heavy and slow process
    }

Слушатель моего сообщения:

...
    @Autowired
    private MyAsyncService myAsyncService;

    @Override
    @EntryPoint
    public void onMessage(Message message) {
        try {
            final String mensaje = new String(message.getBody(), StandardCharsets.UTF_8); 
            final MyPojo payload = JsonUtils.readFromJson(mensaje , MyPojo.class);

            myAsyncService.generaDocumento(payload.getIdUser(), Integer.valueOf(payload.getYear()));

        } catch ( Throwable t ) { 
            throw new AmqpRejectAndDontRequeueException( t );
        }
    }

Мне нужен кто-то, чтобы дать мне идею, чтобы решить эту проблему.

1 Ответ

0 голосов
/ 26 февраля 2020

ThreadPoolTaskExecutor имеет очередь по умолчанию. Сообщения должны быть добавлены в очередь, когда вы достигнете corePoolSize. Когда очередь заполнится, будет запущено больше рабочих, пока вы не достигнете maxPoolSize. Вы можете проверить текущий размер очереди по executor.getThreadPoolExecutor().getQueue().size(), чтобы проверить, действительно ли сообщения потеряны или просто застряли в очереди.

...