Я пытаюсь создать Observable, который постоянно запрашивает обновления у внешней службы, и, если есть новая, выдает обновление:
this._loop = new Rx.Observable<TDL.Result>(subscriber =>
{
let shouldLoop = true;
while (shouldLoop)
{
if (!this._client)
throw new Error("This client is not initialised.");
const update = this._lib.receiveSync(this._client, 5);
if (!update)
continue;
if (update._ === "error")
this.emit("error", update);
else
this.emit("update", update);
subscriber.next(update);
}
// never gets here b/c of while loop, so subscribing to this Observable
// causes everything to block
// cancellation logic
return () =>
{
shouldLoop = false;
this._loop = null;
};
}).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;
this._loopSubscription = this._loop.connect();
Однако функция подписки блокирует, что означаетчто мой код останавливается, когда я звоню connect()
.Как мне переписать это, чтобы подписка была неблокируемой?