Нужно ли сделать поля потокобезопасными при использовании async / await? - PullRequest
0 голосов
/ 06 декабря 2018

Иногда я сталкиваюсь с асинхронным / ожидающим кодом, который обращается к полям объекта.Например, этот фрагмент кода из проекта Stateless:

private readonly Queue<QueuedTrigger> _eventQueue = new Queue<QueuedTrigger>();
private bool _firing;

async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
{
    if (_firing)
    {
        _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
        return;
    }

    try
    {
        _firing = true;

        await InternalFireOneAsync(trigger, args).ConfigureAwait(false);

        while (_eventQueue.Count != 0)
        {
            var queuedEvent = _eventQueue.Dequeue();
            await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(false);
        }
    }
    finally
    {
        _firing = false;
    }
}

Если я правильно понимаю, await **.ConfigureAwait(false) означает, что код, который выполняется после этого await, делает не обязательно должен выполняться в том же контексте.Таким образом, цикл while может быть выполнен в потоке ThreadPool.Я не вижу, что обеспечивает синхронизацию полей _firing и _eventQueue, например, что создает здесь блокировку / забор памяти / барьер?Итак, мой вопрос:мне нужно сделать поля поточно-ориентированными или что-то в структуре async / await позаботится об этом?

Редактировать: уточнить мой вопрос;в этом случае InternalFireQueuedAsync всегда должен вызываться в одном и том же потоке.В этом случае только продолжение может выполняться в другом потоке, что заставляет меня задуматься: нужны ли мне механизмы синхронизации (например, явный барьер), чтобы убедиться, что значения синхронизированы, чтобы избежать описанной здесь проблемы: http://www.albahari.com/threading/part4.aspx

Редактировать 2: есть также небольшая дискуссия без гражданства: https://github.com/dotnet-state-machine/stateless/issues/294

Ответы [ 4 ]

0 голосов
/ 29 января 2019

Я не вижу, что обеспечивает синхронизацию полей _firing и _eventQueue, например, что создает здесь блокировку / забор памяти / барьер?Итак, мой вопрос:нужно ли сделать поля поточно-ориентированными или что-то в структуре async / await позаботится об этом?

await обеспечит наличие всех необходимых барьеров памяти.Однако это не делает их «поточно-ориентированными».

, в этом случае InternalFireQueuedAsync всегда должен вызываться в одном потоке.

Тогда _firingхорошо, и не нужно volatile или что-то подобное.

Однако использование _eventQueue некорректно.Рассмотрим, что происходит, когда поток пула потоков возобновил код после await: вполне возможно, что Queue<T>.Count или Queue<T>.Dequeue() будет вызываться потоком пула потоков в то же время, когда Queue<T>.Enqueue вызывается основнымнить.Это не потокобезопасно.

Если основной поток, вызывающий InternalFireQueuedAsync, является потоком с однопоточным контекстом (например, потоком пользовательского интерфейса), то одним простым исправлением является удаление всех экземпляров ConfigureAwait(false)в этом методе.

0 голосов
/ 06 декабря 2018

ConfigureAwait(false) означает, что Context не захвачен для запуска продолжения.Использование контекста пула потоков не означает, что продолжения выполняются параллельно.Использование await до и внутри цикла while гарантирует, что код (продолжения) будет выполняться последовательно, поэтому нет необходимости блокировать в этом случае.Однако при проверке значения _firing вы можете иметь состояние гонки .

0 голосов
/ 06 декабря 2018

используйте lock или ConcurrentQueue.

решение с lock:

private readonly Queue<QueuedTrigger> _eventQueue = new Queue<QueuedTrigger>();
private bool _firing;
private object _eventQueueLock = new object();

async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
{
if (_firing)
{
    lock(_eventQueueLock)
       _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
    return;
}

try
{
    _firing = true;

    await InternalFireOneAsync(trigger, args).ConfigureAwait(false);

    lock(_eventQueueLock)
    while (_eventQueue.Count != 0)
    {
        var queuedEvent = _eventQueue.Dequeue();
        await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(false);
    }
}


finally
{
    _firing = false;
}

}

решение с ConcurrentQueue:

private readonly ConccurentQueue<QueuedTrigger> _eventQueue = new ConccurentQueue<QueuedTrigger>();
private bool _firing;

async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
{
if (_firing)
{
    _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
    return;
}

try
{
    _firing = true;

    await InternalFireOneAsync(trigger, args).ConfigureAwait(false);

    lock(_eventQueueLock)
    while (_eventQueue.Count != 0)
    {
        object queuedEvent; // change object > expected type
        if(!_eventQueue.TryDequeue())
           continue;
        await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(false);
    }
}


finally
{
    _firing = false;
}

}

0 голосов
/ 06 декабря 2018

Чтобы быть в безопасности, вы должны пометить поле _firing как volatile - это будет гарантировать барьер памяти и быть уверенным, что часть продолжения, которая может выполняться в другом потоке, прочитает правильное значение.Без volatile компилятор, CLR или JIT-компилятор или даже процессор могут выполнить некоторые оптимизации, которые приведут к тому, что код прочитает для него неправильное значение.

Что касается _eventQueue, вы неИзменить поле, поэтому пометить его как volatile бесполезно.Если только один поток вызывает «InternalFireQueuedAsync», вы не получаете к нему доступ одновременно из нескольких потоков, поэтому все в порядке.

Однако, если несколько потоков вызывают InternalFireQueuedAsync, вам нужно будет использовать ConcurrentQueueвместо этого или заблокируйте ваш доступ к _eventQueue.Тогда вам лучше также заблокировать доступ к _firing, или получить к нему доступ с помощью Interlocked, или заменить его на ManualResetEvent.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...