Фильтрация объекта Stream в node.js - PullRequest
6 голосов
/ 04 августа 2011

Мне кажется, что элегантный способ обработки определенных видов данных в Node.js - это цепочка объектов обработки, таких как каналы UNIX.

Например, grep:

function Grep(pattern) {
    ...
}
util.inherits(Grep, stream.Stream);

Grep.prototype.???? = ???????  // What goes here?

grep = new Grep(/foo/);

process.stdin.pipe(grep);
myStream.pipe(process.stdout);

Однако мне не совсем понятно, как различные методы Stream должны быть переопределены, чтобы это работало.

Как я могу создать объект Stream, который просто копирует его входные данные в свои выходные? Предположительно с учетом этого, более сложные потоки фильтрации становятся тривиальными.

Обновление: такое ощущение, что должно работать следующее (выражается в CoffeeScript, поэтому я не заполняю это поле синтаксисом JS!):

class Forwarder extends stream.Stream
    write: (chunk, encoding) ->
        @emit 'data', chunk
    end: (chunk, encoding) =>
        if chunk?
            @emit 'data', chunk
        @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();

Однако, что-то в этом скрипте ничего не выводит. Явный вызов fwd.write () в скрипте вызывает вывод на стандартный вывод.

Ответы [ 2 ]

8 голосов
/ 05 августа 2011

Ты так близко.

Поскольку вы используете потоковый класс очень низкого уровня, вам нужно установить свойство потока для записи, чтобы сделать его доступным для записи. Если вы читаете из потока, вам нужно установить свойство для чтения. Также конечное событие не имеет аргументов.

class Forwarder extends stream.Stream
  constructor: ->
    @writable = true
  write: (chunk, encoding) ->
    @emit 'data', chunk
  end: ->
    @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();

Обновление

Ответ выше применяется к потокам V1 в узле <= 0,8. Если вы используете> 0,8, Node добавил более конкретные классы, которые предназначены для расширения, поэтому вы должны использовать что-то более похожее на это:

class Forwarder extends stream.Transform
    _transform: (chunk, encoding, callback) ->
      this.push(chunk);
      callback();

Обработка chunk и добавление частей, которые вам действительно нужны.

3 голосов
/ 20 сентября 2014

Несмотря на то, что существующий ответ хорош, он все же требует некоторого изучения того, кто ищет ответы.

Следующий код завершает пример, приведенный OP с использованием потокового API узла 0.10.

var stream = require('stream')
var util = require('util')

function Grep(pattern) {
  stream.Transform.call(this)

  this.pattern = pattern
}

util.inherits(Grep, stream.Transform)

Grep.prototype._transform = function(chunk, encoding, callback) {
  var string = chunk.toString()

  if (string.match(this.pattern)) {
    this.push(chunk)
  }

  callback()
}

var grep = new Grep(/foo/)
process.stdin.pipe(grep)
grep.pipe(process.stdout)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...