Создавая итератор, действующий как наблюдаемый / go канал, почему он генерирует только несгруппированные асинхронные c события? - PullRequest
0 голосов
/ 31 января 2020

Я моделирую обработчик реактивности из каналов rx js и Golang. Я пытаюсь использовать итераторы для достижения этой цели, однако мой «Канал» испускается только при наличии события asyn c. Почему это так?

Вот мой потребительский код

import { Channel } from "./channel";

const numbers: any = new Channel();

void async function() {
  for (const number of numbers) {
    // Will pause and wait for emit()
    console.log(await number);
  }
}();

numbers.emit(1);
numbers.emit(2);
numbers.emit(3);
setTimeout(() => numbers.emit(4));
setTimeout(() => numbers.emit(5), 50);
setTimeout(() => numbers.emit(6), 100);

Это моя реализация "Channel".

export class Channel<T = any> {
  private onValue = new PromiseSubject()

  emit(value: T): void {
    this.onValue.resolve(value)
    this.onValue = new PromiseSubject()
  }

  *[Symbol.iterator](): Iterator<Promise<T>> {
    while (true) {
      yield this.onValue.promise
    }
  }    
}

export class PromiseSubject<T = any> {
  public resolve!: (value?: T) => void
  public promise = new Promise<T>((res) => this.resolve = res)
}

Я ожидаю, однако, вывод 123456 Я только получаю 456. Кажется, что когда события сгруппированы вместе, итератор не дает значения.

Точно так же, я не получаю консольный вывод при запуске:

setTimeout(() => {
  numbers.emit(1)
  numbers.emit(2)
  numbers.emit(3)
})

Ссылка песочницы: https://codesandbox.io/s/festive-wave-y0o3e?expanddevtools=1&fontsize=14&hidenavigation=1&theme=dark

1 Ответ

0 голосов
/ 31 января 2020

Потому что вам нужно реализовать некую очередь. С

 this.onValue = new PromiseSubject()

вы переопределяете текущий onValue, когда вы вызываете итератор (с for of), вы ожидаете только текущий onValue, а не обещания, сохраненные там ранее.

Однако существуют асинхронные c итераторы, которые делают именно то, что вы ищете!

 export class Channel<T = any> {
   toEmit = [] as T[];
   resolveLast: Promise?;

  emit(value: T): void {
    if(this.resolveLast) this.resolveLast(value);
    else this.toEmit.push(value);
  }

  async *[Symbol.iterator](): {
    while (true) {
      const value = this.toEmit.shift();
      yield value;
      if(this.toEmit.length === 0)
         await new Promise(res => this.resolveLthis.resolveLast = res);
    }
  }    
}

 // iterable as
 for await(const el of new Channel)

Рекомендуемое чтение:

Джейк Арчибальд на асин c итераторы

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...