Я пытаюсь создать потребительское приложение, которое использует весенний облачный поток для получения событий из потока Kinesis.
Мое потребительское приложение работает в группе автоматического масштабирования AWS, и я хотел бы, чтобы оно масштабировалось ивниз в любой точке, не влияя на количество экземпляров, которые могут обработать данную запись в потоке.Точнее говоря, я бы хотел, чтобы не более одного экземпляра обрабатывал событие из потока в любой момент времени.
После прочтения документации я подумал, что эта функциональность может быть достигнута с помощью групп потребителей, механизм связывания Kinesis дажепредлагая свою собственную реализацию MetaDataStore и LockRegistry на основе DynamoDB.
Поскольку я не хочу вводить DynamoDB только для удержания блокировок и метаданных, я мог бы достичь того же результата, используя эквивалентные реализации Jdbc, используя предопределенную группу.
После моего начального теста я увидел, что хранилища jdbc были использованы, как и ожидалось, но когда я увеличил количество экземпляров в ASG, я заметил, что сообщения потребляются дважды (по одному от каждого экземпляра) вместо этогоиз того, что я ожидал бы после прочтения документации.
Мои свойства:
spring.cloud.stream.bindings.input.destination = stream spring.cloud.stream.bindings.input.group = group spring.cloud.stream.bindings.input.content-type = application / json
Конфигурация для лидера реестра блокировок и т. д .:
@Bean
public DefaultLockRepository lockRepository(DataSource dataSource) {
return new DefaultLockRepository(dataSource);
}
@Bean
public LockRegistry lockRegistry(LockRepository lockRepository) {
return new JdbcLockRegistry(lockRepository);
}
@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry lockRegistry) {
LockRegistryLeaderInitiator lockRegistryLeaderInitiator = new LockRegistryLeaderInitiator(lockRegistry);
lockRegistryLeaderInitiator.setPublishFailedEvents(true);
return lockRegistryLeaderInitiator;
}
@Bean
public MetadataStore metadataStore(DataSource dataSource) {
return new JdbcMetadataStore(dataSource);
}
У меня есть один поток с одним осколком на моем Kinesis вмомент.
Я попытался добавить больше шардов (до 4), я попытался настроить параллелизм через свойства приложения.
Я вижу, что хранилище метаданных заполнено, как и ожидалось, на моембазы данных и того, что потребитель сохраняет состояние, как ожидалось, но я также вижу несколько записей в таблице int_lock, которые я не ожидал увидеть (т.е. я ожидал увидеть только одного зарегистрированного клиента, но вижу несколько).
Итак, мои вопросы:
- Кто-нибудь настраивал потребителя, использующего реестр jdbc?
- Есть ли проблемы с моей настройкой?
- Поскольку документация недействительно ясно, блокировка реестра блокирует доступ к потоку Kinesis?
Я почти уверен, что упускаю что-то на концептуальной стороне, но ничего не нашелв документации самой близкой к моей проблеме была эта проблема, сообщаемая на github Но это еще больше смутило меня в отношении ожидаемых конфигураций.Также не ясно, как instanceIndex и instanceCount должны работать в среде ASG