Проблема с ручным завершением потока, доступного для записи, с {end: false} в конвейере - PullRequest
0 голосов
/ 25 января 2019

Итак, ниже приведен код, использующий пользовательский поток, расширяющий поток. Преобразование из базовой библиотеки узла. Целью этого кода является чтение файла строка за строкой, каждая строка представляет собой строку, отформатированную в JSON, и выполняет некоторую обработку, чтобы проверить, является ли электронная почта новой.

"use strict";

const fs = require('fs');
const split = require('split');
const request = require('request');
const parseString = require('xml2js').parseString;
const moment = require('moment');
const LimitedParallelStream = require('./limitedParallelStream');

const ApiKey = 'xxxxxxxxxxxxxxxxxxxxx';
const ListID = 'yyyyyyyyyyyyyyyyyyyy';

const sourceReadStream = fs.createReadStream(process.argv[2]);
const resultWriteStream = fs.createWriteStream('results2.txt');

let newEmailCount = 0;
let newEmailByDay = {};

sourceReadStream
  .pipe(split())
  .pipe(new LimitedParallelStream(3, (entrant, enc, push, done) => {
    if(!entrant) return done();

    const entrantObj = JSON.parse(entrant);
    const { email, entrydttm } = entrantObj;
    const entryDayKey = moment(entrydttm).format('YYYY-MM-DD');

    console.log(`checking ${email}...`);

    const options = { 
      method: 'GET',
      url: 'http://api.com/api/Subscribers.GetSingleSubscriber',
      qs: 
      { 
        ApiKey,
        ListID,
        EmailAddress: email 
      }
    };

    request(options, function (err, response, body) {
      if (err) throw new Error(err);

      parseString(body, function (parseErr, result) {

        const createdDate = moment(result.date);

        let newEmail = 'NO';
        if (createdDate.isSameOrAfter(entrydttm)) {
          newEmailCount++;
          newEmail = 'YES';
          if (newEmailByDay.hasOwnProperty(entryDayKey)) {
            const { [entryDayKey]: count } = newEmailByDay;           
            newEmailByDay = { ...newEmailByDay, [entryDayKey]: count + 1 };
          } else {
            newEmailByDay = { ...newEmailByDay, [entryDayKey]: 1 };
          }
        }

        push(`${email} - entrydttm: ${entrydttm}, createdttm: ${result.anyType.Date[0]}, new: ${newEmail}\n`);        
        done();
      });
    }); 
  }))
  .pipe(resultWriteStream)  // { end: false }
  .on('finish', () => {
    console.log(`All emails were checked - total new emails: ${newEmailCount}`);
    console.log(`New emails by day: ${JSON.stringify(newEmailByDay, null, 2)}`);
  })
;

Это прекрасно работает, если я просто консоль записываю окончательный результат в обратный вызов 'finish'.

Однако, когда я попытался использовать {end: false} в конвейере для resultWriteStream, а затем вручную завершить поток записи, когда поток чтения заканчивается, поток чтения заканчивается до того, как все содержимое будет прочитано.

Sth, как это:

sourceReadStream
.on('end', () => resultWriteStream.end(`All emails were checked - total new emails: ${newEmailCount}`));

Файл LimitedParallelStream.js выглядит следующим образом:

"use strict";

const stream = require('stream');

class LimitedParallelStream extends stream.Transform {
  constructor(concurrency, userTransform) {
    super({objectMode: true});
    this.concurrency = concurrency;
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
    this.continueCallback = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc,  this.push.bind(this), this._onComplete.bind(this));
    if(this.running < this.concurrency) {
      done();
    } else {
      this.continueCallback = done;
    }
  }

  _flush(done) {
    if(this.running > 0) {
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  _onComplete(err) {
    this.running--;
    if(err) {
      return this.emit('error', err);
    }
    const tmpCallback = this.continueCallback;
    this.continueCallback = null;
    tmpCallback && tmpCallback();
    if(this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}

module.exports = LimitedParallelStream;

Файл для чтения выглядит примерно так, но имеет несколько сотен строк:

{"электронный адрес": "joe.doe@hotmail.com", "entrydttm": "2019-01-16 14:09:07"} {"email": "bill.gee@gmail.com", "entrydttm": "2019-01-16 13:53:17"}

До сих пор я не понял, как правильно разрешить мне записывать окончательный результат в один и тот же файл после того, как он фактически завершил обработку всех строк.

Любая помощь будет высоко ценится!

...