Потребление записей из указанных c шардов в KCL 2.x (Kinesis) - PullRequest
0 голосов
/ 13 июля 2020

У меня есть набор записей по некоторым конкретным c шардам в потоке Kinesis. Я использую потребителя KCL 2.x для получения записей из кинезиса, но проблема в том, что потребитель получает мне записи из всех сегментов, доступных в потоке. Есть ли способ указать сегменты или их идентификаторы при настройке объекта configBuilder или потребителя KCL, чтобы использовались только записи из указанных сегментов.

Пример кода:

configsBuilder = new ConfigsBuilder(
        applicationName,
        streamName,
        kinesisAsyncClient,
        dynamoDbClient,
        cloudWatchClient,
        workerID,
        new RecordProcessorFactory());

scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configBuilder.retrievalConfig()
    );

    // start the kinesis records consumer.
    schedulerThread = new Thread(scheduler);
    schedulerThread.setDaemon(true);
    schedulerThread.start();

Спасибо заранее!

1 Ответ

0 голосов
/ 06 августа 2020

KCL 2.x предоставляет интерфейс ShardPrioritization, который позволяет устанавливать приоритеты или фильтровать сегменты:

/**
 * Provides logic to prioritize or filter shards before their execution.
 */
public interface ShardPrioritization {

    /**
     * Returns new list of shards ordered based on their priority.
     * Resulted list may have fewer shards compared to original list
     * 
     * @param original
     *            list of shards needed to be prioritized
     * @return new list that contains only shards that should be processed
     */
    List<ShardInfo> prioritize(List<ShardInfo> original);
}

Тем не менее, вы можете предоставить реализацию ShardPrioritization, которая оставит только актуальные для вас сегменты. .

После этого просто укажите свой приоритет в конфигурации координатора:

configsBuilder.coordinatorConfig
          .shardPrioritization(new CustomShardsPrioritixation())
...