Я использую тестовую систему, которая порождает производителя Kinesis, который начинает писать сообщения, например: от 1 до 100, в поток с двумя осколками.
В течение этого цикла потребитель начинает читать сообщения из потока,Я заметил, что потребитель читает только сообщения LATEST
, которые поступают в поток после его запуска.Так, например, он начинает читать сообщение 43. Я попытался изменить Worker.class для использования политики TRIM_HORIZON
, но, похоже, это не работает.
KinesisClientLibConfiguration c = new KinesisClientLibConfiguration("MediaPlan", "randeepstream",
DefaultAWSCredentialsProviderChain.getInstance(),
"consumer1")
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final Worker w = new Worker.Builder()
.recordProcessorFactory(rpf)
.config(kinesisConfig)
.build();
new Thread(() -> w.run()).start();
Процессор моего потребителя настроен как:
public class ConsumerRecordProcessorImpl implements IRecordProcessor {
public void initialize(InitializationInput initializationInput) {
log.info("Setting up consumer with shard {} starting at {}", initializationInput.getShardId(),
initializationInput.getExtendedSequenceNumber());
}
public void processRecords(ProcessRecordsInput processRecordsInput) {
...
}
}
Я ожидаю увидеть сообщение вроде: Setting up consumer with shard shardId-000000000000 starting at TRIM_HORIZON 0
, но вместо этого получу: Setting up consumer with shard shardId-000000000000 starting at LATEST 0
Как заставить моего потребителя прекратить читать последние и читать всенеобработанные сообщения?