Динамически изменяемый интервал опроса - PullRequest
2 голосов
/ 25 февраля 2020

Мне нужно, чтобы мое внешнее приложение опрашивало мое внутреннее приложение и обновляло sh отображаемые данные. Я решил это определенным образом, но у меня есть ощущение, что это можно оптимизировать. Вот мое решение:

        this.pollDataSource$ = new BehaviorSubject<number>(this.pollingInterval);

        this.pollDataSource$
            .pipe(
                switchMap(duration =>
                    of(duration)
                        .pipe(
                            filter(duration => duration !== 0),
                            delay(duration),
                            concatMap(() => this.kubernetesObjectList$)
                        )
                )
            )
            .subscribe({
                next: (kubernetesObjectList) => {
                    this.dataSource.data = kubernetesObjectList;
                    if (this.pollingInterval > 0) {
                        this.pollDataSource$.next(this.pollingInterval);
                    }
                }
            });

        this.pollDataSource$.next(-1);

Итак, у меня есть выпадающий селектор, который влияет на this.pollingInterval. Кроме того, у меня есть this.pollDataSource$, что BehaviorSubject<number>. Он генерирует number, который используется в качестве продолжительности до следующего опроса.

Когда this.pollDataSource$ испускает -1 (это происходит, когда пользователь нажимает Refre sh кнопка), источник данных должен быть опрошен немедленно, независимо от того, какой интервал опроса установлен.

Когда this.pollDataSource$ выдает некоторое положительное число (это происходит, когда пользователь выбирает определенный интервал опроса из выпадающего списка) этот номер должен использоваться как продолжительность до следующего refre sh.

Когда this.pollDataSource$ испускает 0 (это происходит, когда пользователь выбирает параметр Stop Polling в тот же выпадающий селектор), мы должны прекратить опрос, пока пользователь не выберет новый интервал опроса.

И все работает просто отлично. Страница загружена, по умолчанию this.pollingInterval имеет 10000, поэтому пользователь получает данные немедленно, но через 10 секунд они обновляются. Когда пользователь нажимает Refre sh, данные обновляются, и следующий автомат c refre sh происходит через 10 секунд после этого. Когда пользователь переключается на Stop Polling , данные остаются на месте. Когда пользователь переключается на другой интервал, данные снова обновляются. Все здорово! Но у меня есть ощущение, что мое решение не оптимально. Мне просто не нравится эта конструкция: ...pipe...switchMap...of...pipe... Есть ли способ упростить это?

Заранее спасибо за все подсказки.

Ответы [ 2 ]

1 голос
/ 26 февраля 2020

Если вы хотите опросить на интервале, попробуйте следующее:

const intervalDelays$ = new BehaviorSubject(10000);

intervalDelays.pipe(
  switchMap(ms => ms === 0 ? EMPTY : interval(ms).pipe(
    concatMap(() => fetchDataObservableHere$)
  ))
);

interval использует setInterval под капотом. Просто убедитесь, что ms длиннее, чем fetchDataObservableHere$ занимает завершено . В противном случае вы столкнетесь с проблемами противодавления.

Если возникнет проблема с противодавлением, вы можете сделать это:

const intervalDelays$ = new BehaviorSubject(10000);

intervalDelays.pipe(
  switchMap(ms => ms < 0
    // No wait? We'll assume it's "off"
    ? EMPTY
    // just an observable to start the recursive expand call.
    // 'start' doesn't matter. Could be `true` or anything, really.
    : of('start').pipe(
      expand(() => timer(ms).pipe(
        switchMap(() => fetchDataObservableHere$),
        tap(data => {
          // PROCESS DATA HERE
          // this ensures that it's done processing before you move on.
        }),
      )),
  ))
);

Приведенный выше код будет видеть входящие изменения задержки интервала, и запустить новую внутреннюю наблюдаемую область, которая будет рекурсивно выполняться таким образом, что будет ожидать возвращения каждого значения и его обработки перед выполнением другого запроса.

1 голос
/ 26 февраля 2020

Причина, по которой я упомянул expand, заключается в том, что этот оператор можно использовать вместо конструкции

someSubject.pipe(
  // do stuff
).subscribe(
  next: data => {
    // do some more stuff
    someSubject.next(something)
  }

Это та конструкция, которую вы на самом деле используете, поэтому предложение о expand.

Затем, возвращаясь к вашему вопросу без введения expand, вы можете подумать, что это что-то вроде

this.pollDataSource$
        .pipe(
            switchMap(duration =>
                return duration === 0 ?
                  NEVER :
                  this.kubernetesObjectList$.pipe(delay(duration));
            )
        )
        .subscribe({
            next: (kubernetesObjectList) => {
                this.dataSource.data = kubernetesObjectList;
                if (this.pollingInterval > 0) {
                    this.pollDataSource$.next(this.pollingInterval);
                }
            }
        });

    this.pollDataSource$.next(-1);

С expand это может выглядеть как

this.pollDataSource$
            .pipe(
                expand(duration =>
                    return duration === 0 ?
                      NEVER :
                      this.kubernetesObjectList$.pipe(
                         delay(duration),
                         map(() =>  duration)
                      );
                )
            )
            .subscribe({
                next: (kubernetesObjectList) => {
                    this.dataSource.data = kubernetesObjectList;
                }
            });

        this.pollDataSource$.next(-1);

Являются ли эти версии кода более чистыми, чем ваша оригинальная, которую я нашел ясной и читабельной, - вероятно, вопрос личного вкуса.

Все «мог» и «бы», которые я использовал, связано с тем, что у меня нет площадки для тестирования этого кода, поэтому очень возможно, что вы найдете что-то не работающее здесь .

...