Существует ли более функциональная реализация этого записываемого потока с использованием Rx JS, которая ожидает обработки обещания перед обработкой? - PullRequest
0 голосов
/ 23 января 2020

Я живу под скалой несколько лет, поэтому я еще не коснулся Rx JS, и мне любопытно, как она справляется с ситуацией, как показано ниже.

Вариант использования для В этом сценарии я открываю соединение и жду, пока оно установит sh, прежде чем обрабатывать любые входящие чанки.

const {Readable, Writable} = require('stream')

class DelayedWritable extends Writable {
  constructor(options = {}){
    options.objectMode = true;
    super(options);

    this.promise = new Promise((resolve, reject) => {
      console.log('...waiting...')
      setTimeout( function() {
        console.log('...promise resolved...')
        resolve(/* with client */)
      }, 2000) 
    }) 
  }
  _write = function(chunk, encoding, done){
    var output = `_write : ${chunk}`
    console.time(output)
    this.promise.then((client) => {
      console.timeEnd(output)
      done() 
    });
  }
  _final = function(done){
    console.log('_final')
  }

}

var readable = Readable.from(['one', 'two', 'three'])
var test = new DelayedWritable();
readable.pipe(test)

Кажется, что более функциональный подход будет оптимальным. Rx JS также предлагает много возможностей управления потоком из коробки, что было бы неплохо.

Спасибо!

1 Ответ

1 голос
/ 23 января 2020

Если вы уже знакомы с Node.js потоками, я не думаю, что вам будет трудно использовать asp ту же концепцию в Rx Js: сборе данных, поступающем со временем .

Вариант использования этого сценария, в котором я открываю соединение и жду, пока оно установит sh перед обработкой любых входящих фрагментов.

Это можно перевести на Rx Js следующим образом:

interval(500)
  .pipe(
    take(5),
    bufferTime(2000), // It takes 2s to establish the connection
  )
  .subscribe(console.log)

$ Суффикс используется для обозначения того, что src$ не простая переменная, это наблюдаемая (выдает значения с течением времени).

interval будет выдавать значения на основе указанного интервала времени, а bufferTime будет собирать испущенные значения до тех пор, пока не пройдет указанный ms. Когда это произойдет, вы получите собранные значения в виде массива.

Что мне особенно нравится в Rx Js, так это то, что он поставляется с множеством встроенных операторов, которые позволяют вам манипулировать входящими data.

const src$ = from(['one', 'two', 'tree']);

src$
  .pipe(
    filter(v => v !== 'one'),
    map(v => v.toUpperCase()),
    toArray() // Group the values in an array once the source completes
  )
  .subscribe(val => console.log(val));

StackBlitz .


Чтобы преобразовать обещание в наблюдаемое, достаточно просто:

from(promise).subscribe()

Приведенный выше пример может показаться тривиальным, и я согласен. Вы неизбежно столкнетесь с более сложными ситуациями, и вам будет полезно узнать о наблюдаемых высших порядков .

...