Как я могу передавать файл построчно, параллельно, с Node.js? - PullRequest
0 голосов
/ 07 февраля 2019

У меня есть решение на основе этого вопроса и выбранного ответа.Однако для каждой прочитанной строки я выполняю SQL-запрос, который, скажем, занимает 13 мс.В настоящее время я имитирую это замедление с фиксированной задержкой 13 мс.

Файлы, которые мне нужны для потоковой передачи, имеют размер, скажем, 1 ГБ, и их сотни.Порядок , в котором я обрабатываю файлы, имеет значение , но порядок , в котором я обрабатываю строки в одном файле, не имеет значения .

Поэтому я хотел бы знать,Есть ли способ сделать указанное решение параллельно?Как, может быть, разделить один файл пополам или на 1/10 блоков, а затем передать и обработать эти блоки параллельно?

Вот текущий код, с некоторыми не относящимися к делу именами и такими опущенными.Этот файл выводит файл построчно и обрабатывает его построчно, и это медленно!

const fs = require('fs');
const es = require('event-stream');
var argv = require('minimist')(process.argv.slice(2));
const mysql = require('promise-mysql');
const moment = require('moment');
var exec = require('child_process').exec;

Promise.prototype.delay = function(t) 
{
    return this.then(function(v) 
    {
        return delay(t, v);
    });
}

const dataFilePath = '/path/';

var fileToRead = argv.file ? argv.file : argv.f ? argv.f : false;

if(fileToRead === false)
{
    console.log('--file or -f not given, quitting');
    process.exit(1);
}

function StreamFile(file)
{
    var processingStartTime = Date.now();

    var lineNumber = 0;
    var readStream = fs.createReadStream(file)
        .pipe(es.split())
        .pipe(es.mapSync(async function(line)
        {
            try
            {
                // pause the readstream
                readStream.pause();
                lineNumber += 1;

                if(lineNumber !== 1)
                    await ProcessLineData(line, lineNumber);

                // resume the readstream, possibly from a callback
                readStream.resume();
            }
            catch(err)
            {
                console.log(err);
            }
        })
        .on('error', function(err)
        {
            console.log('Error while reading file.', err);
        })
        .on('end', function()
        {
            var processingEndTime = Date.now();
            var processingElapsedTime = processingEndTime - processingStartTime;
            console.log(`Data processed in ${processingElapsedTime / 1000}s`);
        })
    );
}

async function ProcessLineData(lineData, lineNumber)
{
    try
    {
        var [a, b, c, d, e] = lineData.split(';');

        //console.log(time, value, meteringPoint, clientType, position, unit);

        return sqlConnection.query(`SELECT COUNT(*) as "lineExists"
            FROM xxx
            WHERE xxx.\`a\` = "${a}" AND xxx.\`b\` = ${b}`)
        .then(function(existsResult)
        {
            var lineExists = existsResult[0].lineExists;

            if(lineExists)
            {
                console.log(`Processing line ${lineNumber}/${totalLines} -- UPDATE`);
            }
            else
            {
                console.log(`Processing line ${lineNumber}/${totalLines} -- INSERT`);
            }
        }).delay(13);

    }
    catch(err)
    {
        console.log(err);
    }
}

// Begin

console.log('Connecting to database...');
var sqlConnection = null;
var totalLines = 0;

mysql.createConnection(sql_connection_config).then(function(connection)
{
    console.log('Connected');
    sqlConnection = connection;

    exec(`wc -l < ${dataFilePath}/${fileToRead}`, function (error, results) 
    {
        if(error)
            console.log(error);

        totalLines = parseInt(results.toString().trim()) - 1;
        StreamFile(dataFilePath + '/' + fileToRead);
    });
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...