Связка весеннего облачного потока Kinesis - параллелизм - PullRequest
1 голос
/ 13 июня 2019

Я собрал потребителя пружинной загрузки Kinesis со следующими компонентами:

  • пружинный башмак (версия - 2.1.2.RELEASE)
  • весеннее облако (версия - Greenwich.RELEASE)
  • связующее средство для весеннего облачного потока (версия 1.1.0.RELEASE)

Я использую события из потока кинезиса с 1 осколком . Также это весеннее загрузочное потребительское приложение работает на Pivotal Cloud Foundry Platform .

Я попробовал сценарий локально (с кинезалитом) и в PCF (с потоком кинезиса), прежде чем опубликовать этот вопрос. Можете ли вы подтвердить правильность моего понимания? Я просмотрел документацию по весеннему облачному потоку (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/ и https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc). Хотя документация является исчерпывающей, параллелизм и высокая доступность подробно не объясняются.

Допустим, у меня есть 3 экземпляра получателя, развернутых в PCF (установив для атрибута экземпляров значение 3 в файле manifest.yml , который используется во время нажатия cf).

Все 3 экземпляра имеют следующие свойства :

spring.cloud.stream.bindings..consumer.concurrency = 5

spring.cloud.stream.bindings..group = мой-потребитель-группа

spring.cloud.stream.kinesis.binder.checkpoint.table = мой-метаданные dynamodb стол

spring.cloud.stream.kinesis.binder.locks.table = мои замки-dynamodb стол

Допустим, что события были отправлены в кинезис производителем в этом порядке

событие5 (последнее событие в потоке) - событие4 - событие3 - событие2 - событие1 (первое событие в потоке)

Для такой конфигурации я объяснил свое понимание ниже. Можете ли вы подтвердить, правильно ли это?

  1. В данный момент времени активен только один экземпляр потребителя, и он будет обрабатывать все события, отправленные в поток кинезиса (поскольку поток имеет только один фрагмент). Один из двух других экземпляров получит контроль только тогда, когда основной экземпляр не работает. Эта конфигурация предназначена для обеспечения высокой доступности и сохранения порядка сообщений.
  2. Так как количество экземпляров установлено в manifest.yml для PCF, мне не нужно беспокоиться о настройке свойств spring.cloud.stream.instanceCount или spring.cloud.stream.bindings..consumer.instanceCount.
  3. 5 потребительских потоков активны (поскольку для параллелизма установлено значение 5), когда запускается / запускается потребитель загрузочной пружины. Теперь события потребляются в порядке, описанном выше. Поток1 получает событие1. Когда поток1 все еще активно обрабатывает событие1, другой поток просто выбирает и начинает обрабатывать следующее событие из потока (событие 2 потока2 и т. Д.). Хотя порядок событий в этом случае сохраняется (событие 1 всегда выбирается до события 2 и т. Д.), Нет никакой гарантии, что поток1 завершит обработку события 1 до потока 2.
  4. Когда все 5 потоков заняты обработкой 5 событий в потоке, если новые события говорят о событиях 6 и 7, то потребитель должен ждать, пока поток станет доступным. Скажем, поток 3 завершил обработку события 3, а другие потоки все еще заняты обработкой событий, поток 3 получит событие 6 и начнет обработку, но событие 7 все еще не обнаружено, так как нет доступных потоков.
  5. По умолчанию параллелизм установлен на 1. Если ваше бизнес-требование требует, чтобы вы завершили обработку первого события, прежде чем подхватывать следующее, тогда параллелизм должен быть 1. В этом случае вы ставите под угрозу пропускную способность. Вы можете использовать только одно событие за раз. Но если пропускная способность важна и вы хотите обработать более одного события в данный момент времени, тогда для параллелизма должно быть установлено желаемое значение. Увеличение количества осколков также является вариантом, но если вы не можете требовать увеличения, это лучший выбор для достижения параллелизма / пропускной способности.

1 Ответ

0 голосов
/ 13 июня 2019

Пожалуйста, смотрите concurrency опцию JavaDocs в KinesisMessageDrivenChannelAdapter:

/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

Таким образом, поскольку в этом одном потоке имеется только один шард, будет только один активный поток, который выполняет итерацию на протяжении ShardIterator s на этом одном шарде.

Дело в том, что мы всегда должны обрабатывать записи из одного сегмента в одном потоке. Таким образом, мы гарантируем правильный порядок, плюс проверяется точка для наибольшего порядкового номера.

Пожалуйста, изучите подробнее, что такое AWS Kinesis и как он работает.

...