Безопасный поток SendAsync / ожидаемый FIFO - PullRequest
0 голосов
/ 03 февраля 2019

Итак, я делаю потокобезопасную оболочку для System.Net.WebSockets.ClientWebSocket.

Что вы думаете о следующем методе, который я придумал, он безопасен для потоков?Важно то, что только 1 поток одновременно выполняет _ws.SendAsync и что ожидающие потоки будут выполняться в порядке FIFO.

Альтернативное решение состоит в том, чтобы вместо этого использовать BlockingCollection и шаблон производитель-потребитель.Но BlockingCollection не имеет функции async-await, поэтому я бы предпочел текущее решение, если оно поточно-ориентированное.Я очень благодарен за другие предложения.

private ClientWebSocket _ws;
private int _sendTicketCount = 0;
private int _sendTicketCurrent = 1;

public async Task SendAsync(string message)
{
    if (_ws.State != WebSocketState.Open)
        throw new Exception("Connection is not open");

    try
    {
        int mySendTicket = Interlocked.Increment(ref _sendTicketCount);

        if (mySendTicket != _sendTicketCurrent)
        {
            await Task.Run(() => SpinWait.SpinUntil(() => mySendTicket == _sendTicketCurrent)).ConfigureAwait(false);
        }

        var cancel = new CancellationTokenSource(5000);
        var bytes = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
        await _ws.SendAsync(bytes, WebSocketMessageType.Text, true, cancel.Token).ConfigureAwait(false);
    }
    catch(Exception ex)
    {
        throw ex;
    }
    finally
    {
        Interlocked.Increment(ref _sendTicketCurrent);
    }
}

Ответы [ 2 ]

0 голосов
/ 03 февраля 2019

У вас есть несколько производителей и один потребитель.

Это можно сделать с помощью Class"> ActionBlock , настроенного с соответствующей опцией.

0 голосов
/ 03 февраля 2019

_sendTicketCurrent должен быть изменчивым, в противном случае JIT разрешено кэшировать значение в регистре, что может привести к бесконечному циклу.Кроме того, он выглядит поточно-безопасным, за исключением того, что это огромная трата процессора (SpinWait полностью использует ядро).

Также нет смысла делать await Task.Run(() => SpinWait.SpinUntil(() => mySendTicket == _sendTicketCurrent)).Вы все равно будете удерживать поток, так что вы можете сделать ожидание в текущем потоке.

Чтобы сделать то, что вы хотите, более оптимизированным способом, уже есть асинхронно-совместимый примитив синхронизации, предоставленный изкоробка: SemaphoreSlim.

private SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

public async Task SendAsync(string message)
{
    if (_ws.State != WebSocketState.Open)
        throw new Exception("Connection is not open");

    try
    {
        await _semaphore.WaitAsync().ConfigureAwait(false);

        var cancel = new CancellationTokenSource(5000);
        var bytes = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
        await _ws.SendAsync(bytes, WebSocketMessageType.Text, true, cancel.Token).ConfigureAwait(false);
    }
    finally
    {
        _semaphore.Release();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...