У меня есть решение на основе этого вопроса и выбранного ответа.Однако для каждой прочитанной строки я выполняю SQL-запрос, который, скажем, занимает 13 мс.В настоящее время я имитирую это замедление с фиксированной задержкой 13 мс.
Файлы, которые мне нужны для потоковой передачи, имеют размер, скажем, 1 ГБ, и их сотни.Порядок , в котором я обрабатываю файлы, имеет значение , но порядок , в котором я обрабатываю строки в одном файле, не имеет значения .
Поэтому я хотел бы знать,Есть ли способ сделать указанное решение параллельно?Как, может быть, разделить один файл пополам или на 1/10 блоков, а затем передать и обработать эти блоки параллельно?
Вот текущий код, с некоторыми не относящимися к делу именами и такими опущенными.Этот файл выводит файл построчно и обрабатывает его построчно, и это медленно!
const fs = require('fs');
const es = require('event-stream');
var argv = require('minimist')(process.argv.slice(2));
const mysql = require('promise-mysql');
const moment = require('moment');
var exec = require('child_process').exec;
Promise.prototype.delay = function(t)
{
return this.then(function(v)
{
return delay(t, v);
});
}
const dataFilePath = '/path/';
var fileToRead = argv.file ? argv.file : argv.f ? argv.f : false;
if(fileToRead === false)
{
console.log('--file or -f not given, quitting');
process.exit(1);
}
function StreamFile(file)
{
var processingStartTime = Date.now();
var lineNumber = 0;
var readStream = fs.createReadStream(file)
.pipe(es.split())
.pipe(es.mapSync(async function(line)
{
try
{
// pause the readstream
readStream.pause();
lineNumber += 1;
if(lineNumber !== 1)
await ProcessLineData(line, lineNumber);
// resume the readstream, possibly from a callback
readStream.resume();
}
catch(err)
{
console.log(err);
}
})
.on('error', function(err)
{
console.log('Error while reading file.', err);
})
.on('end', function()
{
var processingEndTime = Date.now();
var processingElapsedTime = processingEndTime - processingStartTime;
console.log(`Data processed in ${processingElapsedTime / 1000}s`);
})
);
}
async function ProcessLineData(lineData, lineNumber)
{
try
{
var [a, b, c, d, e] = lineData.split(';');
//console.log(time, value, meteringPoint, clientType, position, unit);
return sqlConnection.query(`SELECT COUNT(*) as "lineExists"
FROM xxx
WHERE xxx.\`a\` = "${a}" AND xxx.\`b\` = ${b}`)
.then(function(existsResult)
{
var lineExists = existsResult[0].lineExists;
if(lineExists)
{
console.log(`Processing line ${lineNumber}/${totalLines} -- UPDATE`);
}
else
{
console.log(`Processing line ${lineNumber}/${totalLines} -- INSERT`);
}
}).delay(13);
}
catch(err)
{
console.log(err);
}
}
// Begin
console.log('Connecting to database...');
var sqlConnection = null;
var totalLines = 0;
mysql.createConnection(sql_connection_config).then(function(connection)
{
console.log('Connected');
sqlConnection = connection;
exec(`wc -l < ${dataFilePath}/${fileToRead}`, function (error, results)
{
if(error)
console.log(error);
totalLines = parseInt(results.toString().trim()) - 1;
StreamFile(dataFilePath + '/' + fileToRead);
});
});