Как использовать Spring Integration для настройки ThreadPool для обработки источника сообщений файла? - PullRequest
0 голосов
/ 03 июля 2018

Может ли кто-нибудь помочь мне переписать этот поток с помощью пула потоков? Приведенный ниже код работает, но использует фиксированную задержку для обслуживания входящих файлов:

@Bean
  public IntegrationFlow sampleFlow() {
    return IntegrationFlows
          .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(500)))
          .channel(new DirectChannel())
          .transform(fileMessageToJobRequest())
          .handle(springBatchJobLauncher())
          .handle(jobExecution -> {
            logger.info("jobExecution payload: {}", jobExecution.getPayload());
          })
          .get();
  }

Темы нужны, потому что файлы приходят с высокой скоростью.

Ответы [ 2 ]

0 голосов
/ 03 июля 2018

Спасибо @Артем. Я нашел решение, основываясь на ответе Артема. Хитрость заключается в использовании TaskExecutor в коде ниже. Также Pollers.maxMessagesPerPoll (nbfiles) должен быть настроен на обработку более одного сообщения (= файл) одновременно.

  @Bean
  public IntegrationFlow sampleFlow() throws InterruptedException {
    return IntegrationFlows
          .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(5)))
          .channel(MessageChannels.executor(threadPoolTaskExecutor()))
          .transform(fileMessageToJobRequest())
          .handle(springBatchJobLauncher())
          .handle(jobExecution -> {
            logger.debug("jobExecution payload: {}", jobExecution.getPayload());
          })
          .get();
  }

  @Bean
  public TaskExecutor threadPoolTaskExecutor() {
    int poolSize = 20;
    logger.debug("...... createing ThreadPool of size {}.", poolSize);
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("Dama_Thread_");
    executor.setMaxPoolSize(5);
    executor.setCorePoolSize(5);
    executor.setQueueCapacity(22);
    return executor;
  }
0 голосов
/ 03 июля 2018

Опрос может быть настроен с помощью этой опции:

    /**
 * Specify an {@link Executor} to perform the {@code pollingTask}.
 * @param taskExecutor the {@link Executor} to use.
 * @return the spec.
 */
public PollerSpec taskExecutor(Executor taskExecutor) {

Где вы действительно можете предоставить ThreadPoolTaskExecutor экземпляр.

...