AWS Kinesis помещает записи синхронно с помощью NodeJS SDK - PullRequest
0 голосов
/ 12 мая 2019

Я пытаюсь записать данные в Amazon Kinesis, используя AWS SDK для NodeJS. Я использую метод putRecords, который я хочу использовать с async / await. Мне нужно выполнить несколько других шагов, как только записи будут записаны в Kinesis. Я хочу, чтобы это был синхронный процесс, поэтому я не могу использовать только обратные вызовы.

Я пробовал несколько разных способов, используя обещание, асинхронность / ожидание, но ни один из них, похоже, не работает.

С Async / Await:

let response = await kinesisPutRecords({ 
    Records: //records,
    StreamName: //streamName
});

async function kinesisPutRecords(recordsParams){

    return new Promise((resolve, reject)=>{

        kinesisClient.putRecords(recordsParams,function (err, data) {
            if (err) 
                reject(err);
            resolve();        
        }
    });
}

с обещанием

let response = await kinesisClient.putRecords(recordsParams).promise();
console.log(response);

Этот подход используется внутри цикла, цикл продолжается, не дожидаясь завершения этого вызова.

Эти два подхода прекрасно работают с S3.getObject ();

Любая помощь приветствуется. Спасибо.

Обновление 1:

Я обнаружил проблему с приведенным выше кодом. Его просто не хватает try ... catch вызывает проблему. В моем полном коде у меня все еще есть эта проблема, но я смог найти основную причину.

Я использую csv tarnsform с потоками NodeJS. Внутри функции преобразования ожидание не работает. Может быть из-за того, что поток асинхронный и не приостанавливается?

Код, близкий к моему сценарию:

const transformer = transform(function (record) {
    //add records to an array until a threshold is met and then push to kinesis.

let response = await kinesisPutRecords();
console.log(response);

}, function(err, output){
  //logic to push remaining records
});

fileReadStream
   .pipe(gunzip) //unzip the file
   .pipe(csvParser) //to parse the csv
   .pipe(transformer);


async function kinesisPutRecords(recordsParams){

    return new Promise((resolve, reject)=>{
       try{
           kinesisClient.putRecords(recordsParams,function (err, data) {
               if (err) 
                  reject(err);
               resolve();        
           });
       }
       catch(exception ex){
           reject();
       }
    });
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...