get-records kinesis для TRIM_HORIZON shard-iterator возвращает пустые записи, однако поток kinesis содержит новые записи - PullRequest
0 голосов
/ 12 октября 2019

aws kinesis (определяется как один поток шардов) get-records для TRIM_HORIZON get-shard-iterator возвращает пустой массив, но поток kinesis содержит новые записи без полей

запись в поток kinesis правильная и чтение с AT_SEQUENCE_NUMBER get-shard-итератор всегда выполняется успешно. Иногда цикл попыток while кэшировал новые записи, но после нескольких запусков функции возвращаемый массив всегда пуст, однако новые записи добавляются в этот поток кинезиса.

public void readDataFromKinesisStream(String kinesisStreamName) throws 
ExecutionException, InterruptedException {
    // Continuously read data records from a shard
    int readJobTimePeriod = kinesisConfig.getReadJobTimePeriod();
    int numberAttemptsRead =  kinesisConfig.getNumberAttemptsRead();
    final Timer timer = new Timer();
    timer.schedule(new TimerTask() {
        public void run() {
        GetShardIteratorRequest  getShardIteratorRequest = 
GetShardIteratorRequest .builder()
                .streamName(kinesisStreamName)
                .shardId("shardId-000000000000")
                .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
                .build();

         try {
            GetShardIteratorResponse getShardIteratorResult = 
kinesisAsyncClient.getShardIterator(getShardIteratorRequest).get();
            String shardIterator = 
getShardIteratorResult.shardIterator();
            int currentNumberAttemptsRead =  numberAttemptsRead;
            while (currentNumberAttemptsRead-- > 0 ) {
                // Create a new getRecordsRequest with an existing 
shardIterator
                // Set the maximum records to return to 25
                GetRecordsRequest getRecordsRequest = 
GetRecordsRequest.builder()
                        .shardIterator(shardIterator)
                        .limit(kinesisConfig.getMaximumReturnedRecords())
                        .build();

                GetRecordsResponse result = 
kinesisAsyncClient.getRecords(getRecordsRequest).get();

                // Put the result into record list. The result can be 
empty.
                List<Record> records = result.records();

                records.forEach(record -> {
                    final String sequenceNumber = 
record.sequenceNumber();
                    final String partitionKey = record.partitionKey();
                    final String message = record.data().asUtf8String();
                    try {
                        trendingEventSaveService.saveEvent(message);
                    } catch (IOException e) {
                        log.error("Can not write message to 
elasticsearch: " + e.getMessage());
                    }
                });

                if(records.isEmpty()){
                    try {
                        Thread.sleep(50/*threadDelayTime*/);
                    } catch (InterruptedException exception) {
                        throw new RuntimeException(exception);
                    }

                } else {
                    shardIterator = result.nextShardIterator();
                }
            }
        } catch (InterruptedException e) {
            log.error("Can not read from kinesis stream: " + 
e.getMessage());
        } catch (ExecutionException e) {
            log.error("Can not read from kinesis stream: " + 
e.getMessage());
        }
        }
    }, 0, readJobTimePeriod);

}
...