Я пытаюсь реализовать потребителя awes kinesis с пружинной загрузкой, который можно автоматически масштабировать, чтобы разделить нагрузку (разделенные фрагменты обработки) с исходным экземпляром.
Что я смог сделать: используя хорошо определенные прочитанные мной и примеры, доступные здесь Документы Kinesis binder Мне удалось запустить несколько потребителей, которые фактически разделяют фрагменты для обработки, предоставляя эти свойства .
для производителя, я предоставляю partitionCount: 2 через свойство приложения.
а для потребителей я предоставляю и instanceIndex, и instanceCount.
на потребителя 1 у меня instanceIndex = 0 и instantCount = 2,
на потребителя 2 у меня instanceIndex = 1 и instantCount = 2
это работает нормально, и у меня есть два приложения с весенней загрузкой, которые работают со своими осколками. Но в этом случае у меня должен быть предварительно настроенный файл свойств для каждого загрузочного приложения, который должен быть доступен при загрузке, чтобы они могли разделить нагрузку. и если я только запускаю первого потребителя (без автомасштабирования), я обрабатываю только фрагменты, относящиеся к индексу 0, оставляя другие фрагменты необработанными.
То, что я хотел бы сделать, но не уверен, возможно ли, это развернуть одного потребителя (который обрабатывает все шарды). если я разверну другой экземпляр, я бы хотел, чтобы этот экземпляр восстановил у первого потребителя часть нагрузки, иными словами, если у меня есть 2 осколка и один потребитель, он будет обрабатывать оба, если я разверну другое приложение, мне будет нужен этот первый потребитель до сих пор обрабатывает только один осколок, оставляя второй осколок второму потребителю.
Я попытался сделать это, не указав instanceIndex или instanceCount на потребителях, а только указав имя группы, но оставив второго потребителя бездействующим, пока первый не выключится. К вашему сведению, я также создал свою собственную таблицу метаданных и блокировок, не позволяющую компоновщику создавать файлы по умолчанию.
Конфигурация:
Производитель -----------------
originator: KinesisProducer
server:
port: 8090
spring:
cloud:
stream:
bindings:
output:
destination: <stream-name>
content-type: application/json
producer:
headerMode: none
partitionKeyExpression: headers.type
потребители -------------------------------------
originator: KinesisSink
server:
port: 8091
spring:
cloud:
stream:
kinesis:
bindings:
input:
consumer:
listenerMode: batch
recordsLimit: 10
shardIteratorType: TRIM_HORIZON
binder:
checkpoint:
table: <checkpoint-table>
locks:
table: <locking-table
bindings:
input:
destination: <stream-name>
content-type: application/json
consumer:
concurrency: 1
listenerMode: batch
useNativeDecoding: true
recordsLimit: 10
idleBetweenPolls: 250
partitioned: true
group: mygroup