Асинхронное ожидание внутри наблюдаемого - PullRequest
2 голосов
/ 29 июня 2019

Как я могу использовать async / await внутри Observable ??С помощью этого кода я не могу вызвать функцию отмены подписки в пределах наблюдаемого, поэтому интервал не очищается.

const { Observable } = require("rxjs");

const test = () => new Observable(async (subscriber) => {
  await Promise.resolve();

  const a = setInterval(() => {
    subscriber.next(Math.random());
    console.log("zz");
  }, 500);

  return () => {
    console.log("asdsad");
    clearInterval(a);
  };
});

const xyz = test().subscribe(console.log);


setTimeout(() => {
  xyz.unsubscribe();
}, 3000);

1 Ответ

0 голосов
/ 01 июля 2019

Прежде всего, подписчик, передаваемый наблюдаемому конструктору, не может быть асинхронной функцией.Это не поддерживается.

Если вам нужно создать наблюдаемое из обещания, используйте from:

import { from } from 'rxjs';
const observable = from(promise);

Но с учетом вашего сценария.

Поскольку существуетнет никакого способа отменить собственное обещание JS, вы не можете отписаться от такой созданной наблюдаемой, поэтому:

const obs = from(new Promise(resolve => {
  setTimeout(() => {
    console.log('gonna resolve');
    resolve('foo');
  }, 1000);
}));

const sub = obs.subscribe(console.log);
setTimeout(() => sub.unsubscribe(), 500);

напечатает:

gonna resolve
gonna resolve
gonna resolve
(...)

так что да: gonna resolve будет напечатано ввсе время, но больше ничего - результат, переданный для разрешения, будет проигнорирован - просто не зарегистрирован.

С другой стороны, если вы удалите эту отписку (setTimeout(() => sub.unsubscribe(), 500);), этораз вы увидите:

gonna resolve
foo
gonna resolve
gonna resolve
gonna resolve
(...)

Есть один способ, который, возможно, поможет вам - defer - но он не связан строго с вашим вопросом.

import { defer } from 'rxjs';

defer(async () => {
  const a = await Promise.resolve(1);
  const b = a + await Promise.resolve(2);

  return a + b + await Promise.resolve(3);
}).subscribe(x => console.log(x)) // logs 7
...