Как запустить весеннюю интеграцию с многопоточностью - PullRequest
0 голосов
/ 08 ноября 2018

Я хотел бы сделать следующее с пружинной интеграцией

  1. Загрузка файлов из sftp
  2. Отправьте загруженные файлы на http, а также на s3

Вот что у меня есть.

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "100000", maxMessagesPerPoll = "3"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setMaxFetchSize(2);
        return source;
    }

Вот мой сервисный активатор. Проблема с моим активатором службы заключается в том, что этот запуск выполняется в том же потоке, что и модуль обработки запросов, поэтому, когда файл обрабатывается слишком долго, следующий процесс не обрабатывается до тех пор, пока не будет выполнен первый.

@ServiceActivator(inputChannel = "sftpChannel")
        public void sftpChannel(@Payload File payload, @Header("timestamp") long timestamp) {
            log.info("Message arrived at sftpChannel");
         //do something with file
    }

Как я могу запустить файловый процесс в отдельном потоке и освободить вместо него поток опроса, чтобы он мог продолжать извлекать файлы из sftp?

Ответы [ 2 ]

0 голосов
/ 08 ноября 2018

Примерно так:

@Bean
public ThreadPoolTaskExecutor executor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(5);
    return executor; 
}

И используйте это executor в качестве имени бина в @Poller из @InboundChannelAdapter:

@Bean
@InboundChannelAdapter(channel = "sftpChannel", 
        poller = @Poller(fixedDelay = "100000", maxMessagesPerPoll = "3", taskExecutor="executor"))

См. @Poller JavaDocs:

/**
 * @return The {@link org.springframework.core.task.TaskExecutor} bean name.
 */
String taskExecutor() default "";

А также см. Документы в Справочном руководстве: https://docs.spring.io/spring-integration/docs/5.0.9.RELEASE/reference/html/messaging-channels-section.html#conditional-pollers

Важно: Async Handoff

Этот совет изменяет триггер на основе результата receive(). Это будет работать, только если вызов вызывается в потоке опросов. Это не будет работать, если у опроса есть task-executor. Чтобы воспользоваться этим советом, когда вы хотите использовать асинхронные операции после результата опроса, выполните асинхронную передачу обслуживания позже, возможно, с помощью ExecutorChannel.

0 голосов
/ 08 ноября 2018

Вы можете использовать аннотацию @Async для запуска любого метода в отдельном потоке. Вам просто нужно добавить @EnableAsync в любой файл @Configuration, и при его вызове он будет работать асинхронно. Вы можете найти больше информации в этом блоге .

...