spring.cloud.stream.bindings..consumer.concurrency
является внутренним параметром для каждого потребителя:
adapter.setConcurrency(properties.getConcurrency());
...
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {
, поэтому это не имеет ничего общего с вашим распределенным решением.
instanceIndex
и instanceCount
в Binder работают следующим образом:
if (properties.getInstanceCount() > 1) {
shardOffsets = new HashSet<>();
KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
List<Shard> shards = kinesisConsumerDestination.getShards();
for (int i = 0; i < shards.size(); i++) {
// divide shards across instances
if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
KinesisShardOffset shardOffset = new KinesisShardOffset(
kinesisShardOffset);
shardOffset.setStream(destination.getName());
shardOffset.setShard(shards.get(i).getShardId());
shardOffsets.add(shardOffset);
}
}
}
Таким образом, каждый потребитель получает подмножество шардов в потоке.Поэтому, если у вас больше шардов, чем в инстансах, вы можете столкнуться с тем фактом, что некоторые шарды не используются.
Нечего одновременно использовать сообщения из одного и того же шарда: только один поток может использовать один шард на кластер..