Триггеры закрытия событий перед событиями данных в потоке файлов - PullRequest
0 голосов
/ 26 апреля 2019

У меня есть скрипт, который добавляет данные JSON из файла в таблицу DynamoDB. Сценарий использует модуль "fs", чтобы открыть поток чтения в файл json и построчно получать данные. Когда данные возвращаются, они вставляются в таблицу DynamoDB. Когда операция заканчивается, дается сводка выполнения с количеством обработанных, успешно вставленных и неудачно вставленных записей. Проблема в том, что сводка выполняется до полной обработки файла. В результате цифры неверны.

Сценарий ...

ddb_table_has_records(table_name, (err, dat) => {
  if (dat.Count === 0 || force) {
    const transformStream = JSONStream.parse("*");
    const inputStream = fs.createReadStream(import_file);

    let record_position = 0;
    let count_imported_successful = 0;
    let count_imported_fail = 0;

    inputStream.pipe(transformStream).on("data", (Item) => {
      const params = {
        TableName: table_name,
        Item
      }

      ddb_client.put(params, (err, data) => {
        ++record_position;

        if (err) {
          console.error("Unable to add mapping for record " + record_position + ", error = " + err);
          ++count_imported_fail;
        } else {
          console.log("PutItem succeeded " + record_position);
          ++count_imported_successful;
        }
      });
    }).on("close", () => {
      console.log("=".repeat(70));
      console.log(`'Completed: ${import_file}' has been loaded into '${table_name}'.`);

      console.log(` Record Count: ${record_position}`);
      console.log(` Imported Record Count: ${count_imported_successful}`);
      console.log(` Rejected Record Count: ${count_imported_fail}`);
    });
  } else {
    console.log("=".repeat(70));
    console.log(`Completed: Skipping import of '${import_file}' into '${table_name}'.`);
  };
});

Когда это работает, это выглядит следующим образом

PS C:\> node --max-old-space-size=8192 .\try.js 'foo' 'us-west-2' 'development' '.\data.json' true
Target Profile:  development
Target Region:  us-west-2
Target Table:  foo
Source File:  .\data.json
Force Import:  true
Confirming Table's State...
======================================================================
'Completed: .\data.json' has been loaded into 'foo'.
 Record Count: 0
 Imported Record Count: 0
 Rejected Record Count: 0
PutItem succeeded 1
PutItem succeeded 2
PutItem succeeded 3
PutItem succeeded 4
...

Часть кода, которая получает количество записей, выполняется до завершения вставки, поэтому импортированные и отклоненные номера записей всегда неверны. Похоже, что поток файлов закрывается во время вставки. Я пытался перейти с события «закрыть» на «конец», тот же результат.

1 Ответ

0 голосов
/ 24 июня 2019

Протестируйте этот скрипт с помощью следующего вызова ...

node --max-old-space-size=8192 .\data.load.js 'foo' 'us-west-1' 'dev' '.\foo.default.json' true

Вот содержимое скрипта, который я в конечном итоге использовал ...

'use strict'

if (process.argv.length < 6) {
    throw new Error ('Please pass the table-name, aws-Region, aws-Profile, and file-path to the script.');
}

let [, , TableName, Region, Profile, ImportFile, Force] = process.argv;

process.env.AWS_SDK_LOAD_CONFIG = true;
process.env.AWS_PROFILE = Profile;

Force = typeof(Force) !== 'undefined' ? Force : false;

const AWS = require('aws-sdk');
const fs = require('fs');
const JSONStream = require('JSONStream');

AWS.config.update({ region: Region });

const ddbc = new AWS.DynamoDB.DocumentClient();

console.log('Target Profile: ', Profile);
console.log('Target Region: ', Region);
console.log('Target Table: ', TableName);
console.log('Source File: ', ImportFile);
console.log('Force Import: ', Force);

// Returns the number of records in a specified table
const ddb_table_has_items = (TableName) => {
    return new Promise((resolve, reject) => {
        const ddb_query_parameters = { TableName, Select: 'COUNT' }

        ddbc.scan(ddb_query_parameters, (error, data) => {
            (error) ? reject(error) : resolve(data);
        });
    });
}

const ddb_table_upsert_items = (TableName, Item) => {
    return new Promise((reject, resolve) => {
        const ddb_insert_payload = { TableName, Item };

        ddbc.put(ddb_insert_payload, (error, data) => {
            (error) ? reject(error) : resolve(data);
        });
    });
}

const ddb_bulk_load = (TableName, ImportFile) => {
    return new Promise ( (resolve, reject) => {
        let count_succeeded = 0;
        let count_failed = 0;
        let count_attempted = 0;
        let inserts = [];

        const json_stream = JSONStream.parse( "*" );
        const source_data_stream = fs.createReadStream(ImportFile);
        const ddb_source_item = source_data_stream.pipe(json_stream);

        ddb_source_item.on("data", (source_data_item) => {
            count_attempted++;

            let ddb_insert = ddb_table_upsert_items(TableName, source_data_item)
                .then( (data) => count_succeeded++ )
                .catch( (error) => count_failed++ );

            inserts.push(ddb_insert);
        });

        ddb_source_item.on("end", () => {
            Promise.all(inserts)
                .then(() => {
                    resolve({count_succeeded, count_failed, count_attempted});
                })
                .catch((error) => {
                    console.log(error);
                    reject(error);
                });
        });

        ddb_source_item.on("error", (error) => {
            reject(error);
        });
    });
}

(async () => {
    try {
        let proceed_with_import = false;

        if ( Force.toString().toLowerCase() === 'true' ) {
            proceed_with_import = true;
        } else {
            const table_scan = await ddb_table_has_items(TableName);
            proceed_with_import = ( table_scan.Count === 0 );
        }

        if (proceed_with_import) {
            let ddb_inserts = await ddb_bulk_load(TableName, ImportFile);

            console.log("=".repeat(75));
            console.log("Completed: '%s' has been loaded into '%s'.", ImportFile, TableName);
            console.log(" Insert Attempted: %s", ddb_inserts.count_attempted);
            console.log(" Insert Succeeded: %s", ddb_inserts.count_succeeded);
            console.log(" Insert Failed   : %s", ddb_inserts.count_failed);
        }
    } catch (error) {
        console.log(error);
    }
})();

Завершение каждой вставки в обещании, помещая в массив вставки-обещания и используя обещание, все в этом массиве добились цели.Я выполняю обещание все, как только мы закончим читать из файла;как только событие «end» будет отправлено в поток ddb_source_item.

...