Использование async / await в потоке чтения для пакетной загрузки в DynamoDB - PullRequest
1 голос
/ 28 февраля 2020

Здесь немного новичка Node ...

Я пытаюсь написать функцию, которая извлекает CSV из S3 и записывает элементы в DynamoDB. DynamoDB имеет ограничение 25 в каждом пакете, поэтому мне нужно записать записи как I go. Проблема, с которой я сталкиваюсь, заключается в том, что моя await функция для выполнения записи в БД запускается только на .end(), а не когда я проверяю.

Я понимаю, что не могу выполнить такие вещи , но я не уверен, как это исправить? Я использую Node12.

Спасибо.

async function populateTable(
  dataFile: bucketKey,
  tableName: string
): Promise<void> {
  const s3 = getS3Client();
  const stream = s3.getObject(dataFile).createReadStream();

  const BATCH_COUNT = 25; // Max size to write to DynamoDB
  let counter = 0;
  let datarows: any = [];
  let datarow = {};

  stream
    .pipe(parse(DATA_HEADERS))
    .on("data", async function(data: DataRow) {
      counter++;

      datarow = {
        PutRequest: {
          Item: data
        }
      };

      datarows.push(datarow);

      if (counter % BATCH_COUNT === 0) {
        console.log("before batch write " + counter); // This fires!
        await batchWriteToDynamo(datarows, tableName); // I want this function to fully execute before moving on
        console.log("after batch write " + counter); // This does not
        datarows = [];
      }
    })
    .on("end", async function() {
      await batchWriteToDynamo(datarows, tableName); // This fires!
    });
}

1 Ответ

1 голос
/ 28 февраля 2020

Я предполагаю, что эти потоковые события не совместимы с * asyn c; Возможно, вам придется прибегнуть к созданию собственной цепочки обещаний. Потенциально вы могли бы сделать это следующим образом:

  let datarow = {};
  let pr = Promise.resolve();
  // ...
      if (counter % BATCH_COUNT === 0) {
        let scopedRows = datarows.slice(); // scoped shallow copy
        pr = pr.then(()=> batchWriteToDynamo(scopedrows, tableName));

  // ...
    .on("end", async function() {
      pr = pr.then(()=> batchWriteToDynamo(datarows, tableName));

Это должно гарантировать, что ваши пакетные записи происходят по одному и в правильном порядке. Обратите внимание также на мелкую копию datarows во время события данных. Я уверен, что это необходимо, поскольку события и обещания будут происходить в непредсказуемом порядке.

Но, в конце концов, в этом не должно быть необходимости, поскольку, я думаю, datarows больше не должно меняться в этот момент.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...