Когда у вас есть IObservable<string> ioObs = Observable.FromAsync<string>(Something);
, у вас есть наблюдаемый, который может либо возвращать значение, а затем завершать ({OnNext} {OnCompleted}), либо у вас есть объект, который генерирует исключение ({OnError}).
Очень просто разрешить источнику повторное выполнение после возврата значений или ошибок.
IObservable<string> query = ioObs.Retry().Repeat();
.Retry()
говорит: «go снова, если вы получите ошибку». .Repeat()
говорит: «Подпишитесь снова, если наблюдаемый завершится».
Теперь это немного опасно, поскольку вы создали наблюдаемый, который будет выполняться непрерывно. Вам нужно найти способ остановить это.
Возможны следующие варианты:
- Отказ от подписки
- Возьмите определенное количество значений (например,
.Take(n)
) - Установите
.Timeout
. - Или используйте
TakeUntil
Последний подходит, если исходный ioObs
возвращает пустую или пустую строку по завершении.
Вы можете сделать что-то вроде этого:
IObservable<string> query = ioObs.Retry().Repeat()
.TakeUntil(x => x == null);
Вот тестовый фрагмент кода, на котором вы можете попробовать это:
private int __counter = 0;
Task<string> Something()
{
return Task.Run(() =>
{
if (Interlocked.Increment(ref __counter) % 7 == 0)
{
throw new Exception("Blam!");
}
return $"Hello World {__counter}";
});
}
А затем выполните это:
IObservable<string> ioObs = Observable.FromAsync<string>(Something);
IObservable<string> query = ioObs.Retry().Repeat()
.TakeUntil(x => x.EndsWith("19"));
Когда я подписываюсь, я получаю:
Hello World 1
Hello World 2
Hello World 3
Hello World 4
Hello World 5
Hello World 6
Hello World 8
Hello World 9
Hello World 10
Hello World 11
Hello World 12
Hello World 13
Hello World 15
Hello World 16
Hello World 17
Hello World 18
Hello World 19
Обратите внимание, что 7
и 14
отсутствуют, потому что тогда было сгенерировано исключение.