Есть два способа решить эту проблему. Во-первых, проблема.
По умолчанию KCL настроен на начало чтения потока с LATEST
. Этот параметр указывает читателю потока выбрать поток с «текущей» меткой времени.
В вашем случае у вас есть данные в этом потоке, которые были помещены туда до «сейчас». Чтобы прочитать эти данные, вы можете рассмотреть возможность чтения самых ранних данных, которые есть в потоке. Если вы настроили поток по умолчанию, поток будет хранить данные в течение 24 часов.
Чтобы прочитать данные с «начала» этого потока или за 24 часа до запуска приложения KCL, вам понадобится чтобы установить средство чтения потока на TRIM_HORIZON
. Этот параметр называется initialPositionInStream
. Об этом можно прочитать здесь . В API .
задокументированы три различных параметра. Чтобы решить вашу проблему, предпочтительный метод, как указано в первой ссылке, - это добавить запись в файл свойств. Если вы не используете файл свойств, вы можете просто добавить его в свой Scheduler
ctor:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
Одну вещь, о которой следует помнить при этой настройке, - это функция запуска, когда у вас есть данные в потоке. и вы начинаете с TRIM_HORIZON
. В этом сценарии RecordProcessor
будет перебирать записи так быстро, как только может. Это может создать проблемы с производительностью в Kinesis API или даже в последующих системах (куда бы вы ни отправляли данные, когда они были получены от RecordProcessor),