Я потратил довольно много времени на изучение этого (в учебных целях) и постараюсь максимально подробно объяснить работу этого кода.
Во-первых, вот оригинальный код с комментариями:
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) { // (1)
return pipe( // (2)
retryWhen(attempts => range(1, maxTries) // (3)
.pipe(
zip(attempts, (i) => i), // (4)
map(i => i * i), // (5)
mergeMap(i => timer(i * ms)) // (6)
)
)
); // (7)
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}
- Достаточно просто, мы создаем пользовательский оператор
backoff
из оператора retryWhen
. Мы сможем применить это позже в функции pipe
.
- В этом контексте метод
pipe
возвращает пользовательский оператор.
Наш пользовательский оператор будет модифицированным retryWhen
оператором. Требуется аргумент функции. Эта функция будет вызываться один раз - в частности, когда этот retryWhen
впервые встречается / вызывается. Кстати, retryWhen
попадает в игру только , когда наблюдаемый источник выдает ошибку. Затем он предотвращает дальнейшее распространение ошибки и повторно подписывается на источник. Если источник выдает результат без ошибок (при первой подписке или при повторной попытке), retryWhen
передается и не участвует.
Несколько слов о attempts
. Это наблюдаемое. Это не источник, наблюдаемый. Он создан специально для retryWhen
. Он имеет одно и только одно использование: всякий раз, когда подписка (или повторная подписка) на наблюдаемый источник приводит к ошибке, attempts
запускает next
. Нам дают attempts
, и мы можем использовать его для того, чтобы как-то реагировать на каждую неудачную попытку подписки на наблюдаемый источник.
Так вот что мы собираемся сделать.
Сначала мы создаем range(1, maxTries)
, наблюдаемое, которое имеет целое число для каждой повторной попытки, которую мы готовы выполнить. range
готов немедленно набрать все свои номера, но мы должны держать его лошадей: нам нужен только новый номер, когда произойдет повторная попытка. Так вот почему мы ...
... застегните его с помощью attempts
. Это означает, что необходимо объединить каждое излученное значение attempts
с одним значением range
.
Помните, что функция, в которой мы сейчас находимся, будет вызываться только один раз, и тогда attempts
будет вызывать next
только один раз - для первоначальной неудачной подписки. Итак, на данный момент наши две наблюдаемые наблюдаемые величины дали только одно значение.
Кстати, каковы значения двух наблюдаемых в одном виде? Эта функция решает, что: (i) => i
. Для наглядности можно написать (itemFromRange, itemFromAttempts) => itemFromRange
. Второй аргумент не используется, поэтому он отбрасывается и первый переименовывается в i
.
Что здесь происходит, мы просто игнорируем значения , сгенерированные attempts
, нас интересует только факт , что они запускаются. И всякий раз, когда это происходит, мы извлекаем следующее значение из range
наблюдаемого ...
... и возвести в квадрат. Это для экспоненциальной части экспоненциального отката.
Итак, теперь, когда (повторная) подписка на источник терпит неудачу, у нас в руках постоянно увеличивается целое число (1, 4, 9, 16 ...). Как мы можем преобразовать это целое число в задержку до следующей повторной подписки?
Помните, что эта функция, в которой мы сейчас находимся, должна возвращать наблюдаемую, используя attempts
в качестве входных данных. Эта результирующая наблюдаемая создается только один раз. retryWhen
затем подписывается на полученную в результате наблюдаемую и: повторяет попытку подписки на наблюдаемый источник всякий раз, когда возникают наблюдаемые пожары next
; звонки complete
или error
на источник, наблюдаемый всякий раз, когда в результате наблюдаемого запускаются соответствующие события.
Короче говоря, нам нужно заставить retryWhen
немного подождать. delay
оператор может быть использован, но установка экспоненциального роста задержки, вероятно, будет болезненной. Вместо этого в игру вступает оператор mergeMap
.
mergeMap
- это сочетание двух операторов: map
и mergeAll
. map
просто конвертирует каждое увеличивающееся целое число (1, 4, 9, 16 ...) в наблюдаемую timer
, которая запускает next
после пройденного числа миллисекунд. mergeAll
заставляет retryWhen
фактически подписаться на timer
. Если этого последнего бита не произошло, то наша результирующая наблюдаемая просто сразу сработает next
с timer
наблюдаемым экземпляром в качестве значения.
На данный моментмы создали нашу пользовательскую наблюдаемую, которая будет использоваться retryWhen
, чтобы решить, когда именно пытаться повторно подписаться на исходную наблюдаемую.
В нынешнем виде я вижу две проблемы с этой реализацией:
Как только наша полученная в результате наблюдаемая стреляет в свой последний next
(вызывая последнюю попытку повторной подписки), она также немедленно запускает complete
. Если исходная наблюдаемая не вернет результат очень быстро (при условии, что последним выполнением будет тот, который завершится успешно), этот результат будет проигнорирован.
Это потому, что как только retryWhen
слышит complete
от нашей наблюдаемой, он вызывает complete
в источнике, который все еще может находиться в процессе выполнения AJAX-запроса.
Если все попытки были неудачными, источник фактически вызывает complete
вместо более логичного error
.
Чтобы решить обе эти проблемы, я думаю, что наша наблюдаемая наблюдаемая должна запустить error
в самом конце, после того, как у последней попытки будет некоторое разумное время, чтобы попытаться выполнить свою работу.
Вот моя реализация указанного исправления, которая также учитывает устаревание оператора zip
в последней версии rxjs v6
:
import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";
function backoffImproved(maxTries, ms) {
return pipe(
retryWhen(attempts => {
const observableForRetries =
zip(range(1, maxTries), attempts)
.pipe(
map(([elemFromRange, elemFromAttempts]) => elemFromRange),
map(i => i * i),
switchMap(i => timer(i * ms))
);
const observableForFailure =
throwError(new Error('Could not complete AJAX request'))
.pipe(
materialize(),
delay(1000),
dematerialize()
);
return concat(observableForRetries, observableForFailure);
})
);
}
Я проверил этот код, и он, кажется, работает правильно во всех случаях. Я не могу потрудиться объяснить это подробно прямо сейчас; Я сомневаюсь, что кто-нибудь даже прочтет стену текста выше.
В любом случае, большое спасибо @BenjaminGruenbaum и @cartant за то, что они указали мне правильный путь для того, чтобы обернуть мою голову вокруг всего этого.