Как отменить подписку от собственного делегата без тупика? - PullRequest
0 голосов
/ 02 января 2019

Если вы имеете дело с записью на websockets внутри onNext делегата IObservable, как я могу прекратить всю подписку, если socket выдает исключение? Делегат onError обрабатывает ошибки, поступающие от поставщика, а не то, что происходит, когда потребляет эти elements.

код

class ProgressService
    {
        private IStorage storage;   
        public ProgressService(IStorage storage)
        {
            this.storage = storage;
        }
        public async Task NotifyAsync(WebSocket socket)
        {
            var buffer = ArrayPool<byte>.Shared.Rent(1024);
            CancellationTokenSource source = new CancellationTokenSource();

             this.storage.Subscribe(async (onNext) =>
            {
                try
                {
                    await socket.SendAsync(onNext.Encode(), WebSocketMessageType.Text, true, CancellationToken.None);
                }
                catch (Exception)
                {
                    if (socket.State == WebSocketState.Open)
                    {
                        await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Error on socket", CancellationToken.None);
                    }
                    /*i want to dispose the subscription here but i do not think 
                        if it is safe te terminate resource from within  (deadlock condition ? )
                        .Dispose()*/
                }
            }, (ex) => { Console.WriteLine("provider error"); },
            async () =>
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Completed task", CancellationToken.None);
            }
            , source.Token);

            await socket.ReceiveAsync(buffer, CancellationToken.None);
            source.Cancel();
        }


    }

Наблюдаемые

interface IStorage:IObservable<Square>
    {
         void Enqueue(Square square);
    }

P.S
Как видите, я хочу уничтожить подписку, как только Websocket отключится. Кроме того, сигнал для уничтожения subscription после того, как делегат OnCompleted был уволен ... поступит извне (CancellationTokenSource), и это происходит после первого Socket.ReceiveAsync (так что это должно быть обработано также на главном нить).

...