Как я могу получить последние записи в Amazon Kinesis при работе с Javascript SDK? - PullRequest
0 голосов
/ 04 октября 2019

Я использую AWS Javascript SDK для использования из потока данных Kinesis. Я хочу получить последние записи в осколке.

Когда я указываю тип ShardIterator как «ПОСЛЕДНИЕ», я не получаю никаких записей обратно. Однако, когда я использую «TRIM_HORIZON», я получаю все записи обратно.

kinesis.describeStream(describeParams, function(err, data) {
            if (err) {
                console.log(err, err.stack);    // an error occurred
            }
            else {
                var getParams = {
                    ShardId: data.StreamDescription.Shards[0].ShardId,
                    ShardIteratorType: "TRIM_HORIZON",     //get oldest package
                    StreamName: streamName,
                };

                if(shardIteratorType){
                    console.log("you have passed some shardIteratorType:" + shardIteratorType);
                    getParams.ShardIteratorType = shardIteratorType;
                }

                kinesis.getShardIterator(getParams, function(err, result) {
                    if (err) {
                        console.log("Error in getShardIterator()");
                        console.log(err);
                    } else {
                        console.log("calling getRecord with shard iterator");
                        // Get records from the Kinesis stream
                        getRecord(result.ShardIterator);
                    }
                });
            }
        });

function getRecord(shard_iterator) {
    console.log("getRecord was called.");
    var getRecParams = {
        ShardIterator: shard_iterator
    };

    kinesis.getRecords(getRecParams, function(err, result) {
        if (err) {
            console.log("Error in getRecords() from the Kinesis stream.");
            console.log(err);
        } else {
            try {

            if(result.Records.length > 0) {
                    // Loop through all the packages
               for(var i = 0; i < result.Records.length; i++) {
                   if(result.Records[i] != undefined) {
                        var getData = JSON.parse( decodeURIComponent
(escape(result.Records[i].Data)));
                        console.log(getData);
                        var table = document.getElementById("myTable");
                        var row = table.insertRow(0);
                        var j = i + 1 ;
                        var cell1 = row.insertCell(0);


                        cell1.innerHTML = getData ;

                    }
                }
            }
            } catch(err) {
                console.log("Error parsing the package.");
                console.log(err);
            }
        }
    });
}

Я ожидал бы получить только самые последние записи вместо всей истории при использовании «LATEST» вместо «TRIM_HORIZON» для типа итератора сегмента. Что я делаю не так?

1 Ответ

0 голосов
/ 04 октября 2019

Из документации https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

В запросе вы можете указать тип итератора сегмента AT_TIMESTAMP для чтения записей из произвольного момента времени, TRIM_HORIZON, чтобы ShardIterator указывал на последний необработанныйзапись в сегменте в системе (самая старая запись данных в фрагменте) или LATEST, чтобы вы всегда читали самые последние данные в сегменте.

Здесь последний элемент подразумевает пропуск «прямо сейчас»все записи между последней контрольной точкой и сейчас.

Используя LATEST в качестве типа итератора сегмента, вы можете подумать, что он читает записи, поступающие сразу после использования итератора сегмента, возвращаемого функцией getShardIterator.

Ниже вы можете обратиться к примерукак это работает с 'LATEST':

kinesis.describeStream(describeParams, (err, streamData) => {
    // Skipping error handling

    kinesis.getShardIterator(getShardIteratorParams, (err, shardIterData) => {
        let shardIterator = shardIterData.ShardIterator;

        // Keep reading records from the stream
        while (true) {
            let getRecParams = {
                ShardIterator: shardIterator
            };
            kinesis.getRecords(getRecParams, (err, recData) => {
                // Skipping error handling

                if (recData.Records.length > 0) {
                   // Do something

                   shardIterator = recData.NextShardIterator;
                }
            });
            // Break if you need
        }
    });
});
...