aws kinesis (определяется как один поток шардов) get-records для TRIM_HORIZON get-shard-iterator возвращает пустой массив, но поток kinesis содержит новые записи без полей
запись в поток kinesis правильная и чтение с AT_SEQUENCE_NUMBER get-shard-итератор всегда выполняется успешно. Иногда цикл попыток while кэшировал новые записи, но после нескольких запусков функции возвращаемый массив всегда пуст, однако новые записи добавляются в этот поток кинезиса.
public void readDataFromKinesisStream(String kinesisStreamName) throws
ExecutionException, InterruptedException {
// Continuously read data records from a shard
int readJobTimePeriod = kinesisConfig.getReadJobTimePeriod();
int numberAttemptsRead = kinesisConfig.getNumberAttemptsRead();
final Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
GetShardIteratorRequest getShardIteratorRequest =
GetShardIteratorRequest .builder()
.streamName(kinesisStreamName)
.shardId("shardId-000000000000")
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
.build();
try {
GetShardIteratorResponse getShardIteratorResult =
kinesisAsyncClient.getShardIterator(getShardIteratorRequest).get();
String shardIterator =
getShardIteratorResult.shardIterator();
int currentNumberAttemptsRead = numberAttemptsRead;
while (currentNumberAttemptsRead-- > 0 ) {
// Create a new getRecordsRequest with an existing
shardIterator
// Set the maximum records to return to 25
GetRecordsRequest getRecordsRequest =
GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(kinesisConfig.getMaximumReturnedRecords())
.build();
GetRecordsResponse result =
kinesisAsyncClient.getRecords(getRecordsRequest).get();
// Put the result into record list. The result can be
empty.
List<Record> records = result.records();
records.forEach(record -> {
final String sequenceNumber =
record.sequenceNumber();
final String partitionKey = record.partitionKey();
final String message = record.data().asUtf8String();
try {
trendingEventSaveService.saveEvent(message);
} catch (IOException e) {
log.error("Can not write message to
elasticsearch: " + e.getMessage());
}
});
if(records.isEmpty()){
try {
Thread.sleep(50/*threadDelayTime*/);
} catch (InterruptedException exception) {
throw new RuntimeException(exception);
}
} else {
shardIterator = result.nextShardIterator();
}
}
} catch (InterruptedException e) {
log.error("Can not read from kinesis stream: " +
e.getMessage());
} catch (ExecutionException e) {
log.error("Can not read from kinesis stream: " +
e.getMessage());
}
}
}, 0, readJobTimePeriod);
}