Как создать наблюдаемую на основе цикла while? - PullRequest
0 голосов
/ 22 декабря 2018

Я пытаюсь создать Observable, который постоянно запрашивает обновления у внешней службы, и, если есть новая, выдает обновление:

this._loop = new Rx.Observable<TDL.Result>(subscriber =>
{
    let shouldLoop = true;

    while (shouldLoop)
    {
        if (!this._client)
            throw new Error("This client is not initialised.");

        const update = this._lib.receiveSync(this._client, 5);

        if (!update)
            continue;

        if (update._ === "error")
            this.emit("error", update);
        else
            this.emit("update", update);

        subscriber.next(update);
    }

    // never gets here b/c of while loop, so subscribing to this Observable
    // causes everything to block

    // cancellation logic
    return () =>
    {
        shouldLoop = false;
        this._loop = null;
    };
}).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;

this._loopSubscription = this._loop.connect();

Однако функция подписки блокирует, что означаетчто мой код останавливается, когда я звоню connect().Как мне переписать это, чтобы подписка была неблокируемой?

1 Ответ

0 голосов
/ 23 декабря 2018

Спасибо @martin, решение было довольно очевидным.Я не знаю, почему я не подумал об этом ?

this._loop = new Rx.Observable<TDL.Result>(subscriber =>
{
    let shouldLoop = true;

    process.nextTick(() =>
    {
        while (shouldLoop)
        {
            if (!this._client)
                throw new Error("This client is not initialised.");

            const update = this._lib.receiveSync(this._client, 5);

            if (!update)
                continue;

            if (update._ === "error")
                this.emit("error", update);
            else
                this.emit("update", update);

            subscriber.next(update);
        }
    });

    // cancellation logic
    return () =>
    {
        shouldLoop = false;
        this._loop = null;
    };
}).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;
...