Остановка одновременного запуска двух асинхронных методов с другим - PullRequest
3 голосов
/ 13 апреля 2019

У меня есть две асинхронные функции, которые я буду называть ChangeState() и DoThing().Каждый из них ожидает последующие асинхронные методы.Они вызываются из обработчиков событий, поэтому они не будут блокировать любой другой код во время выполнения.Если вызывается ChangeState(), обязательно, чтобы DoThing() не делал свое дело до тех пор, пока не завершится предыдущий ChangeState().ChangeState() можно вызвать снова, пока он еще выполняется.Любое выполнение, начатое до DoThing(), должно быть завершено до того, как DoThing() может продолжиться.

Обратное также верно;ChangeState() должен дождаться завершения любого ранее запущенного DoStuff().

Как я могу реализовать это без опасности тупиков?

Я знаю, что ожидания не допускаются внутри операторов блокировки, и этопо понятным причинам, поэтому я не пытаюсь воссоздать эту функциональность.

async void ChangeState(bool state)
{
 //Wait here until any pending DoStuff() is complete.
 await OutsideApi.ChangeState(state);
}

async void DoStuff()
{
 //Wait here until any pending ChangeState() is complete.
 await OutsideApi.DoStuff();
}

Ответы [ 5 ]

2 голосов
/ 13 апреля 2019

По вашим требованиям может помочь что-то вроде ReaderWriterLock.Кроме того, поскольку у вас есть async методы, вы должны использовать async lock.К сожалению, не существует готовой блокировки ReaderWriterLock, предоставляемой самой платформой .NET.К счастью, вы можете взглянуть на библиотеку AsyncEx или на эту статью .Пример использования AsyncEx.

var readerWriterLock = new AsyncReaderWriterLock();

async void ChangeState(bool state)
{
    using(await readerWriterLock.ReaderLockAsync())
    {
        await OutsideApi.ChangeState(state);
    }
}

async void DoStuff()
{
    using(await readerWriterLock.WriterLockAsync())
    {
        await OutsideApi.DoStuff();
    }
}

nb Это решение по-прежнему имеет ограничение, заключающееся в том, что вызовы DoStuff не могут быть одновременными, блокировка записи, но все же порядок вызовов итребование завершить все DoStuff до ChangeState и наоборот будет выполнено (совет от @Scott Chamberlain об использовании блокировки чтения и записи)

0 голосов
/ 14 апреля 2019

Это похоже на пару ReaderWriterLockSlim с.

private readonly ReaderWriterLockSlim changeStateLock = new ReaderWriterLockSlim();
private readonly ReaderWriterLockSlim doStuffLock = new ReaderWriterLockSlim();

Один контролирует доступ к ChangeState, а другой контролирует доступ к DoStuff.

Блокировка считывателя используется для оповещения о том, что метод выполняется, а блокировка записывающего устройства используется для оповещения о том, что выполняется другой метод.ReaderWriterLockSlim допускает многократное чтение, но запись является исключительной.

Task.Yield просто для того, чтобы вернуть управление вызывающей стороне, потому что ReaderWriterLockSlim блокирует ар.

async Task ChangeState(bool state)
{
    await Task.Yield();

    doStuffLock.EnterWriteLock();

    try
    {
        changeStateLock.EnterReadLock();

        try
        {
            await OutsideApi.ChangeState(state);
        }
        finally
        {
            changeStateLock.ExitReadLock();
        }
    }
    finally
    {
        doStuffLock.ExitWriteLock();
    }
}

async Task DoStuff()
{
    await Task.Yield();

    changeStateLock.EnterWriteLock();

    try
    {
        doStuffLock.EnterReadLock();

        try
        {
            await OutsideApi.DoStuff();
        }
        finally
        {
            doStuffLock.ExitReadLock();
        }
    }
    finally
    {
        changeStateLock.ExitWriteLock();
    }
}
0 голосов
/ 14 апреля 2019

Для практики я создал примитив синхронизации с именем KeyedLock, который допускает одновременные асинхронные операции только с одним ключом за раз. Все остальные ключи ставятся в очередь и разблокируются позже партиями (ключом). Класс предназначен для использования следующим образом:

KeyedLock _keyedLock;

async Task ChangeState(bool state)
{
    using (await this._keyedLock.LockAsync("ChangeState"))
    {
        await OutsideApi.ChangeState(state);
    }
}

async Task DoStuff()
{
    using (await this._keyedLock.LockAsync("DoStuff"))
    {
        await OutsideApi.DoStuff();
    }
}

Например, звонки ниже:

await ChangeState(true);
await DoStuff();
await DoStuff();
await ChangeState(false);
await DoStuff();
await ChangeState(true);

... будет выполнено в следующем порядке:

ChangeState(true);
ChangeState(false); // concurrently with the above
ChangeState(true); // concurrently with the above
DoStuff(); // after completion of the above
DoStuff(); // concurrently with the above
DoStuff(); // concurrently with the above

