Spring Cloud Aws kinesis Балансировка нагрузки Binder - PullRequest
0 голосов
/ 01 апреля 2019

Я пытался реализовать балансировку нагрузки для потребителей потока Aws kinesis

В соответствии с документацией я пытаюсь реализовать

spring:
  cloud:
    stream:
      instanceIndex: 1
      instanceCount: 3
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-kinesis-stream
          content-type: application/json

У меня есть 3 контейнеры,Я хочу вызвать новых контейнеров (макс. 6) , если необходимо, без перезапуска существующих.

  1. instanceIndex начинается с 0 или 1.
  2. ЕслиЯ присваиваю instanceCount значение 6, но при этом поднимаю только три экземпляра, будут ли все сообщения использоваться до тех пор, пока я не вызову новые экземпляры.
  3. В документации есть свойство с именем spring.cloud.stream.bindings..consumer..concurrency, Можете ли вы помочь в важности этого.
  4. По некоторым причинам, если какой-либо из экземпляров выйдет из строя, любое из сообщений будет не использовано.

Можете ли вы помочьнам

1 Ответ

1 голос
/ 01 апреля 2019

spring.cloud.stream.bindings..consumer.concurrency является внутренним параметром для каждого потребителя:

adapter.setConcurrency(properties.getConcurrency());

...

/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

, поэтому это не имеет ничего общего с вашим распределенным решением.

instanceIndex и instanceCount в Binder работают следующим образом:

    if (properties.getInstanceCount() > 1) {
        shardOffsets = new HashSet<>();
        KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
        List<Shard> shards = kinesisConsumerDestination.getShards();
        for (int i = 0; i < shards.size(); i++) {
            // divide shards across instances
            if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
                KinesisShardOffset shardOffset = new KinesisShardOffset(
                        kinesisShardOffset);
                shardOffset.setStream(destination.getName());
                shardOffset.setShard(shards.get(i).getShardId());
                shardOffsets.add(shardOffset);
            }
        }
    }

Таким образом, каждый потребитель получает подмножество шардов в потоке.Поэтому, если у вас больше шардов, чем в инстансах, вы можете столкнуться с тем фактом, что некоторые шарды не используются.

Нечего одновременно использовать сообщения из одного и того же шарда: только один поток может использовать один шард на кластер..

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...