Я создал поток интеграции для чтения файлов с SFTP-сервера и их обработки.Я понял, что как только возникает ошибка с одним из файлов (исключение выдается), опрос останавливается, и любой другой файл не обрабатывается до следующего опроса.Как я могу избежать этого, не отмечая файл как обработанный, и обрабатывая оставшиеся файлы в этом опросе?
Моя конфигурация довольно проста.Я использую нетранзакционный опросчик, который запускается каждую минуту с max-message-per-poll
, равным 1000. SftpStreamingInboundChannelAdapterSpec
имеет max-fetch-size
, равный 10, и использует фильтр составного списка файлов с SftpRegexPatternFileListFilter
и SftpPersistentAcceptOnceFileListFilter
.
@Bean
public IntegrationFlow sftpInboundFlow(JdbcMetadataStore jdbcMetadataStore, DataSourceTransactionManager dataSourceTransactionManager) {
return IntegrationFlows.from(sftpStreamingInboundChannelAdapterSpec(jdbcMetadataStore), sourcePollingChannelAdapterSpec -> configureEndpoint(sourcePollingChannelAdapterSpec, dataSourceTransactionManager))
.transform(new StreamTransformer())
.channel("processingChannel")
.get();
}
private SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec(JdbcMetadataStore jdbcMetadataStore) {
SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec = Sftp.inboundStreamingAdapter(documentEnrollementSftpRemoteFileTemplate())
.filter(fileListFilter(jdbcMetadataStore))
.maxFetchSize(10)
.remoteDirectory("/the-directory");
SftpStreamingMessageSource sftpStreamingMessageSource = sftpStreamingInboundChannelAdapterSpec.get();
sftpStreamingMessageSource.setFileInfoJson(false);
return sftpStreamingInboundChannelAdapterSpec;
}
private void configureEndpoint(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec, DataSourceTransactionManager dataSourceTransactionManager) {
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
.maxMessagesPerPoll(1000);
sourcePollingChannelAdapterSpec.autoStartup(true)
.poller(pollerSpec);
}
@Bean
public CompositeFileListFilter<ChannelSftp.LsEntry> fileListFilter(JdbcMetadataStore jdbcMetadataStore) {
String fileNameRegex = // get regex
SftpRegexPatternFileListFilter sftpRegexPatternFileListFilter = new SftpRegexPatternFileListFilter(fileNameRegex);
SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter = new SftpPersistentAcceptOnceFileListFilter(jdbcMetadataStore, "");
CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<>();
compositeFileListFilter.addFilter(sftpRegexPatternFileListFilter);
compositeFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);
return compositeFileListFilter;
}
После прочтения этого ответа я попытался использовать транзакционный поллер следующим образом:
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
.maxMessagesPerPoll(1000)
.transactional(dataSourceTransactionManager);
, но в результате после обработки файла происходит сбой,опрос останавливается, все обработанные сообщения откатываются, а оставшиеся сообщения не обрабатываются до следующего опроса.Из этого ответа я понял, что каждое сообщение будет обрабатываться в отдельной транзакции.
Единственный способ, который я нашел для достижения этой цели, - заключить код обработки в блок try / catch, перехватывая все исключения дляИзбегайте прерывания опроса.В блоке catch я вручную удаляю ChannelSftp.LsEntry
из фильтра составного списка файлов.Для этого мне нужно было установить свойство fileInfoJson
в false
в SftpStreamingMessageSource
, предоставленном SftpStreamingInboundChannelAdapterSpec
.
. Я считаю этот подход довольно запутанным и с недостатком, что файлы, которые терпят неудачу и удаляютсяиз фильтра сразу же обрабатываются позже, а не в следующем опросе. Я надеялся, что есть более простое решение моей проблемы.