Класс KeyedLock:

class KeyedLock
{
    private object _currentKey;
    private int _currentCount = 0;
    private WaitingQueue _waitingQueue = new WaitingQueue();
    private readonly object _locker = new object();

    public Task WaitAsync(object key, CancellationToken cancellationToken)
    {
        if (key == null) throw new ArgumentNullException(nameof(key));
        lock (_locker)
        {
            if (_currentKey != null && key != _currentKey)
            {
                var waiter = new TaskCompletionSource<bool>();
                _waitingQueue.Enqueue(new KeyValuePair<object,
                    TaskCompletionSource<bool>>(key, waiter));
                if (cancellationToken != null)
                {
                    cancellationToken.Register(() => waiter.TrySetCanceled());
                }
                return waiter.Task;
            }
            else
            {
                _currentKey = key;
                _currentCount++;
                return cancellationToken.IsCancellationRequested ?
                    Task.FromCanceled(cancellationToken) : Task.FromResult(true);
            }
        }
    }
    public Task WaitAsync(object key) => WaitAsync(key, CancellationToken.None);

    public void Release()
    {
        List<TaskCompletionSource<bool>> tasksToRelease;
        lock (_locker)
        {
            if (_currentCount <= 0) throw new InvalidOperationException();
            _currentCount--;
            if (_currentCount > 0) return;
            _currentKey = null;
            if (_waitingQueue.Count == 0) return;
            var newWaitingQueue = new WaitingQueue();
            tasksToRelease = new List<TaskCompletionSource<bool>>();
            foreach (var entry in _waitingQueue)
            {
                if (_currentKey == null || entry.Key == _currentKey)
                {
                    _currentKey = entry.Key;
                    _currentCount++;
                    tasksToRelease.Add(entry.Value);
                }
                else
                {
                    newWaitingQueue.Enqueue(entry);
                }
            }
            _waitingQueue = newWaitingQueue;
        }
        foreach (var item in tasksToRelease)
        {
            item.TrySetResult(true);
        }
    }
    private class WaitingQueue :
        Queue<KeyValuePair<object, TaskCompletionSource<bool>>>
    { }

    public Task<Releaser> LockAsync(object key,
        CancellationToken cancellationToken)
    {
        var waitTask = this.WaitAsync(key, cancellationToken);
        return waitTask.ContinueWith(
            (_, state) => new Releaser((KeyedLock)state),
            this, cancellationToken,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default
        );
    }
    public Task<Releaser> LockAsync(object key)
        => LockAsync(key, CancellationToken.None);

    public struct Releaser : IDisposable
    {
        private readonly KeyedLock _parent;
        internal Releaser(KeyedLock parent) { _parent = parent; }
        public void Dispose() { _parent?.Release(); }
    }

}
0 голосов
/ 13 апреля 2019

Вы можете использовать ManualResetEvent или AutoResetEvent, чтобы сигнализировать о завершении потока, чтобы другой поток мог продолжить работу.

Некоторые образцы можно найти здесь и здесь :

0 голосов
/ 13 апреля 2019

РЕДАКТИРОВАТЬ: первое решение не отвечало требованиям.

  1. Создать пользовательский класс блокировки.
    Этот класс отслеживает, сколько экземпляров запущено из какого типа (ChangeState и DoThing) и предоставляет способ проверить, может ли задача выполняться.
        public class CustomLock
        {
            private readonly int[] Running;
            private readonly object _lock;
            public CustomLock(int Count)
            {
                Running = new int[Count];
                _lock = new object();
            }

            public void LockOne(int Task)
            {
                lock (_lock)
                {
                    Running[Task]++;
                }
            }

            public void UnlockOne(int Task)
            {
                lock (_lock)
                {
                    Running[Task]--;
                }
            }

            public bool Locked(int Task)
            {
                lock (_lock)
                {
                    for (int i = 0; i < Running.Length; i++)
                    {
                        if (i != Task && Running[i] != 0)
                            return true;
                    }
                    return false;
                }
            }
        }
Измените уже существующий код.
ChangeState будет задачей 0, а DoStuff будет задачей 1.
private CustomLock Lock = new CustomLock(2); //Create a new instance of the class for 2 tasks

async Task ChangeState(bool state)
{
   while (Lock.Locked(0)) //Wait for the task to get unlocked
      await Task.Delay(10);
   Lock.LockOne(0); //Lock this task
   await OutsideApi.ChangeState(state);
   Lock.UnlockOne(0); //Task finished, unlock one
}

async Task DoStuff()
{
   while (Lock.Locked(1))
      await Task.Delay(10);
   Lock.LockOne(1);
   await OutsideApi.DoStuff();
   Lock.UnlockOne(1);
}

Во время работы любого ChangeState новый можно запустить без ожиданияно когда вызывается DoStuff, он будет ждать до тех пор, пока все ChangeStates не завершат работу, и это работает и в другом направлении.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...