AWS Kinesis KCL пропускает записи, добавленные перед запуском - PullRequest
0 голосов
/ 04 августа 2020

Я начал использовать как KPL, так и KCL для обмена данными между сервисами. Но всякий раз, когда consumer service отключен, все данные, отправленные KPL, теряются навсегда. Таким образом, я получаю только те фрагменты данных, которые были отправлены, пока consumer service работает, а его shardConsumer готов. Мне нужно начать с последней использованной точки или как-то еще данные процесса, оставленные .

Вот мой ShardProcessor код:

@Override
    public void initialize(InitializationInput initializationInput) {

    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.records()
                .forEach(record -> {
                    //my logic
                });
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {

    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shard Ended", e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shutdown Requested", e);

        }
    }

И код конфигурации:

public void configure(String streamName, ShardRecordProcessorFactory factory) {

        Region region = Region.of(awsRegion);

        KinesisAsyncClient kinesisAsyncClient =
                KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
                        UUID.randomUUID().toString(), factory);

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

1 Ответ

1 голос
/ 04 августа 2020

Есть два способа решить эту проблему. Во-первых, проблема.

По умолчанию KCL настроен на начало чтения потока с LATEST. Этот параметр указывает читателю потока выбрать поток с «текущей» меткой времени.

В вашем случае у вас есть данные в этом потоке, которые были помещены туда до «сейчас». Чтобы прочитать эти данные, вы можете рассмотреть возможность чтения самых ранних данных, которые есть в потоке. Если вы настроили поток по умолчанию, поток будет хранить данные в течение 24 часов.

Чтобы прочитать данные с «начала» этого потока или за 24 часа до запуска приложения KCL, вам понадобится чтобы установить средство чтения потока на TRIM_HORIZON. Этот параметр называется initialPositionInStream. Об этом можно прочитать здесь . В API .

задокументированы три различных параметра. Чтобы решить вашу проблему, предпочтительный метод, как указано в первой ссылке, - это добавить запись в файл свойств. Если вы не используете файл свойств, вы можете просто добавить его в свой Scheduler ctor:

Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig(),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
        .initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))
        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);

Одну вещь, о которой следует помнить при этой настройке, - это функция запуска, когда у вас есть данные в потоке. и вы начинаете с TRIM_HORIZON. В этом сценарии RecordProcessor будет перебирать записи так быстро, как только может. Это может создать проблемы с производительностью в Kinesis API или даже в последующих системах (куда бы вы ни отправляли данные, когда они были получены от RecordProcessor),

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...