Я определяю IntegrationFlow
для потоковой передачи из SFTP в S3 с синтаксисом DSL следующим образом:
return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
.remoteDirectory("remoteDirectory"),
e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
.transform(new StreamTransformer())
.handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
.get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
return s3MessageHandler;
}
И он работает как задумано: файл хорошо загружен в мое хранилище S3 , Однако я хотел бы избежать синтаксиса SPEL
и вставлять заголовки из сообщения в метод s3uploadMessageHandler
, чтобы я мог использовать простой ValueExpression
для установки keyExpression
в методе s3UploadMessageHandler
. Для этого я изменил
handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
на
handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3
Но теперь этот обработчик, похоже, больше не запускается. В журналах нет ошибок, и я знаю из журналов, что опрос SFTP все еще работает.
Я попытался найти причину этого и увидел, что при вводе метода handle в IntegrationFlowdefinition.java
тип класса messageHandler
отличается: это S3MessageHandler
при вызове без лямбды и MyCallingClass$lambda
при вызове с лямбда-выражением.
Что я упустил, чтобы мой сценарий работал?