Если вы имеете дело с записью на websockets
внутри onNext
делегата IObservable
, как я могу прекратить всю подписку, если socket
выдает исключение?
Делегат onError
обрабатывает ошибки, поступающие от поставщика, а не то, что происходит, когда потребляет эти elements
.
код
class ProgressService
{
private IStorage storage;
public ProgressService(IStorage storage)
{
this.storage = storage;
}
public async Task NotifyAsync(WebSocket socket)
{
var buffer = ArrayPool<byte>.Shared.Rent(1024);
CancellationTokenSource source = new CancellationTokenSource();
this.storage.Subscribe(async (onNext) =>
{
try
{
await socket.SendAsync(onNext.Encode(), WebSocketMessageType.Text, true, CancellationToken.None);
}
catch (Exception)
{
if (socket.State == WebSocketState.Open)
{
await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Error on socket", CancellationToken.None);
}
/*i want to dispose the subscription here but i do not think
if it is safe te terminate resource from within (deadlock condition ? )
.Dispose()*/
}
}, (ex) => { Console.WriteLine("provider error"); },
async () =>
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Completed task", CancellationToken.None);
}
, source.Token);
await socket.ReceiveAsync(buffer, CancellationToken.None);
source.Cancel();
}
}
Наблюдаемые
interface IStorage:IObservable<Square>
{
void Enqueue(Square square);
}
P.S
Как видите, я хочу уничтожить подписку, как только Websocket
отключится.
Кроме того, сигнал для уничтожения subscription
после того, как делегат OnCompleted
был уволен ... поступит извне (CancellationTokenSource
), и это происходит после первого Socket.ReceiveAsync
(так что это должно быть обработано также на главном нить).