Spring интеграции потока SFTP: шаблон фильтра не работает - PullRequest
1 голос
/ 07 ноября 2019

У меня есть следующий начальный процесс интеграции пружины:

   final var inboundStreamingAdapter = Sftp.inboundStreamingAdapter(new SftpRemoteFileTemplate(sftpSessionFactory))
            .patternFilter("*.csv")
            .remoteDirectory(sftpRemoteDirectoryDownload);

   IntegrationFlows.from(inboundStreamingAdapter,
            c -> c.poller(Pollers.trigger(new PeriodicTrigger(10)).maxMessagesPerPoll(1)))
            .channel("data")
            .split(mySplitter)
            .get();

, и у меня есть следующие файлы на моем сервере sftp:

.DS_Store
PL_12002_1815_1.csv1

Из моего понимания адаптер должен отфильтровать все, чтоне заканчивается на csv postfix.

На самом деле происходит то, что некоторые файлы (.DS_Store) иногда проходят через mySplitter в зависимости от того, что в данный момент хранится на FTP.

Насколько далекоЯ могу сказать, что проблема в AbstractRemoteFileStreamingMessageSource (используется в качестве базового класса для класса SftpStreamingMessageSource):

           if (this.filter != null && this.filter.supportsSingleFileFiltering()
                       && !this.filter.accept(file.getFileInfo())) {  //HERE APPLY A FILTER AND FIND OUT PL_12002_1815_1.csv1 SHOULD NOT BE PROCESSED

                   if (this.toBeReceived.size() > 0) { // don't re-fetch already filtered files 
                       //HERE WE LOAD ANOTHER FILE FROM THE QUEUE (.DS_Store)
                       file = poll();
                   }
                   else {
                       file = null;
                   }
           }
    //FILTER IS NOT REAPPLIED FURTHER DOWN AND WE CREATE A MESSAGE FOR .DS_Store
           if (file != null) {
               try {
                   String remotePath = remotePath(file);
                   Session<?> session = this.remoteFileTemplate.getSession();
                   try {
                       return getMessageBuilderFactory()
                               .withPayload(session.readRaw(remotePath))
                               .setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
                               .setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
                               .setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
                               .setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
                               .setHeader(FileHeaders.REMOTE_FILE_INFO,
                                       this.fileInfoJson ? file.toJson() : file);
       }

Я бы сказал, что это ошибка, но, возможно, я просто что-то неправильно настроил, может кто-нибудь помочь?

1 Ответ

1 голос
/ 07 ноября 2019

выглядит как ошибка;if должно быть while (с нулевой проверкой).

Пожалуйста, откройте проблему на GitHub .

...