Здравствуйте, я пытаюсь понять, почему OnError
не вызывается каждый раз Observer
.
При попытке отладки OnError
, вызванного внутри моей IOBservable
реализации, отладчик проходит мимо.
Наблюдаемый реализация
public class Reactive:IStorage,IObservable<SquareResult>
{
private object @lock = new object();
private List<IObserver<SquareResult>> observers = new List<IObserver<SquareResult>>();
public static Reactive Create()
{
return new Reactive();
}
public void Enqueue(SquareResult square)
{
lock (@lock)
{
foreach (var item in observers)
{
if (square.Result < 0)
{
item.OnError(new InvalidSquareException());
}
else
item.OnNext(square);
}
}
}
public void EndStoring()
{
this.observers.ForEach(obs => obs.OnCompleted());
}
public IDisposable Subscribe(IObserver<SquareResult> observer)
{
if (!this.observers.Contains(observer))
{
this.observers.Add(observer);
}
return new Unsubscriber(observer, this.observers);
}
public class Unsubscriber : IDisposable
{
private List<IObserver<SquareResult>> observers;
private IObserver<SquareResult> observer;
public Unsubscriber(IObserver<SquareResult>obs,List<IObserver<SquareResult>>observers)
{
this.observer = obs;
this.observers = observers;
}
public void Dispose()
{
if (this.observer == null)
{
return;
}
this.observers.Remove(this.observer);
}
}
}
Единственный метод, который вызывается производителем - это Enqueue
.
Тогда у нас есть служба, в которой consumer
берет данные из IObservable
и отправляет их через сокет:
Потребитель
class ProgressService
{
private IStorage observable;
public ProgressService(IStorage storage)
{
this.observable = storage;
}
public async Task NotifyAsync(WebSocket socket)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
CancellationTokenSource cancelSignal = new CancellationTokenSource();
this.observable.Subscribe(async (next) =>
{
try
{
ReadOnlyMemory<byte> data = next.Encode();
await socket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None);
}
catch (Exception)
{
if (socket.State == WebSocketState.Open)
{
await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Threw on data receive", CancellationToken.None);
}
return;
}
}, async(ex) =>
{
//!! this delegate does not get called everytime the observable calls OnError(exception)
await socket.SendAsync($"Exception :{ex.Message}".Encode().ToArray(), WebSocketMessageType.Text, true, CancellationToken.None);
}, async () =>
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Finished test", CancellationToken.None);
}, cancelSignal.Token);
await socket.ReceiveAsync(buffer, CancellationToken.None);
cancelSignal.Cancel();
}
}
Так что в основном происходит, если я пару раз звоню куда-нибудь myobservable.OnError(new exception())
, обратный вызов consumer
вызывается только один раз, и я не понимаю, почему.
Пример
observable.OnError(new Exception());
observable.OnError(new Exception());
Observer.OnError implmenetation
public void OnError(Exception ex)
{
Console.WriteLine(ex.Message);
}
Приведенный выше пример в моем коде напечатает сообщение об исключении только один раз.