Я хотел бы создать потоковый поток чтения CSV, который читает информацию построчно и компилирует ее в массив записей. Для начала я создал поток Transform, который сохраняет состояние в качестве переменной экземпляра.
См. Репозиторий https://github.com/vamsiampolu/stateful-node-stream-question
Используются экспериментальные модули ES2015
Поток с преобразованием состояния
import { Transform } from 'stream';
export default class LineStream extends Transform {
constructor() {
super();
this.state = [];
this.storeState = this.storeState.bind(this);
}
storeState(chunk) {
const line = chunk.toString();
state.push(line);
}
_transform(chunk, enc, done) {
this.storeState(chunk);
this._write(line);
done();
}
}
Чтобы дождаться завершения потока, я создал доступный для записи NoOpStream. Я надеюсь подождать до завершения writeStream и убедиться, что к состоянию можно получить доступ.
Ожидается:
await finished(writeStream);
console.log('Stream has finished', lineStream.state);
последняя строка выше, чтобы напечатать весь текст.
Фактически: ничего не происходит. Либо состояние пустое, либо ничего не печатается.
N oOp Поток
import { Writable } from 'stream';
const noop = () => {};
export default class DevNull extends Writable {
constructor(args) {
super(args);
}
_write(chunk, enc, cb) {
noop(chunk);
cb();
}
}
Использование
import { createReadStream } from 'fs';
import readLine from 'readline';
import { finished as finishedCb } from 'stream';
import { promisify } from 'util';
import LineStream from './state-stream.mjs';
import NoOpStream from './noop-stream.mjs';
const finished = promisify(finishedCb);
async function main() {
const readStream = createReadStream('./random_text');
const lineStream = new LineStream();
const devNull = new NoOpStream();
const writeStream = lineStream.pipe(devNull)
readLine.createInterface({
input: readStream,
output: writeStream
});
await finished(writeStream);
console.log('Stream has finished', lineStream.state);
}
main();