Как обрабатывать исключение и повторять с помощью Rx - PullRequest
0 голосов
/ 21 июня 2020

Я выполняю операцию IO, используя Observable.FromAsync. Я хочу повторять это вечно. Я не понимаю, как обрабатывать исключения, что-то делать с ними, а затем go обратно в свой l oop :

Что я пробовал:

IObservable<string> ioObs=Observable.FromAsync<string>([something]);  //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
  onNext:x=> Console.Writeline($"Message:{x}"),
  onCompleted: Console.Writeline("Completed"),
  onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);

Теперь я не понимаю, почему моя наблюдаемая заканчивается после первого исключения. Я не говорю, чтобы она продолжалась.

Что я хочу сделать:

  • выполнить действие ввода-вывода
  • если он выбрасывает, вернуть какое-то настраиваемое значение
  • повторить l oop

Если бы мой наблюдаемый объект был перечислимым, я бы хотел такого поведения:

public IAsyncEnumerable<string> EnumerableBehaviour()
{
   while(true)
   {
      try
      {
        string data=await ReadAsync();  //the `FromAsync` delegate
        yield return data;
      }catch(Exception ex)
          yield return "Error";
      {
   }
}

Как мне продолжить выполнение Repeat, даже если OnError сработало?

Как Observable.Catch и Observable.Throw должны сочетаться с Observable.Repeat?

Ответы [ 2 ]

3 голосов
/ 26 июня 2020

Когда у вас есть IObservable<string> ioObs = Observable.FromAsync<string>(Something);, у вас есть наблюдаемый, который может либо возвращать значение, а затем завершать ({OnNext} {OnCompleted}), либо у вас есть объект, который генерирует исключение ({OnError}).

Очень просто разрешить источнику повторное выполнение после возврата значений или ошибок.

IObservable<string> query = ioObs.Retry().Repeat();

.Retry() говорит: «go снова, если вы получите ошибку». .Repeat() говорит: «Подпишитесь снова, если наблюдаемый завершится».

Теперь это немного опасно, поскольку вы создали наблюдаемый, который будет выполняться непрерывно. Вам нужно найти способ остановить это.

Возможны следующие варианты:

  • Отказ от подписки
  • Возьмите определенное количество значений (например, .Take(n) )
  • Установите .Timeout.
  • Или используйте TakeUntil

Последний подходит, если исходный ioObs возвращает пустую или пустую строку по завершении.

Вы можете сделать что-то вроде этого:

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x == null);
        

Вот тестовый фрагмент кода, на котором вы можете попробовать это:

private int __counter = 0;
Task<string> Something()
{
    return Task.Run(() =>
    {
        if (Interlocked.Increment(ref __counter) % 7 == 0)
        {
            throw new Exception("Blam!");
        }
        return $"Hello World {__counter}";
    });
}

А затем выполните это:

IObservable<string> ioObs = Observable.FromAsync<string>(Something);

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x.EndsWith("19"));
        

Когда я подписываюсь, я получаю:

Hello World 1 
Hello World 2 
Hello World 3 
Hello World 4 
Hello World 5 
Hello World 6 
Hello World 8 
Hello World 9 
Hello World 10 
Hello World 11 
Hello World 12 
Hello World 13 
Hello World 15 
Hello World 16 
Hello World 17 
Hello World 18 
Hello World 19 

Обратите внимание, что 7 и 14 отсутствуют, потому что тогда было сгенерировано исключение.

1 голос
/ 21 июня 2020

Наблюдаемое считается мертвым после создания исключения. То же, что и при обычном выполнении метода. Когда создается исключение, оно обрабатывается в том месте, где находится блок try - catch. Невозможно «go вернуться» туда, где изначально было сгенерировано исключение, и продолжить оттуда. То же, что и для наблюдаемых.

Методы расширения Observable.Catch() могут использоваться для перехвата такого исключения (из уже мертвого наблюдаемого) и вместо этого продолжить с новым наблюдаемым. Перегрузка Observable.Catch(IObservable<TSource>), которую вы используете, использует параметр params для используемых объектов IObservable. Когда первый из них выходит из строя за исключением, переходите ко второму. Если это не удается, переходите к третьему и так далее. Однако вы предоставили только одну наблюдаемую с Observable.Catch(ioObs), поэтому не к чему переходить, когда первая наблюдаемая выходит из строя.

Это может быть возможно, если вы используете другую перегрузку , которая ожидает перечисление подписок для использования. Когда один не работает, будет использован следующий. См. Следующий пример:

public static int requestCounter = 0;
public static Task<string> SomeSourceMethod() {
    Random r = new Random();
    Thread.Sleep(50);
    if (r.NextDouble() < 0.5) {
        Console.Write("E ");
        throw new Exception();
    }
    return Task.FromResult($"test {requestCounter++}");
}
    
public static IEnumerable<IObservable<string>> ObservableEnumerable() {
    for (;;) {
        IObservable<string> foo = Observable.FromAsync<string>(SomeSourceMethod);
        IObservable<string> fooRepeated = foo.Repeat();
        yield return fooRepeated;
    }
}

static void Main(string[] args)
{
    IObservable<string> endless = ObservableEnumerable().Catch();
    IDisposable subscription = endless
        .SubscribeOn(NewThreadScheduler.Default)
        .Subscribe(it => Console.WriteLine(it));

    Thread.Sleep(1000);
    Console.WriteLine("Terminating the subscription");
    subscription.Dispose();
}

Это может привести к примерно следующему выводу:

E test 1
E E E E test 2
test 3
test 4
test 5
test 6
E E test 7
test 8
E test 9
Terminating the subscription
E E E E E E E E E E E E E E E E E E E 

Однако это не сработало, так как мне пришлось жестко убить процесс, чтобы остановить. Это вроде как (?) Работает, но я не уверен, что это правильный способ (возможно, нет).

...