Повторить (повторно подписаться) к источнику, наблюдаемому, если в течение тайм-аута нет следующего (...) вызова - PullRequest
0 голосов
/ 30 июня 2018

Я пытаюсь взять наблюдаемый источник rxjs, представляющий сетевое соединение, которое отправляет мне данные, и повторно подключиться (путем повторной подписки на наблюдаемый источник), если я не получил данные в течение периода ожидания. Я, конечно, могу написать это несколько странно, но есть ли хороший способ написать это кратко с помощью rxjs?

1 Ответ

0 голосов
/ 04 июля 2018

Я в конечном итоге написал оператор. Я думаю, что есть лучший способ сделать это, но, учитывая, что никто другой не имеет идеи, вот оператор, который я написал:

import { Observable, Subscription } from "rxjs";

export function retryAfterTimeout<T>(timeout: number, allowCompletion = false): (obs: Observable<T>) => Observable<T> {
    return source => new Observable<T>(observer => {
        let sub: Subscription | undefined;
        let timer: number | undefined;

        function resetTimer() {
            if (timer) clearTimeout(timer);
            timer = window.setTimeout(() => resub(), timeout);
        }

        function resub() {
            if (sub) sub.unsubscribe();
            sub = source.subscribe({
                next(x) {
                    resetTimer();
                    observer.next(x);
                },
                error(err) {
                    observer.error(err);
                },
                complete() {
                    if (allowCompletion)
                        observer.complete();
                    else
                        resub();
                }
            });
        }

        resub();
        resetTimer();

        return () => {
            if (sub) sub.unsubscribe();
            if (timer) window.clearTimeout(timer);
        };
    });
}
...