Как ждать каждого значения наблюдаемого с обещанием - PullRequest
1 голос
/ 06 марта 2019

Допустим, у меня есть это наблюдаемое:

const obs = new Observable((observer) => {
    observer.next(0.25);
    observer.next(0.75);
    observer.next(new ArrayBuffer(100));
    observer.complete();
});

Как мне ждать каждого значения с обещанием?

Следующий код вернет только последнее значение (значение перед вызовом complete ()):

const value = await obs.toPromise();

Но я хочу иметь возможность получить каждое значение по пути. Я могу сделать что-то вроде этого:

const value1 = await obs.pipe(take(1)).toPromise();
const value2 = await obs.pipe(take(2)).toPromise();

Но это не идеально, поскольку мне придется каждый раз увеличивать число, а также take(1000) все равно будет возвращать что-то в примере, даже если есть только 3 значения. Я ищу что-то вроде:

const value1 = await obs.pipe(next()).toPromise(); // 0.25
const value2 = await obs.pipe(next()).toPromise(); // 0.75
const value3 = await obs.pipe(next()).toPromise(); // ArrayBuffer(100)
const value4 = await obs.pipe(next()).toPromise(); // null

Это больше похоже на генератор.

Есть ли способ сделать что-то подобное?

Ответы [ 2 ]

0 голосов
/ 08 марта 2019

Похоже, что вы запрашиваете способ преобразования наблюдаемой в асинхронную итерацию, так что вы можете асинхронно перебирать ее значения, либо «вручную», либо используя новый for-await-of языковая функция.

Вот пример того, как это сделать (я не тестировал этот код, поэтому в нем могут быть некоторые ошибки):

// returns an asyncIterator that will iterate over the observable values
function asyncIterator(observable) {
  const queue = []; // holds observed values not yet delivered
  let complete = false;
  let error = undefined;
  const promiseCallbacks = [];

  function sendNotification() {
    // see if caller is waiting on a result
    if (promiseCallbacks.length) {
      // send them the next value if it exists
      if (queue.length) {
        const value = queue.shift();
        promiseCallbacks.shift()[0]({ value, done: false });
      }
      // tell them the iteration is complete
      else if (complete) {
        while (promiseCallbacks.length) {
          promiseCallbacks.shift()[0]({ done: true });
        }
      }
      // send them an error
      else if (error) {
        while (promiseCallbacks.length) {
          promiseCallbacks.shift()[1](error);
        }
      }
    }
  }

  observable.subscribe(
    value => {
      queue.push(value);
      sendNotification();
    },
    err => {
      error = err;
      sendNotification();
    },
    () => {
      complete = true;
      sendNotification();
    });

  // return the iterator
  return {
    next() {
      return new Promise((resolve, reject) => {
        promiseCallbacks.push([resolve, reject]);
        sendNotification();
      });
    }
  }
}

Использование с функцией ожидания языка:

async someFunction() {
  const obs = ...;
  const asyncIterable = {
    [Symbol.asyncIterator]: () => asyncIterator(obs)
  };

  for await (const value of asyncIterable) {
    console.log(value);
  }
}

Использование без функции языка ожидания:

async someFunction() {
  const obs = ...;
  const it = asyncIterator(obs);

  while (true) {
    const { value, done } = await it.next();
    if (done) {
      break;
    }

    console.log(value);
  }
}
0 голосов
/ 06 марта 2019

, что может сработать, так как take (1) завершает наблюдаемое, затем оно используется await, следующий в строке произведет второй выброс в value2.

const observer= new Subject()
async function getStream(){
  const value1 = await observer.pipe(take(1)).toPromise() // 0.25
  const value2 = await observer.pipe(take(1)).toPromise() // 0.75
  return [value1,value2]
}
getStream().then(values=>{
  console.log(values)
})
//const obs = new Observable((observer) => {
 setTimeout(()=>{observer.next(0.25)},1000);
 setTimeout(()=>observer.next(0.75),2000);

ОБНОВЛЕНИЕ: Использование объекта для излучения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...