Nodejs PassThrough Stream - PullRequest
       64

Nodejs PassThrough Stream

2 голосов
/ 28 марта 2020

Я хочу передать поток fs.Readstream через поток net .Socket (TCP). Для этого я использую .pipe. Когда fs.Readstream закончится, я не хочу заканчивать поток net .Socket. Вот почему я использую

readStream.pipe(socket, { end: false })

К сожалению, я не получаю 'close', 'fini sh' или 'end' с другой стороны. Это мешает мне закрыть мой fs.Writestream на противоположной стороне. Тем не менее, соединение net .Socket остается, что мне также нужно, потому что я хотел бы получить ID в качестве ответа. Так как я не получаю 'close' или 'fini sh' напротив, к сожалению, я не могу завершить fs.Writestream и поэтому не могу отправить ответ с соответствующим ID

Is Есть ли способ вручную отправить событие 'close' или 'fini sh' через net .socket, не закрывая его? С командой реагируют только мои собственные события. Может кто-нибудь сказать мне, что я делаю не так?

    var socket : net.Socket; //TCP connect
    var readStream = fs.createWriteStream('test.txt');

    socket.on('connect', () => {
        readStream.pipe(socket, {
            end: false
        })
        readStream.on('close', () => {
            socket.emit('close');
            socket.emit('finish');
        })

        //waiting for answer
        //waiting for answer
        //waiting for answer

        socket.on('data', (c) => {
            console.log('got my answer: ' + c.toString());
        })
    })
}

1 Ответ

2 голосов
/ 01 апреля 2020

Ну, на самом деле мало что можно сделать с одним потоком, кроме как предоставить другой стороне возможность узнать, что поток закончился программно.

Когда сокет отправляет событие end, он фактически сбрасывается буфер, а затем закрывает TCP-соединение, которое затем на другой стороне преобразуется в finish после доставки последнего байта. Чтобы повторно использовать соединение, вы можете рассмотреть следующие два варианта:

Один: использовать HTTP keep-alive

Как вы можете себе представить, вы не первый, кто столкнулся с этой проблемой. На самом деле это обычная вещь, и некоторые протоколы, такие как HTTP, уже описаны. Это приведет к незначительным накладным расходам, но только при запуске и завершении потоков - что в вашем случае может быть более приемлемым, чем другие варианты.

Вместо использования основных потоков TCP c вы можете просто использовать HTTP подключений и отправлять ваши данные по HTTP-запросам, запрос HTTP POST был бы просто нормальным, и ваш код не выглядел бы иначе, кроме отключения этого {end: false}. Сокету нужно будет отправлять свои заголовки, поэтому он будет сконструирован так:

const socket : HTTP.ClientRequest = http.request({method: 'POST', url: '//wherever.org/somewhere/there:9087', headers: {
   'connection': 'keep-alive',
   'transfer-encoding': 'chunked'
}}, (res) => {
   // here you can call the code to push more streams since the 
});

readStream.pipe(socket); // so our socket (vel connection) will end, but the underlying channel will stay open.

На самом деле вам не нужно ждать подключения сокета и направить поток напрямую, как в пример выше, но проверьте, как это ведет себя, если ваше соединение не удается. Ожидание события connect также будет работать, поскольку класс запросов HTTP реализует все события и методы подключения TCP (хотя он может иметь некоторые незначительные различия в сигнатурах).

Дополнительные сведения:

Да, и небольшое предупреждение - поддержка протокола TCP - это другое дело, так что не запутайтесь там.

Два: Используйте конечный пакет "magi c"

. В этом случае вам нужно будет отправить простой конечный пакет, например: \x00 (nul символ) в конце разъем. Это имеет существенный недостаток, потому что вам нужно будет что-то сделать с потоком, чтобы убедиться, что символ nul там не появится, в противном случае - это приведет к дополнительным расходам на обработку данных (что приведет к большей загрузке ЦП).

Для того, чтобы сделать это таким образом, вам нужно обработать данные sh через поток преобразования, прежде чем отправлять их в сокет - ниже приведен пример, но он будет работать только со строками, поэтому его можно адаптировать. это для ваших нужд.

const zeroEncoder = new Transform({
  encoding: 'utf-8',
  transform(chunk, enc, cb) { cb(chunk.toString().replace('\x00', '\\x00')); },
  flush: (cb) => cb('\x00')
});

// ... whereever you do the writing:

readStream
  .pipe(zeroEncoder)
  .on('unpipe', () => console.log('this will be your end marker to send in another stream'))
  .pipe(socket, {end: false})

Затем с другой стороны:

tcpStream.on('data', (chunk) => {
  if (chunk.toString().endsWith('\x00')) {
    output.end(decodeZeros(chunk));
    // and rotate output
  } else {
    output.write(decodeZeros(chunk));
  }
});

Как вы можете видеть, это намного сложнее, и это также просто пример - вы можете упростить его немного с использованием JSON, 7-битной кодировки передачи или некоторыми другими способами, но во всех случаях потребуется некоторая хитрость и, что самое важное, чтение всего потока и намного больше памяти для него - так что я действительно не рекомендую это подходить. Если вы все же выполните:

  • Убедитесь, что вы правильно закодировали / расшифровали данные
  • Подумайте, можете ли вы найти байт, который не появится в ваших данных
  • Вышеприведенное может работать со строками, но, по крайней мере, будет плохо с буферами
  • Наконец, нет контроля ошибок или управления потоками - поэтому необходим по крайней мере pause / resume logi c.

Надеюсь, это полезно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...