Реализация экспоненциального отката с помощью rxjs - PullRequest
0 голосов
/ 26 октября 2018

Angular 7 документы обеспечивают этот пример практического использования rxjs Observable s при реализации экспоненциального отката для запроса AJAX:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

Хотя я понимаю концепциюиз Observables и Backoff, я не могу понять, как именно retryWhen будет рассчитывать интервалы времени для повторной подписки на источник ajax.

В частности, как сделать zip, map и mapMerge работают в этой настройке?

А что будет содержаться в объекте attempts при его выбросе в retryWhen?

Я просмотрел их справочные страницы, но все еще не могу обдумать это.

1 Ответ

0 голосов
/ 28 октября 2018

Я потратил довольно много времени на изучение этого (в учебных целях) и постараюсь максимально подробно объяснить работу этого кода.

Во-первых, вот оригинальный код с комментариями:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {                  // (1)
 return pipe(                                     // (2)
   retryWhen(attempts => range(1, maxTries)       // (3)
     .pipe(
       zip(attempts, (i) => i),                   // (4)
       map(i => i * i),                           // (5)
       mergeMap(i =>  timer(i * ms))              // (6)
     )
   )
 );                                               // (7)
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}
  1. Достаточно просто, мы создаем пользовательский оператор backoff из оператора retryWhen. Мы сможем применить это позже в функции pipe.
  2. В этом контексте метод pipe возвращает пользовательский оператор.
  3. Наш пользовательский оператор будет модифицированным retryWhen оператором. Требуется аргумент функции. Эта функция будет вызываться один раз - в частности, когда этот retryWhen впервые встречается / вызывается. Кстати, retryWhen попадает в игру только , когда наблюдаемый источник выдает ошибку. Затем он предотвращает дальнейшее распространение ошибки и повторно подписывается на источник. Если источник выдает результат без ошибок (при первой подписке или при повторной попытке), retryWhen передается и не участвует.

    Несколько слов о attempts. Это наблюдаемое. Это не источник, наблюдаемый. Он создан специально для retryWhen. Он имеет одно и только одно использование: всякий раз, когда подписка (или повторная подписка) на наблюдаемый источник приводит к ошибке, attempts запускает next. Нам дают attempts, и мы можем использовать его для того, чтобы как-то реагировать на каждую неудачную попытку подписки на наблюдаемый источник.

    Так вот что мы собираемся сделать.

    Сначала мы создаем range(1, maxTries), наблюдаемое, которое имеет целое число для каждой повторной попытки, которую мы готовы выполнить. range готов немедленно набрать все свои номера, но мы должны держать его лошадей: нам нужен только новый номер, когда произойдет повторная попытка. Так вот почему мы ...

  4. ... застегните его с помощью attempts. Это означает, что необходимо объединить каждое излученное значение attempts с одним значением range.

    Помните, что функция, в которой мы сейчас находимся, будет вызываться только один раз, и тогда attempts будет вызывать next только один раз - для первоначальной неудачной подписки. Итак, на данный момент наши две наблюдаемые наблюдаемые величины дали только одно значение.

    Кстати, каковы значения двух наблюдаемых в одном виде? Эта функция решает, что: (i) => i. Для наглядности можно написать (itemFromRange, itemFromAttempts) => itemFromRange. Второй аргумент не используется, поэтому он отбрасывается и первый переименовывается в i.

    Что здесь происходит, мы просто игнорируем значения , сгенерированные attempts, нас интересует только факт , что они запускаются. И всякий раз, когда это происходит, мы извлекаем следующее значение из range наблюдаемого ...

  5. ... и возвести в квадрат. Это для экспоненциальной части экспоненциального отката.

    Итак, теперь, когда (повторная) подписка на источник терпит неудачу, у нас в руках постоянно увеличивается целое число (1, 4, 9, 16 ...). Как мы можем преобразовать это целое число в задержку до следующей повторной подписки?

    Помните, что эта функция, в которой мы сейчас находимся, должна возвращать наблюдаемую, используя attempts в качестве входных данных. Эта результирующая наблюдаемая создается только один раз. retryWhen затем подписывается на полученную в результате наблюдаемую и: повторяет попытку подписки на наблюдаемый источник всякий раз, когда возникают наблюдаемые пожары next; звонки complete или error на источник, наблюдаемый всякий раз, когда в результате наблюдаемого запускаются соответствующие события.

  6. Короче говоря, нам нужно заставить retryWhen немного подождать. delay оператор может быть использован, но установка экспоненциального роста задержки, вероятно, будет болезненной. Вместо этого в игру вступает оператор mergeMap.

    mergeMap - это сочетание двух операторов: map и mergeAll. map просто конвертирует каждое увеличивающееся целое число (1, 4, 9, 16 ...) в наблюдаемую timer, которая запускает next после пройденного числа миллисекунд. mergeAll заставляет retryWhen фактически подписаться на timer. Если этого последнего бита не произошло, то наша результирующая наблюдаемая просто сразу сработает next с timer наблюдаемым экземпляром в качестве значения.

  7. На данный моментмы создали нашу пользовательскую наблюдаемую, которая будет использоваться retryWhen, чтобы решить, когда именно пытаться повторно подписаться на исходную наблюдаемую.

В нынешнем виде я вижу две проблемы с этой реализацией:

  • Как только наша полученная в результате наблюдаемая стреляет в свой последний next (вызывая последнюю попытку повторной подписки), она также немедленно запускает complete. Если исходная наблюдаемая не вернет результат очень быстро (при условии, что последним выполнением будет тот, который завершится успешно), этот результат будет проигнорирован.

    Это потому, что как только retryWhen слышит complete от нашей наблюдаемой, он вызывает complete в источнике, который все еще может находиться в процессе выполнения AJAX-запроса.

  • Если все попытки были неудачными, источник фактически вызывает complete вместо более логичного error.

Чтобы решить обе эти проблемы, я думаю, что наша наблюдаемая наблюдаемая должна запустить error в самом конце, после того, как у последней попытки будет некоторое разумное время, чтобы попытаться выполнить свою работу.

Вот моя реализация указанного исправления, которая также учитывает устаревание оператора zip в последней версии rxjs v6:

import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";

function backoffImproved(maxTries, ms) {
    return pipe(
        retryWhen(attempts => {
            const observableForRetries =
                zip(range(1, maxTries), attempts)
                    .pipe(
                        map(([elemFromRange, elemFromAttempts]) => elemFromRange),
                        map(i => i * i),
                        switchMap(i => timer(i * ms))
                    );
            const observableForFailure =
                throwError(new Error('Could not complete AJAX request'))
                    .pipe(
                        materialize(),
                        delay(1000),
                        dematerialize()
                    );
            return concat(observableForRetries, observableForFailure);
        })
    );
}

Я проверил этот код, и он, кажется, работает правильно во всех случаях. Я не могу потрудиться объяснить это подробно прямо сейчас; Я сомневаюсь, что кто-нибудь даже прочтет стену текста выше.

В любом случае, большое спасибо @BenjaminGruenbaum и @cartant за то, что они указали мне правильный путь для того, чтобы обернуть мою голову вокруг всего этого.

...