Интеграция Spring inboundChannelAdapter неожиданно останавливает опрос - PullRequest
0 голосов
/ 06 апреля 2020

В нашем проекте нам нужно получить цены с удаленного FTP-сервера. В рабочее время это работает нормально, цены извлекаются и успешно обрабатываются. В нерабочее время на ftp-сервере не публикуются новые цены, поэтому, как и ожидалось, мы не нашли ничего нового.

Наша проблема в том, что после нескольких часов отсутствия новых цен, опросчик просто прекращает опрос. Нет ошибок в лог-файлах (даже при работе на org.springframework.integration на уровне отладки) и исключений. Сейчас мы используем отдельную TaskExecutor, чтобы изолировать проблему, но все же программа опроса просто останавливается. В то же время мы скорректировали выражение cron в соответствии с этими часами, чтобы ограничить использование ресурса, но при этом опросчик просто останавливается, когда он должен работать.

Любая помощь в устранении этой проблемы очень ценится!

Мы используем @InboudChannelAdapter для FtpStreamingMessageSource, который настроен следующим образом:

@Bean
    @InboundChannelAdapter(
        value = FTP_PRICES_INBOUND,
        poller = [Poller(
            maxMessagesPerPoll = "\${ftp.fetch.size}",
            cron = "\${ftp.poll.cron}",
            taskExecutor = "ftpTaskExecutor"
        )],
        autoStartup = "\${ftp.fetch.enabled:false}"
    )
    fun ftpInboundFlow(
        @Value("\${ftp.remote.prices.dir}") pricesDir: String,
        @Value("\${ftp.remote.prices.file.pattern}") remoteFilePattern: String,
        @Value("\${ftp.fetch.size}") fetchSize: Int,
        @Value("\${ftp.fetch.enabled:false}") fetchEnabled: Boolean,
        clock: Clock,
        remoteFileTemplate: RemoteFileTemplate<FTPFile>,
        priceParseService: PriceParseService,
        ftpFilterOnlyFilesFromMaxDurationAgo: FtpFilterOnlyFilesFromMaxDurationAgo
    ): FtpStreamingMessageSource {
        val messageSource = FtpStreamingMessageSource(remoteFileTemplate, null)

        messageSource.setRemoteDirectory(pricesDir)
        messageSource.maxFetchSize = fetchSize
        messageSource.setFilter(
            inboundFilters(
                remoteFilePattern,
                ftpFilterOnlyFilesFromMaxDurationAgo
            )
        )
        return messageSource;
    }

Значения свойств:

poll.cron: "*/30 * 4-20 * * MON-FRI"
fetch.size: 10
fetch.enabled: true

Мы ограничиваем опрос .cron мы использовали извлечение каждую минуту.

В соответствующем DefaultFtpSessionFactory тайм-ауты установлены на 60 секунд, чтобы переопределить значение по умолчанию -1 (что означает отсутствие тайм-аута вообще):

sessionFactory.setDataTimeout(timeOut)
sessionFactory.setConnectTimeout(timeOut)
sessionFactory.setDefaultTimeout(timeOut)

Ответы [ 2 ]

0 голосов
/ 22 апреля 2020

Оказалось, что обработка заняла больше запланированного интервала, поэтому во время обработки новая задача уже была выполнена. Таким образом, в конечном итоге несколько задач пытались выполнить sh одно и то же.

Мы решили эту проблему с помощью fixedDelay на устройстве опроса вместо fixedRate. Разница в том, что fixedRate планирует регулярный интервал независимо от того, была ли задача завершена, а fixedDelay планирует задержку после завершения задачи.

0 голосов
/ 06 апреля 2020

Может быть, мой ответ кажется слишком легким, но дело в том, что ваше выражение cron утверждает, что оно должно запланировать работу от 4 до 20 часов. После 8:00 вечера он больше не будет планировать работу и снова начнет опрос в 4:00 утра.

...