Адаптер Spring Integration Kinesis и группы потребителей - PullRequest
0 голосов
/ 30 августа 2018

У меня есть приложение для пользователя Kinesis, разработанное с использованием spring-integration-aws версии 1.1.0.RELEASE.

В моих тестах я запускаю два экземпляра этого приложения в одной группе потребителей и использую поток из двух сегментов. В своих тестах я понял, что KinesisMessageDrivenChannelAdapter будет распространять сообщения тремя способами:

  1. Все сообщения доставлены одному потребителю
  2. сообщений, распределенных обоим потребителям (неравномерно)
  3. Оба потребителя получили одинаковые сообщения

Со стороны производителя сообщения распределяются равномерно между двумя шардами. Я хотел бы знать, как адаптер kinesis распределяет сообщения среди потребителей и, если поддерживается, как я могу получить равномерное распределение среди потребителей.

Спасибо

ОБНОВЛЕНИЕ (конфигурация адаптера)

@Bean
  public KinesisMessageDrivenChannelAdapter kinesisInboundChannelAdapter(
      AmazonKinesis amazonKinesis) {
    String[] streamNames = this.consumerClientProperties.getKinesis().getStreamNames();
    KinesisMessageDrivenChannelAdapter adapter =
        new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
    adapter.setConverter(null);
    adapter.setOutputChannel(new QueueChannel());
    adapter.setCheckpointStore(dynamoDbMetaDataStore());
    adapter.setCheckpointMode(CheckpointMode.record);
    adapter.setStartTimeout(10000);
    adapter.setConsumerGroup(consumerClientProperties.getName());
    adapter.setListenerMode(ListenerMode.record);
    adapter.setDescribeStreamRetries(1);
    return adapter;
  }

  @Bean
  public DynamoDbMetadataStore dynamoDbMetaDataStore() {
    DynamoDbMetadataStore dynamoDbMetaDataStore = new DynamoDbMetadataStore(amazonDynamoDB(),
        consumerClientProperties.getName());
    return dynamoDbMetaDataStore;
  }

1 Ответ

0 голосов
/ 31 августа 2018

Рекомендуется всем обновить версию Spring Integration AWS до последней версии 2.0: https://spring.io/blog/2018/08/21/spring-integration-for-aws-2-0-ga-and-spring-cloud-stream-kinesis-binder-1-0-ga

Было сделано множество исправлений на уровне потребителей Kinesis, и теперь у нас есть выборы лидеров, которые не подписываются на один и тот же осколок более одного раза.

Идея состоит в том, чтобы при обработке записей иметь строгий порядок, поэтому только один поток на кластер должен иметь доступ к одному шарду. Этот поток может обрабатывать несколько осколков.

В любом случае, если вы используете два экземпляра приложения, вам нужно внедрить MetadataStore на основе общих данных, например, DynamoDbMetadataStore.

...