Не уверен, в чем ваша проблема, но я вижу, что у вас есть смесь проблем. Если вы используете аннотации сообщений (см. @InboundChannelAdapter
в вашей конфигурации), какой смысл использовать тот же s3InboundStreamingMessageSource
в определении IntegrationFlow
?
В любом случае похоже, что вы уже исследовали для себя StreamTransformer
. У этого есть свойство charset
для преобразования вашего InputStream
из удаленного ресурса S3 в String
. В противном случае он возвращает byte[]
. Все остальное зависит от вас, что и как делать с этим преобразованным контентом.
Также я не вижу смысла иметь s3Channel
как QueueChannel
, так как начало вашего потока все равно может быть опрошено @InboundChannelAdapter
.
С большой высоты я бы сказал, что у нас к вам больше вопросов, чем наоборот ...
UPDATE
Непонятно, какова ваша идея для обработки InputStream
, но на самом деле это факт, что после S3StreamingMessageSource
в следующем обработчике будет ровно InputStream
в качестве полезной нагрузки.
Также не уверен, какой у вас streamFile()
, но он действительно должен ожидать InputStream
в качестве входных данных от полезной нагрузки сообщения запроса.
Вы также можете использовать упомянутое StreamTransformer
там:
@Bean
IntegrationFlow fileStreamingFlow() {
return IntegrationFlows
.from(s3InboundStreamingMessageSource(),
e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
.transform(Transformers.fromStream("UTF-8"))
.get();
}
И следующий .handle()
будет готов к String
в качестве полезной нагрузки.