Как сохранить состояние в потоке преобразования nodejs и получить к нему доступ позже - PullRequest
0 голосов
/ 08 февраля 2020

Я хотел бы создать потоковый поток чтения CSV, который читает информацию построчно и компилирует ее в массив записей. Для начала я создал поток Transform, который сохраняет состояние в качестве переменной экземпляра.

См. Репозиторий https://github.com/vamsiampolu/stateful-node-stream-question

Используются экспериментальные модули ES2015

Поток с преобразованием состояния

import { Transform } from 'stream';

export default class LineStream extends Transform {
  constructor() {
   super();
   this.state = [];
   this.storeState = this.storeState.bind(this);
  }

  storeState(chunk) {
     const line = chunk.toString();
     state.push(line);
  }

  _transform(chunk, enc, done) {
   this.storeState(chunk);
   this._write(line);
   done();
  }
}

Чтобы дождаться завершения потока, я создал доступный для записи NoOpStream. Я надеюсь подождать до завершения writeStream и убедиться, что к состоянию можно получить доступ.

Ожидается:

await finished(writeStream);
console.log('Stream has finished', lineStream.state);

последняя строка выше, чтобы напечатать весь текст.

Фактически: ничего не происходит. Либо состояние пустое, либо ничего не печатается.

N oOp Поток

import { Writable } from 'stream';

const noop = () => {}; 


export default class DevNull extends Writable {
  constructor(args) {
     super(args);
  }

  _write(chunk, enc, cb) {
    noop(chunk);
    cb();
  }
}

Использование

import { createReadStream } from 'fs';
import readLine from 'readline';
import { finished as finishedCb } from 'stream';
import { promisify } from 'util';

import LineStream from './state-stream.mjs';
import NoOpStream from './noop-stream.mjs';

const finished = promisify(finishedCb);

async function main() {
  const readStream = createReadStream('./random_text');

  const lineStream = new LineStream();
  const devNull = new NoOpStream();

  const writeStream = lineStream.pipe(devNull)

  readLine.createInterface({
    input: readStream,
    output: writeStream
  });

  await finished(writeStream);
  console.log('Stream has finished', lineStream.state);
}

main();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...