Потоковая передача объектов из S3 Object с использованием интеграции Spring Aws - PullRequest
0 голосов
/ 04 января 2019

Я работаю над сценарием использования, где я должен опрашивать S3 -> прочитать поток для содержимого -> выполнить некоторую обработку и загрузить его в другое ведро, а не записывать файл на моем сервере.

Я знаю, что могу добиться этого с помощью S3StreamingMessageSource в интеграции Spring AWS, но проблема, с которой я сталкиваюсь, заключается в том, что я не знаю, как обрабатывать поток сообщений, полученный при опросе

public class S3PollerConfigurationUsingStreaming {
    @Value("${amazonProperties.bucketName}")
    private String bucketName;

    @Value("${amazonProperties.newBucket}")
    private String newBucket;

    @Autowired
    private AmazonClientService amazonClient;

    @Bean
    @InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
        messageSource.setRemoteDirectory(bucketName);
        messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
                "streaming"));      
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "s3Channel", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer();
    }

    @Bean
    public S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(amazonClient.getS3Client()));
    }

    @Bean
    public PollableChannel s3Channel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow fileStreamingFlow() {
        return IntegrationFlows
                .from(s3InboundStreamingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
                .handle(streamFile())
                .get();
    }

}

Может кто-нибудь помочь мне с кодом для обработки потока?

1 Ответ

0 голосов
/ 04 января 2019

Не уверен, в чем ваша проблема, но я вижу, что у вас есть смесь проблем. Если вы используете аннотации сообщений (см. @InboundChannelAdapter в вашей конфигурации), какой смысл использовать тот же s3InboundStreamingMessageSource в определении IntegrationFlow?

В любом случае похоже, что вы уже исследовали для себя StreamTransformer. У этого есть свойство charset для преобразования вашего InputStream из удаленного ресурса S3 в String. В противном случае он возвращает byte[]. Все остальное зависит от вас, что и как делать с этим преобразованным контентом.

Также я не вижу смысла иметь s3Channel как QueueChannel, так как начало вашего потока все равно может быть опрошено @InboundChannelAdapter.

С большой высоты я бы сказал, что у нас к вам больше вопросов, чем наоборот ...

UPDATE

Непонятно, какова ваша идея для обработки InputStream, но на самом деле это факт, что после S3StreamingMessageSource в следующем обработчике будет ровно InputStream в качестве полезной нагрузки.

Также не уверен, какой у вас streamFile(), но он действительно должен ожидать InputStream в качестве входных данных от полезной нагрузки сообщения запроса. Вы также можете использовать упомянутое StreamTransformer там:

@Bean
IntegrationFlow fileStreamingFlow() {
    return IntegrationFlows
            .from(s3InboundStreamingMessageSource(),
                    e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
            .transform(Transformers.fromStream("UTF-8"))
            .get();
}

И следующий .handle() будет готов к String в качестве полезной нагрузки.

...