Делегат Observer OnError не вызывается несколько раз - PullRequest
0 голосов
/ 06 января 2019

Здравствуйте, я пытаюсь понять, почему OnError не вызывается каждый раз Observer. При попытке отладки OnError, вызванного внутри моей IOBservable реализации, отладчик проходит мимо.

Наблюдаемый реализация

public class Reactive:IStorage,IObservable<SquareResult>
        {
            private object @lock = new object();
            private List<IObserver<SquareResult>> observers = new List<IObserver<SquareResult>>();


            public static Reactive Create()
            {
                return new Reactive();
            }

            public void Enqueue(SquareResult square)
            {
                lock (@lock)
                {
                    foreach (var item in observers)
                    {
                        if (square.Result < 0)
                        {
                            item.OnError(new InvalidSquareException());
                        }
                        else
                        item.OnNext(square);
                    }
                }
            }
            public void EndStoring()
            {
                this.observers.ForEach(obs => obs.OnCompleted());
            }

            public IDisposable Subscribe(IObserver<SquareResult> observer)
            {
                if (!this.observers.Contains(observer))
                {
                    this.observers.Add(observer);
                }
                return new Unsubscriber(observer, this.observers);
            }

            public class Unsubscriber : IDisposable
            {
                private List<IObserver<SquareResult>> observers;
                private IObserver<SquareResult> observer;

                public Unsubscriber(IObserver<SquareResult>obs,List<IObserver<SquareResult>>observers)
                {
                    this.observer = obs;
                    this.observers = observers;
                }

                public void Dispose()
                {
                    if (this.observer == null)
                    {
                        return;
                    }
                    this.observers.Remove(this.observer);
                }
            }
        }

Единственный метод, который вызывается производителем - это Enqueue.

Тогда у нас есть служба, в которой consumer берет данные из IObservable и отправляет их через сокет:

Потребитель

class ProgressService
    {
        private IStorage observable;
        public ProgressService(IStorage storage)
        {
            this.observable = storage;
        }

        public async Task NotifyAsync(WebSocket socket)
        {

            byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
            CancellationTokenSource cancelSignal = new CancellationTokenSource();
            this.observable.Subscribe(async (next) =>
            {

                try
                {
                    ReadOnlyMemory<byte> data = next.Encode();
                    await socket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None);
                }
                catch (Exception)
                {
                    if (socket.State == WebSocketState.Open)
                    {

                        await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Threw on data receive", CancellationToken.None);
                    }
                    return;
                }
            }, async(ex) =>
            {
                 //!! this delegate does not get called everytime the observable calls OnError(exception)
                await socket.SendAsync($"Exception :{ex.Message}".Encode().ToArray(), WebSocketMessageType.Text, true, CancellationToken.None);
            }, async () =>
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Finished test", CancellationToken.None);
            }, cancelSignal.Token);

            await socket.ReceiveAsync(buffer, CancellationToken.None);
            cancelSignal.Cancel();
        }


    }

Так что в основном происходит, если я пару раз звоню куда-нибудь myobservable.OnError(new exception()), обратный вызов consumer вызывается только один раз, и я не понимаю, почему.

Пример

observable.OnError(new Exception()); observable.OnError(new Exception());

Observer.OnError implmenetation

public void OnError(Exception ex)
{
  Console.WriteLine(ex.Message);
 }

Приведенный выше пример в моем коде напечатает сообщение об исключении только один раз.

1 Ответ

0 голосов
/ 06 января 2019

'Наблюдаемый контракт' допускает 0 или более OnNext уведомлений, за которыми следует одно OnCompleted или одно OnError уведомление, которое прекращает наблюдаемое. Как только наблюдаемое завершается (первым OnError в вашем случае), уведомления больше не отправляются, то есть делегаты больше не вызываются.

...