Как обработать оставшиеся файлы в опросе входящего адаптера SFTP после возникновения исключения? - PullRequest
1 голос
/ 11 июня 2019

Я создал поток интеграции для чтения файлов с SFTP-сервера и их обработки.Я понял, что как только возникает ошибка с одним из файлов (исключение выдается), опрос останавливается, и любой другой файл не обрабатывается до следующего опроса.Как я могу избежать этого, не отмечая файл как обработанный, и обрабатывая оставшиеся файлы в этом опросе?

Моя конфигурация довольно проста.Я использую нетранзакционный опросчик, который запускается каждую минуту с max-message-per-poll, равным 1000. SftpStreamingInboundChannelAdapterSpec имеет max-fetch-size, равный 10, и использует фильтр составного списка файлов с SftpRegexPatternFileListFilter и SftpPersistentAcceptOnceFileListFilter.

@Bean
public IntegrationFlow sftpInboundFlow(JdbcMetadataStore jdbcMetadataStore, DataSourceTransactionManager dataSourceTransactionManager) {
    return IntegrationFlows.from(sftpStreamingInboundChannelAdapterSpec(jdbcMetadataStore), sourcePollingChannelAdapterSpec -> configureEndpoint(sourcePollingChannelAdapterSpec, dataSourceTransactionManager))
                .transform(new StreamTransformer())
                .channel("processingChannel")
                .get();
}

private SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec(JdbcMetadataStore jdbcMetadataStore) {
    SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec = Sftp.inboundStreamingAdapter(documentEnrollementSftpRemoteFileTemplate())
                .filter(fileListFilter(jdbcMetadataStore))
                .maxFetchSize(10)
                .remoteDirectory("/the-directory");
    SftpStreamingMessageSource sftpStreamingMessageSource = sftpStreamingInboundChannelAdapterSpec.get();
    sftpStreamingMessageSource.setFileInfoJson(false);

    return sftpStreamingInboundChannelAdapterSpec;
}

private void configureEndpoint(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec, DataSourceTransactionManager dataSourceTransactionManager) {
    PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
                .maxMessagesPerPoll(1000);
    sourcePollingChannelAdapterSpec.autoStartup(true)
                .poller(pollerSpec);
}

@Bean
public CompositeFileListFilter<ChannelSftp.LsEntry> fileListFilter(JdbcMetadataStore jdbcMetadataStore) {
    String fileNameRegex = // get regex

    SftpRegexPatternFileListFilter sftpRegexPatternFileListFilter = new SftpRegexPatternFileListFilter(fileNameRegex);
    SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter = new SftpPersistentAcceptOnceFileListFilter(jdbcMetadataStore, "");

    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<>();
    compositeFileListFilter.addFilter(sftpRegexPatternFileListFilter);
    compositeFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);

    return compositeFileListFilter;
}

После прочтения этого ответа я попытался использовать транзакционный поллер следующим образом:

PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
    .maxMessagesPerPoll(1000)
    .transactional(dataSourceTransactionManager);

, но в результате после обработки файла происходит сбой,опрос останавливается, все обработанные сообщения откатываются, а оставшиеся сообщения не обрабатываются до следующего опроса.Из этого ответа я понял, что каждое сообщение будет обрабатываться в отдельной транзакции.

Единственный способ, который я нашел для достижения этой цели, - заключить код обработки в блок try / catch, перехватывая все исключения дляИзбегайте прерывания опроса.В блоке catch я вручную удаляю ChannelSftp.LsEntry из фильтра составного списка файлов.Для этого мне нужно было установить свойство fileInfoJson в false в SftpStreamingMessageSource, предоставленном SftpStreamingInboundChannelAdapterSpec.

. Я считаю этот подход довольно запутанным и с недостатком, что файлы, которые терпят неудачу и удаляютсяиз фильтра сразу же обрабатываются позже, а не в следующем опросе. Я надеялся, что есть более простое решение моей проблемы.

1 Ответ

1 голос
/ 11 июня 2019

Решение с try ... catch - путь. Это действительно тот факт, что исключение, выброшенное из процесса, попадает в устройство опроса и останавливает текущий цикл вокруг maxMessagesPerPoll:

private Runnable createPoller() {
    return () ->
            this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }
            });
}

Где это pollForMessage() так:

private Message<?> pollForMessage() {
    try {
        return this.pollingTask.call();
    }
    catch (Exception e) {
        if (e instanceof MessagingException) {
            throw (MessagingException) e;
        }
        else {
            Message<?> failedMessage = null;
            if (this.transactionSynchronizationFactory != null) {
                Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
                if (resource instanceof IntegrationResourceHolder) {
                    failedMessage = ((IntegrationResourceHolder) resource).getMessage();
                }
            }
            throw new MessagingException(failedMessage, e); // NOSONAR (null failedMessage)
        }
    }
    finally {
        if (this.transactionSynchronizationFactory != null) {
            Object resource = getResourceToBind();
            if (TransactionSynchronizationManager.hasResource(resource)) {
                TransactionSynchronizationManager.unbindResource(resource);
            }
        }
    }
}

В любом случае, в одном цикле опроса все еще есть способ изолировать одно сообщение от других. Для этого вам нужно взглянуть на Цепочку рекомендаций для обработчиков запросов и исследовать решение с помощью ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/current/reference/html/#message-handler-advice-chain

Итак, вы добавляете это в конечную точку вашего обработчика вниз по потоку и отлавливаете там исключения, а также выполняете некоторую конкретную обработку ошибок, не перебрасывая исключения в обработчик.

...