Остановите переименование файла, если обработка данных не удалась при потоковой передаче файла удаленного каталога - PullRequest
0 голосов
/ 21 мая 2019

Я читаю файл из удаленного каталога, используя SFTP. Я могу получить файл по потоку, используя исходящий шлюз, и даже переместить его в папку архива.

Я обрабатываю данные в файле, но если есть какая-то проблема в данных, я выдаю ошибку. Я не хочу переименовывать файл , если при обработке данных выдается какая-либо ошибка, как я могу этого добиться. Будет очень полезно, если я смогу получить некоторые полезные советы для использования обработчика ошибок при использовании интеграции с пружиной.

.handle(Sftp.outboundGateway(sftpSessionFactory(), GET, "payload.remoteDirectory + payload.filename").options(STREAM).temporaryFileSuffix("_reading"))
.handle(readData(),c->c.advice(afterReading()))
.enrichHeaders(h -> h
        .headerExpression(FileHeaders.RENAME_TO, "headers[file_remoteDirectory] + 'archive/' + headers[file_remoteFile]")
        .headerExpression(FileHeaders.REMOTE_FILE, "headers[file_remoteFile]")
        .header(FileHeaders.REMOTE_DIRECTORY, "headers[file_remoteDirectory]"))
.handle(Sftp.outboundGateway(sftpSessionFactory(), MV, "headers[file_remoteDirectory]+headers[file_remoteFile]").renameExpression("headers['file_renameTo']"))
.get();

@Bean
    public ExpressionEvaluatingRequestHandlerAdvice afterReading() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("successReading.input");
        advice.setOnSuccessExpressionString("payload + ' was successful streamed'");
        advice.setFailureChannelName("failureReading.input");
        advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

    @Bean
    public IntegrationFlow successReading() {
        return f -> f.log();
    }

    @Bean
    public IntegrationFlow failureReading() {
        return f -> f.log(ERROR);
    }


public GenericHandler readData() {
return new GenericHandler() {
    @Override
    public Object handle(Object o, Map map) {
        InputStream file = (InputStream) o;
        String fileName = (String) map.get(REMOTE_FILE);
        try {
            // processing data
        } catch (Exception e) {
            return new SftpException(500, String.format("Error while processing the file %s because of  Error: %s and reason %s", fileName, e.getMessage(), e.getCause()));
        }
        Closeable closeable = (Closeable) map.get(CLOSEABLE_RESOURCE);
        if (closeable != null) {
            try {
                closeable.close();
                file.close();
            } catch (Exception e) {
                logger.error(String.format("Session didn`t get closed after reading the stream data for file %s and error %s"), fileName, e.getMessage());
            }
        }
        return map;
    }
    };
}

Обновлено

1 Ответ

0 голосов
/ 21 мая 2019

Добавьте ExpressionEvaluatingRequestHandlerAdvice к конечной точке .handler() .handle(readData(), e -> e.advice(...)).

Последний предоставленный класс рекомендаций - o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.Этот совет является более общим, чем два других совета.Он предоставляет механизм для оценки выражения исходного входящего сообщения, отправленного конечной точке.Отдельные выражения доступны для оценки после успеха или неудачи.При желании сообщение, содержащее результат оценки, вместе с входным сообщением может быть отправлено в канал сообщений.

...