Отказаться от чтения файлов, когда чтение одного файла не удается во время потоковой передачи SFTP - PullRequest
0 голосов
/ 30 мая 2019

Я читаю файлы по одному из удаленного каталога, используя SFTP. Для этого я использую LS gateway, затем split, GET gateway и затем в конце переименуем файл.

Последовательность файлов очень важна для меня, поэтому имя файла имеет счетчик. Я хочу, чтобы, если после потоковой передачи файла с использованием шлюза GET там возникала какая-то проблема с данными или во время обработки данных, я не хотел бы, чтобы следующие файлы в последовательности читались.

IntegrationFlows.from(() -> path, e -> e.poller(Pollers.fixedDelay(60, TimeUnit.SECONDS)))
.handle(Sftp.outboundGateway(sftpSessionFactory(), LS, "payload")
        .regexFileNameFilter(".*csv"))
.split()
.handle(Sftp.outboundGateway(sftpSessionFactory(), GET, "payload.remoteDirectory + payload.filename").options(STREAM).temporaryFileSuffix("_reading"))
.handle(readCsvData(), e -> e.advice(afterReadingCsv()))
.filter(this, "checkSuccess")
.enrichHeaders(h -> h
        .headerExpression(FileHeaders.RENAME_TO, "headers[file_remoteDirectory] + 'archive/' + headers[file_remoteFile]")
        .headerExpression(FileHeaders.REMOTE_FILE, "headers[file_remoteFile]")
        .header(FileHeaders.REMOTE_DIRECTORY, "headers[file_remoteDirectory]"))
.handle(Sftp.outboundGateway(sftpSessionFactory(), MV, "headers[file_remoteDirectory]+headers[file_remoteFile]").renameExpression("headers['file_renameTo']"))
.get();

Я даже не хочу переименовывать файл, если при обработке данных возникают проблемы, я могу остановить этот поток, но я не уверен, как остановить поток для чтения последующих файлов.

...