Проблема извлечения данных из потока NodeJS в Lambda - PullRequest
1 голос
/ 11 ноября 2019

Я работаю с потоками в первый раз, и у меня возникают проблемы с извлечением данных из моего читаемого потока.

Я использую pg и pg-copy-streams для извлечения большого объема данных в виде потока из БД PSQL с целью создания CSVфайл с данными из базы данных.

Вот мой код:

const aws = require('aws-sdk');
const {Client} = require('pg'); //  Needs the nodePostgres Lambda Layer
const copyTo = require('pg-copy-streams').to;

exports.handler = async (event) => {
let response = {};

console.log(JSON.stringify(event));

const client = new Client();

const deviceId = event.deviceId;
const fromDate = event.fromDate;
const toDate = event.toDate;

if (!deviceId) { // if we do not have a device id, just bail.
    return response = {
        statusCode: 400,
        body: "No device Id",
    };
}

const tempTableQuery = getQuery(deviceId, fromDate, toDate);
console.log("Search query: " + tempTableQuery);
try {

    await client.connect();

    await client.query(tempTableQuery);

    const q = `COPY temp_csv_table to STDOUT with csv DELIMITER ';'`;
    const dataStream = client.query(copyTo(q));

    // dataStream.pipe(console.log(process.stdout));
    dataStream.on('readable', function() {
        // There is some data to read now.
        let data;

        while (data = this.read()) {
            console.log(data); //<- this dosent print anything :(
        }
    });

    dataStream.on('error', async function (err) {
        // Here we can control stream errors
        await client.end();
    });
    dataStream.on('end', async function () {
        await client.end();
    });


} catch (e) {
    response = {
        statusCode: 500,
        result: "Error: " + e
    };
} finally {
    client.end();
}
};

function getQuery(deviceId, fromDate, toDate) {
return `CREATE TEMPORARY TABLE temp_csv_table AS
            SELECT * 
            FROM sensor_data_v2 
            WHERE device_id = '${deviceId}' and 
                  time_stamp between '${fromDate}' and '${toDate}' 
            LIMIT 10;`;
}

Вопрос:

  1. Как извлечь строки из потока данных?
  2. Есть ли лучший способ сделать это?

Примечания:

  1. Выполнение этого в среде AWS Lambda NodeJS 10.x.
  2. Я знаю, что в таблице есть данные для фильтров, которые я указал.
  3. Я установил LIMIT 10 только для этого теста, эти условия вернут 2600 строк данных.
  4. Я буду использовать пакет csv-write-stream для создания файла CSV с данными из БД. Не очень привязанный к этому пакету, с удовольствием использую другого писателя CSV, если с ним будет легче работать.

1 Ответ

1 голос
/ 21 ноября 2019

Я пытался повторить это, и я думаю, что ваша проблема на самом деле в блоке try-catch-finally.

Итак, когда ваш код достиг части const dataStream = client.query(copyTo(q)), он начинает потоковый процесс, которыйне связано с обещанием, поэтому выполнение идет непосредственно к блоку finally. Однако в этом finally блоке вы завершаете работу клиента, поэтому при потоковой передаче данных (которая все еще выполняется) будет выдана ошибка Error: Connection terminated, так как для потоковой передачи потребуется некоторое время.

Чтобы это исправить, вы можете простоудалите client.end() из блока finally и поместите это клиентское завершение как в блок catch, так и в обработчик событий end stream, и тогда оно должно работать.

Вы также можете сделать console.log(data.toString()) во время события readable, так как в противном случае распечатанные данные будут двоичными.

...