У меня есть приложение для пользователя Kinesis, разработанное с использованием spring-integration-aws
версии 1.1.0.RELEASE
.
В моих тестах я запускаю два экземпляра этого приложения в одной группе потребителей и использую поток из двух сегментов. В своих тестах я понял, что KinesisMessageDrivenChannelAdapter
будет распространять сообщения тремя способами:
- Все сообщения доставлены одному потребителю
- сообщений, распределенных обоим потребителям (неравномерно)
- Оба потребителя получили одинаковые сообщения
Со стороны производителя сообщения распределяются равномерно между двумя шардами. Я хотел бы знать, как адаптер kinesis распределяет сообщения среди потребителей и, если поддерживается, как я могу получить равномерное распределение среди потребителей.
Спасибо
ОБНОВЛЕНИЕ (конфигурация адаптера)
@Bean
public KinesisMessageDrivenChannelAdapter kinesisInboundChannelAdapter(
AmazonKinesis amazonKinesis) {
String[] streamNames = this.consumerClientProperties.getKinesis().getStreamNames();
KinesisMessageDrivenChannelAdapter adapter =
new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
adapter.setConverter(null);
adapter.setOutputChannel(new QueueChannel());
adapter.setCheckpointStore(dynamoDbMetaDataStore());
adapter.setCheckpointMode(CheckpointMode.record);
adapter.setStartTimeout(10000);
adapter.setConsumerGroup(consumerClientProperties.getName());
adapter.setListenerMode(ListenerMode.record);
adapter.setDescribeStreamRetries(1);
return adapter;
}
@Bean
public DynamoDbMetadataStore dynamoDbMetaDataStore() {
DynamoDbMetadataStore dynamoDbMetaDataStore = new DynamoDbMetadataStore(amazonDynamoDB(),
consumerClientProperties.getName());
return dynamoDbMetaDataStore;
}