Java-клиент AWS Kinesis: настройка позиции TRIM_HORIZON в потоке не работает - PullRequest
0 голосов
/ 08 мая 2019

Я использую тестовую систему, которая порождает производителя Kinesis, который начинает писать сообщения, например: от 1 до 100, в поток с двумя осколками.

В течение этого цикла потребитель начинает читать сообщения из потока,Я заметил, что потребитель читает только сообщения LATEST, которые поступают в поток после его запуска.Так, например, он начинает читать сообщение 43. Я попытался изменить Worker.class для использования политики TRIM_HORIZON, но, похоже, это не работает.

KinesisClientLibConfiguration c = new KinesisClientLibConfiguration("MediaPlan", "randeepstream",
    DefaultAWSCredentialsProviderChain.getInstance(),
    "consumer1")
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final Worker w = new Worker.Builder()
    .recordProcessorFactory(rpf)
    .config(kinesisConfig)
    .build();
new Thread(() -> w.run()).start();

Процессор моего потребителя настроен как:

public class ConsumerRecordProcessorImpl implements IRecordProcessor {

    public void initialize(InitializationInput initializationInput) {
        log.info("Setting up consumer with shard {} starting at {}", initializationInput.getShardId(),
                initializationInput.getExtendedSequenceNumber());
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        ...
    }
}

Я ожидаю увидеть сообщение вроде: Setting up consumer with shard shardId-000000000000 starting at TRIM_HORIZON 0, но вместо этого получу: Setting up consumer with shard shardId-000000000000 starting at LATEST 0

Как заставить моего потребителя прекратить читать последние и читать всенеобработанные сообщения?

...