Я пытаюсь записать данные в Amazon Kinesis, используя AWS SDK для NodeJS. Я использую метод putRecords, который я хочу использовать с async / await. Мне нужно выполнить несколько других шагов, как только записи будут записаны в Kinesis. Я хочу, чтобы это был синхронный процесс, поэтому я не могу использовать только обратные вызовы.
Я пробовал несколько разных способов, используя обещание, асинхронность / ожидание, но ни один из них, похоже, не работает.
С Async / Await:
let response = await kinesisPutRecords({
Records: //records,
StreamName: //streamName
});
async function kinesisPutRecords(recordsParams){
return new Promise((resolve, reject)=>{
kinesisClient.putRecords(recordsParams,function (err, data) {
if (err)
reject(err);
resolve();
}
});
}
с обещанием
let response = await kinesisClient.putRecords(recordsParams).promise();
console.log(response);
Этот подход используется внутри цикла, цикл продолжается, не дожидаясь завершения этого вызова.
Эти два подхода прекрасно работают с S3.getObject ();
Любая помощь приветствуется. Спасибо.
Обновление 1:
Я обнаружил проблему с приведенным выше кодом. Его просто не хватает try ... catch вызывает проблему. В моем полном коде у меня все еще есть эта проблема, но я смог найти основную причину.
Я использую csv tarnsform с потоками NodeJS. Внутри функции преобразования ожидание не работает. Может быть из-за того, что поток асинхронный и не приостанавливается?
Код, близкий к моему сценарию:
const transformer = transform(function (record) {
//add records to an array until a threshold is met and then push to kinesis.
let response = await kinesisPutRecords();
console.log(response);
}, function(err, output){
//logic to push remaining records
});
fileReadStream
.pipe(gunzip) //unzip the file
.pipe(csvParser) //to parse the csv
.pipe(transformer);
async function kinesisPutRecords(recordsParams){
return new Promise((resolve, reject)=>{
try{
kinesisClient.putRecords(recordsParams,function (err, data) {
if (err)
reject(err);
resolve();
});
}
catch(exception ex){
reject();
}
});
}