Ваш вопрос очень широкий, но вот несколько советов для потребителей Kinesis, которые, как мы надеемся, имеют отношение к вашему варианту использования.
Каждый поток Kinesis разделен на один или несколько осколков. Существуют ограничения, накладываемые на каждый шард, например, вы не можете записать в шард больше данных, чем МБ данных в секунду, и вы не можете инициировать более 5 запросов GetRecords (которые вызывает сам процесс CustomerRecords) в секунду для одного осколок. (См. Полный список ограничений здесь .) Если вы работаете с объемами данных, которые приближаются к этим ограничениям или превосходят их, вам нужно увеличить количество сегментов в вашем потоке.
Когда у вас есть только одно потребительское приложение и один работник, он берет на себя ответственность за обработку всех сегментов соответствующего потока. Если имеется несколько работников, каждый из них берет на себя ответственность за некоторое подмножество сегментов, так что каждый шард назначается одному и только одному работнику (если вы просматриваете журналы потребителей, вы можете найти это как "получение аренды" на шардах).
Если вы хотите иметь несколько процессоров, которые независимо принимают трафик Kinesis и обрабатывают записи, вам нужно зарегистрировать два отдельных пользовательских приложения. В приведенном выше коде имя приложения является первым параметром конструктора KinesisClientLibConfiguration. Обратите внимание, что даже если они являются отдельными пользовательскими приложениями, все равно применяется ограничение в 5 GetRecords в секунду.
Другими словами, вам нужно иметь два отдельных процесса, один из которых будет создавать экземпляр потребителя, который обращается к БД, а другой будет создавать экземпляр клиента, который обновляет графический интерфейс:
KinesisClientLibConfiguration databaseSaverKclConfig =
new KinesisClientLibConfiguration(
"DatabaseSaverKclApp",
"your-stream",
new DefaultAWSCredentialsProviderChain(),
// I believe worker ids don't need to be unique, but it's a good practice to make them unique so you can easily identify the workers
"unique-worker-id")
.withRegionName(SampleProducer.REGION)
// this only matters the very first time your consumer is launched, subsequent launches will read the checkpoint from the previous runs
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final IRecordProcessorFactory databaseSaverConsumer = new DatabaseSaverConsumer();
KinesisClientLibConfiguration guiUpdaterKclConfig =
new KinesisClientLibConfiguration(
"GuiUpdaterKclApp",
"your-stream",
new DefaultAWSCredentialsProviderChain(),
"unique-worker-id")
.withRegionName(SampleProducer.REGION)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final IRecordProcessorFactory guiUpdaterConsumer = new GuiUpdaterConsumer();
А как насчет реализации DatabaseSaverConsumer и GuiUpdaterConsumer? Каждый из них должен реализовать собственную логику в методе processRecords. Вы должны убедиться, что каждый из них выполняет правильную работу внутри этого метода, и что логика контрольных точек является надежной. Давайте расшифруем это:
- Допустим, processRecords занимает 10 секунд на 100 записей, но соответствующий шард получает 500 записей за 10 секунд. Каждый последующий вызов processRecords будет отставать от сегмента. Это означает, что либо часть работы должна быть извлечена из processRecords, либо необходимо увеличить количество шардов.
- И наоборот, если processRecords занимает всего 0,1 секунды, тогда processRecords будет вызываться 10 раз в секунду, что превышает выделенные 5 транзакций в секунду на шард. Если я правильно понимаю / помню, нет способа добавить паузу между последующими вызовами к processRecords в конфигурации KCL, поэтому вы должны добавить спящий код в ваш код.
- Контрольная точка: каждому работнику необходимо отслеживать свой прогресс, чтобы в случае его неожиданного прерывания и получения другим работником того же осколка он знал, с чего продолжить. Обычно это делается двумя способами: в начале processRecords или в конце. В первом случае вы говорите: «Я могу прыгать через некоторые записи в потоке, но определенно не хочу обрабатывать их дважды»; в последнем случае вы говорите: «Я хорошо обрабатываю некоторые записи дважды, но определенно не могу потерять ни одну из них». (Когда вам нужно лучшее из обоих миров, то есть обрабатывать записи один раз и только один раз, вы должны сохранять состояние в каком-либо хранилище данных вне рабочих.) В вашем случае разработчику базы данных, скорее всего, понадобится контрольная точка после обработки; Я не так уверен в его GUI.
Говоря о графическом интерфейсе, что вы используете для отображения данных и почему потребителю Kinesis нужно его обновлять, а не сам графический интерфейс, запрашивающий базовые хранилища данных?
В любом случае, я надеюсь, что это поможет. Дайте мне знать, если у вас есть более конкретные вопросы.