Итак, ниже приведен код, использующий пользовательский поток, расширяющий поток. Преобразование из базовой библиотеки узла. Целью этого кода является чтение файла строка за строкой, каждая строка представляет собой строку, отформатированную в JSON, и выполняет некоторую обработку, чтобы проверить, является ли электронная почта новой.
"use strict";
const fs = require('fs');
const split = require('split');
const request = require('request');
const parseString = require('xml2js').parseString;
const moment = require('moment');
const LimitedParallelStream = require('./limitedParallelStream');
const ApiKey = 'xxxxxxxxxxxxxxxxxxxxx';
const ListID = 'yyyyyyyyyyyyyyyyyyyy';
const sourceReadStream = fs.createReadStream(process.argv[2]);
const resultWriteStream = fs.createWriteStream('results2.txt');
let newEmailCount = 0;
let newEmailByDay = {};
sourceReadStream
.pipe(split())
.pipe(new LimitedParallelStream(3, (entrant, enc, push, done) => {
if(!entrant) return done();
const entrantObj = JSON.parse(entrant);
const { email, entrydttm } = entrantObj;
const entryDayKey = moment(entrydttm).format('YYYY-MM-DD');
console.log(`checking ${email}...`);
const options = {
method: 'GET',
url: 'http://api.com/api/Subscribers.GetSingleSubscriber',
qs:
{
ApiKey,
ListID,
EmailAddress: email
}
};
request(options, function (err, response, body) {
if (err) throw new Error(err);
parseString(body, function (parseErr, result) {
const createdDate = moment(result.date);
let newEmail = 'NO';
if (createdDate.isSameOrAfter(entrydttm)) {
newEmailCount++;
newEmail = 'YES';
if (newEmailByDay.hasOwnProperty(entryDayKey)) {
const { [entryDayKey]: count } = newEmailByDay;
newEmailByDay = { ...newEmailByDay, [entryDayKey]: count + 1 };
} else {
newEmailByDay = { ...newEmailByDay, [entryDayKey]: 1 };
}
}
push(`${email} - entrydttm: ${entrydttm}, createdttm: ${result.anyType.Date[0]}, new: ${newEmail}\n`);
done();
});
});
}))
.pipe(resultWriteStream) // { end: false }
.on('finish', () => {
console.log(`All emails were checked - total new emails: ${newEmailCount}`);
console.log(`New emails by day: ${JSON.stringify(newEmailByDay, null, 2)}`);
})
;
Это прекрасно работает, если я просто консоль записываю окончательный результат в обратный вызов 'finish'.
Однако, когда я попытался использовать {end: false} в конвейере для resultWriteStream, а затем вручную завершить поток записи, когда поток чтения заканчивается, поток чтения заканчивается до того, как все содержимое будет прочитано.
Sth, как это:
sourceReadStream
.on('end', () => resultWriteStream.end(`All emails were checked - total new emails: ${newEmailCount}`));
Файл LimitedParallelStream.js выглядит следующим образом:
"use strict";
const stream = require('stream');
class LimitedParallelStream extends stream.Transform {
constructor(concurrency, userTransform) {
super({objectMode: true});
this.concurrency = concurrency;
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null;
this.continueCallback = null;
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
if(this.running < this.concurrency) {
done();
} else {
this.continueCallback = done;
}
}
_flush(done) {
if(this.running > 0) {
this.terminateCallback = done;
} else {
done();
}
}
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
const tmpCallback = this.continueCallback;
this.continueCallback = null;
tmpCallback && tmpCallback();
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
}
module.exports = LimitedParallelStream;
Файл для чтения выглядит примерно так, но имеет несколько сотен строк:
{"электронный адрес": "joe.doe@hotmail.com", "entrydttm": "2019-01-16 14:09:07"}
{"email": "bill.gee@gmail.com", "entrydttm": "2019-01-16 13:53:17"}
До сих пор я не понял, как правильно разрешить мне записывать окончательный результат в один и тот же файл после того, как он фактически завершил обработку всех строк.
Любая помощь будет высоко ценится!