Тайм-аут ожидания соединения из пула при опросе S3 для объектов - PullRequest
0 голосов
/ 06 января 2019

Я работаю над бэкэнд-сервисом, который периодически опрашивает корзину S3, используя интеграцию Spring AWS, и обрабатывает опрашиваемый объект из S3. Ниже приведена реализация для него

@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class S3PollerConfiguration {

    //private static final Logger log = (Logger) LoggerFactory.getLogger(S3PollerConfiguration.class);

    @Value("${amazonProperties.bucketName}")
    private String bucketName;

    @Bean
    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
        messageSource.setRemoteDirectory(bucketName);   
        return messageSource;
    }

    @Bean
    public S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(thumbnailGeneratorService.getImagesS3Client()));
    }

    @Bean
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow fileReadingFlow() throws IOException {
        return IntegrationFlows
                .from(s3InboundStreamingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(10, TimeUnit.SECONDS)))
                .handle(Message.class, (payload, header) -> processS3Object(payload.getHeaders(), payload.getPayload()))
                .get();
    }
}

Я получаю сообщения от S3 при загрузке объекта и могу обработать его, используя входной поток, полученный как часть полезной нагрузки сообщения. Но проблема, с которой я здесь сталкиваюсь, заключается в том, что я получаю исключение «Время ожидания ожидания соединения из пула» после получения нескольких сообщений

2019-01-06 02:19:06.156 ERROR 11322 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:445)
    at org.springframework.integration.file.remote.RemoteFileTemplate.list(RemoteFileTemplate.java:405)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.listFiles(AbstractRemoteFileStreamingMessageSource.java:194)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.poll(AbstractRemoteFileStreamingMessageSource.java:180)
    at org.springframework.integration.aws.inbound.S3StreamingMessageSource.poll(S3StreamingMessageSource.java:70)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:153)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:155)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:236)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:250)

Я знаю, что проблема связана с не закрытием открытого S3Object, как указано здесь https://github.com/aws/aws-sdk-java/issues/1405, поэтому я реализовал закрытие входного потока S3Object, полученного как часть полезной нагрузки сообщения. Но это не решает проблему, и я продолжаю получать исключения. Может кто-нибудь помочь мне решить эту проблему?

1 Ответ

0 голосов
/ 07 января 2019

Ваша проблема в том, что вы все еще смешиваете объявления аннотаций обмена сообщениями с Java DSL в вашей конфигурации.

Похоже, что в fileReadingFlow вы закрываете эти InputStream s в методе кода processS3Object(), но вы ничего не делаете с InputStream s, полученными @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5")). Почему ты вообще это в первом месте? Что заставляет вас хранить этот код, если вы его не используете?

Этот S3StreamingMessageSource опрашивается все время дважды: @InboundChannelAdapter и IntegrationFlows.from().

Вам просто нужно удалить этот @InboundChannelAdapter из определения S3StreamingMessageSource bean, и все.

Пожалуйста, прочитайте Справочное руководство, чтобы определить причину такой аннотации и то, как она вам не нужна при использовании Java DSL:

https://docs.spring.io/spring-integration/reference/html/configuration.html#_using_the_literal_inboundchanneladapter_literal_annotation

https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-inbound-adapters

...