Повторите попытку следующего элемента Flux и пропустите успешные - PullRequest
0 голосов
/ 10 октября 2019

Немного предыстории.

Я пытаюсь использовать реактивное программирование, чтобы иметь возможность загрузить файл из другого сервиса. Хитрость заключается в том, что в случае сбоя соединения или сбоя элемента Flux (чего угодно) я хотел бы повторить попытку на Flux несколько раз , но после того, как смогу его схватить, я бы хотел возобновить работу без обработкиэлементы с самого начала.

Я имею в виду, что что-то идет не так, и я получил только 56 элементов из моего Flux из 100 возможных (скажем, это изображение в формате .jpg) из-за сбоя соединения. После успешного повтора я хотел бы возобновить работу с 57th элементом, чтобы мне не приходилось обрабатывать его и снова выполнять GET с самого начала.

Вот какнормальная повторная попытка выглядит следующим образом: Flux retry()

но я бы хотел добиться того, чтобы при повторной попытке мне нужно было получить только красный элемент (так как у меня уже есть желтый и фиолетовый)).

Просто sidenote, я хотел бы добиться функциональности, как с Заголовки запроса диапазона HTTP , где я могу получить байты только в определенном диапазоне, и в случае сбоя я смог бырезюме из байта, который я хочу.

Это вообще возможно, чего я пытаюсь достичь? Если да, что может быть возможным курсом действий?

1 Ответ

1 голос
/ 14 октября 2019

Необходимо поддерживать некоторое состояние (по крайней мере, начало диапазона запроса) для каждого подписчика. Это должно быть сделано перед retry, чтобы каждая повторная попытка заново оценивала диапазон. В то же время состояние должно быть атомно обновляемым и видимым после retry (для целей обновления). Я предполагаю, что вы используете WebClient:

  • a flatMap можно использовать для создания области, в которой состояние диапазона будет видимым
  • в лямбде,AtomicLong может использоваться как состояние
  • снова в лямбда-формате плоской карты, обернуть вызов веб-клиента в Flux.defer, чтобы обеспечить отложенное создание запроса с переоценкой состояния для генерации соответствующего заголовка (чтение из AtomicLong)
  • добавление retry после того, как defer
  • обновит AtomicLong по мере необходимости после получения и обработки каждого фрагмента (например, в doOnNext)
...