Spring Kinesis Group в Spring Cloud Stream в группе автоматического масштабирования через JdbcLockRegistry - PullRequest
0 голосов
/ 19 февраля 2019

Я пытаюсь создать потребительское приложение, которое использует весенний облачный поток для получения событий из потока Kinesis.

Мое потребительское приложение работает в группе автоматического масштабирования AWS, и я хотел бы, чтобы оно масштабировалось ивниз в любой точке, не влияя на количество экземпляров, которые могут обработать данную запись в потоке.Точнее говоря, я бы хотел, чтобы не более одного экземпляра обрабатывал событие из потока в любой момент времени.

После прочтения документации я подумал, что эта функциональность может быть достигнута с помощью групп потребителей, механизм связывания Kinesis дажепредлагая свою собственную реализацию MetaDataStore и LockRegistry на основе DynamoDB.

Поскольку я не хочу вводить DynamoDB только для удержания блокировок и метаданных, я мог бы достичь того же результата, используя эквивалентные реализации Jdbc, используя предопределенную группу.

После моего начального теста я увидел, что хранилища jdbc были использованы, как и ожидалось, но когда я увеличил количество экземпляров в ASG, я заметил, что сообщения потребляются дважды (по одному от каждого экземпляра) вместо этогоиз того, что я ожидал бы после прочтения документации.

Мои свойства:

spring.cloud.stream.bindings.input.destination = stream spring.cloud.stream.bindings.input.group = group spring.cloud.stream.bindings.input.content-type = application / json

Конфигурация для лидера реестра блокировок и т. д .:

@Bean
public DefaultLockRepository lockRepository(DataSource dataSource) {
    return new DefaultLockRepository(dataSource);
}

@Bean
public LockRegistry lockRegistry(LockRepository lockRepository) {
    return new JdbcLockRegistry(lockRepository);
}

@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry lockRegistry) {
    LockRegistryLeaderInitiator lockRegistryLeaderInitiator = new LockRegistryLeaderInitiator(lockRegistry);
    lockRegistryLeaderInitiator.setPublishFailedEvents(true);
    return lockRegistryLeaderInitiator;
}

@Bean
public MetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
}

У меня есть один поток с одним осколком на моем Kinesis вмомент.

Я попытался добавить больше шардов (до 4), я попытался настроить параллелизм через свойства приложения.

Я вижу, что хранилище метаданных заполнено, как и ожидалось, на моембазы данных и того, что потребитель сохраняет состояние, как ожидалось, но я также вижу несколько записей в таблице int_lock, которые я не ожидал увидеть (т.е. я ожидал увидеть только одного зарегистрированного клиента, но вижу несколько).

Итак, мои вопросы:

  1. Кто-нибудь настраивал потребителя, использующего реестр jdbc?
  2. Есть ли проблемы с моей настройкой?
  3. Поскольку документация недействительно ясно, блокировка реестра блокирует доступ к потоку Kinesis?

Я почти уверен, что упускаю что-то на концептуальной стороне, но ничего не нашелв документации самой близкой к моей проблеме была эта проблема, сообщаемая на github Но это еще больше смутило меня в отношении ожидаемых конфигураций.Также не ясно, как instanceIndex и instanceCount должны работать в среде ASG

1 Ответ

0 голосов
/ 20 февраля 2019

Я не вижу проблем с вашей конфигурацией.KinesisMessageChannelBinder действительно ожидает ConcurrentMetadataStore и LockRegistry бобов.И уже не имеет значения, какие реализации вы используете.

Хотя вам не нужно LockRegistryLeaderInitiator.Он не используется в связывателе и может сделать для вас дополнительную неожиданную работу.

Вы должны быть уверены, что DataSource является общей СУБД для всех ваших экземпляров.Также вы должны быть уверены, что опция group действительно одинакова для всех экземпляров.

Кроме того, логика блокировки действительно основана на ключе, подобном:

this.consumerGroup + ":" + stream + ":" + shardId

Так что, пока нет идей, что может быть не так в вашей конфигурации ...

...