Отменяемый SemaphoreSlim - Вы бы предложили какие-либо улучшения для моей инкапсуляции? - PullRequest
0 голосов
/ 28 сентября 2018

Мне нужно по причине X написать инкапсуляцию семафора, которая позволила бы мне отменить все ожидающие процессы на моем SemaphoreSlim.(SemaphoreSlim Cancellation Encapsulation)

Есть мой класс:

public class CancellableSemaphoreSlim
{
    readonly Queue<CancellationTokenSource> tokens = new Queue<CancellationTokenSource>();
    readonly SemaphoreSlim ss;

    /// <summary>
    /// Initializes a new instance of the <see cref="T:Eyes.Mobile.Core.Helpers.CancellableSemaphoreSlim"/> class.
    /// </summary>
    /// <param name="initialCount">Initial count.</param>
    public CancellableSemaphoreSlim(int initialCount) { ss = new SemaphoreSlim(initialCount); }

    /// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationToken" />. </summary>
    /// <returns>A task that will complete when the semaphore has been entered. </returns>
    /// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
    /// <exception cref="T:System.OperationCanceledException" />
    public Task WaitAsync()
    {
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        tokens.Enqueue(cancellationTokenSource);
        return ss.WaitAsync(cancellationTokenSource.Token);
    }

    /// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationTokenSource" />. </summary>
    /// <returns>A task that will complete when the semaphore has been entered. </returns>
    /// <param name="cancellationTokenSource">The <see cref="T:System.Threading.CancellationToken" /> token to observe.</param>
    /// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
    /// <exception cref="T:System.OperationCanceledException">
    ///     <paramref name="cancellationTokenSource" /> was canceled. 
    /// </exception>
    public Task WaitAsync(CancellationToken cancellationToken)
    {
        CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        tokens.Enqueue(cancellationTokenSource);
        return ss.WaitAsync(cancellationTokenSource.Token);
    }

    /// <summary>
    /// Release this instance.
    /// </summary>
    /// <returns>The released semaphore return.</returns>
    public int Release() => ss.Release();

    /// <summary>
    /// Cancel all processus currently in WaitAsync() state.
    /// </summary>
    public void CancelAll()
    {
        while (tokens.Count > 0)
        {
            CancellationTokenSource token = tokens.Dequeue();
            if (!token.IsCancellationRequested)
                token.Cancel();
        }
    }
}

Вы можете использовать его как базовый SemaphoreSlim, я написал простой пример:

class Program
{
    static void Main(string[] args)
    {
        AsyncContext.Run(() => MainAsync(args));
    }

    static async void MainAsync(string[] args)
    {
        for (int i = 0; i < 5; i++)
        {
            try
            {
                CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(10000);
                await Task.WhenAll(
                    MakeAnAction(i, cancellationTokenSource),
                    MakeAnAction(i, cancellationTokenSource),
                    MakeAnAction(i, cancellationTokenSource),
                    MakeAnAction(i, cancellationTokenSource),
                    MakeAnAction(i, cancellationTokenSource)
                    );
            }
            catch (OperationCanceledException) { }
        }
        await Task.Delay(5000);
        cancellableSemaphoreSlim.CancelAll();
        await Task.Delay(5000);
    }

    readonly static CancellableSemaphoreSlim cancellableSemaphoreSlim = new CancellableSemaphoreSlim(1);
    readonly static Random rnd = new Random();

    internal static async Task MakeAnAction(int id, CancellationTokenSource cancellationTokenSource)
    {
        try
        {
            await cancellableSemaphoreSlim.WaitAsync(cancellationTokenSource.Token);
            int actionTime = rnd.Next(2, 10) * 1000;
            Output($"{id} : Start ({actionTime})");
            await Task.Delay(actionTime, cancellationTokenSource.Token);
            Output($"{id} : OK ({actionTime})");
        }
        catch (OperationCanceledException)
        {
            Output($"{id} : Cancelled");
        }
        finally
        {
            cancellableSemaphoreSlim.Release();
        }
    }

    private static void Output(string str)
    {
        Debug.WriteLine(str);
        Console.WriteLine(str);
    }
}

Однако мне было интересно, может ли использование Queue<CancellationTokenSource> создать какую-либо асинхронную проблему?Потому что, если у нас есть метод (наподобие makeAnAction), который может вызываться различными потоками / задачами, если CancelAll () вызывается перед новым вызовом задачи / потока makeAnAction, это означает, что этот будет добавленв очередь, которая фактически снимает все свои предметы с очереди ..

Я так думал о попытке создать уникальную связь между всеми моими токенами отмены, используя CancellationTokenSource.CreateLinkedTokenSource(cancellationToken).Однако, даже если это логика varargs (params), создаст ли она ту же проблему?

Я просто пытаюсь достичь ее так, чтобы она не провалилась, но, полагаю, я простов данный момент у меня плохой подход, поэтому я просто хотел бы знать, может ли кто-нибудь дать мне точку зрения об этой инкапсуляции и ее логике?

Не стесняйтесь давать мне какие-либо советы, если вы считаете, что что-то не таклогика:)

Макс

Редактировать 1

Затем я отредактировал код, чтобы следить за обсуждением с @NthDeveloper.Я пытался добавить систему lock

public class CancellableSemaphoreSlim
{
    object _syncObj = new object();
    readonly Queue<CancellationTokenSource> tokens = new Queue<CancellationTokenSource>();
    readonly SemaphoreSlim ss;

    /// <summary>
    /// Initializes a new instance of the <see cref="T:Eyes.Mobile.Core.Helpers.CancellableSemaphoreSlim"/> class.
    /// </summary>
    /// <param name="initialCount">Initial count.</param>
    public CancellableSemaphoreSlim(int initialCount) { ss = new SemaphoreSlim(initialCount); }

    /// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationToken" />. </summary>
    /// <returns>A task that will complete when the semaphore has been entered. </returns>
    /// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
    /// <exception cref="T:System.OperationCanceledException" />
    public Task WaitAsync()
    {
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        lock (_syncObj)
        {
            tokens.Enqueue(cancellationTokenSource);
        }
        return ss.WaitAsync(cancellationTokenSource.Token);
    }

    /// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationTokenSource" />. </summary>
    /// <returns>A task that will complete when the semaphore has been entered. </returns>
    /// <param name="cancellationTokenSource">The <see cref="T:System.Threading.CancellationToken" /> token to observe.</param>
    /// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
    /// <exception cref="T:System.OperationCanceledException">
    ///     <paramref name="cancellationTokenSource" /> was canceled. 
    /// </exception>
    public Task WaitAsync(CancellationToken cancellationToken)
    {
        CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        lock (_syncObj)
        {
            tokens.Enqueue(cancellationTokenSource);
        }
        return ss.WaitAsync(cancellationTokenSource.Token);
    }

    /// <summary>
    /// Release this instance.
    /// </summary>
    /// <returns>The released semaphore return.</returns>
    public int Release() => ss.Release();

    /// <summary>
    /// Cancel all processus currently in WaitAsync() state.
    /// </summary>
    public void CancelAll()
    {
        lock (_syncObj)
        {
            while (tokens.Count > 0)
            {
                CancellationTokenSource token = tokens.Dequeue();
                if (!token.IsCancellationRequested)
                    token.Cancel();
            }
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 28 сентября 2018

Я думаю, что вы можете упростить код, используя только один CancellationSource, который запускается и заменяется на новый в CancelAll:

public class CancellableSemaphoreSlim
{
    CancellationTokenSource cancelSource = new CancellationTokenSource();
    readonly SemaphoreSlim ss;

    public CancellableSemaphoreSlim(int initialCount) 
    { 
        ss = new SemaphoreSlim(initialCount); 
    }

    public Task WaitAsync() => ss.WaitAsync(cancelSource.Token);

    public Task WaitAsync(CancellationToken cancellationToken)
    {
        // This operation will cancel when either the user token or our cancelSource signal cancellation
        CancellationTokenSource linkedSource =  CancellationTokenSource.CreateLinkedTokenSource(cancelSource.Token, cancellationToken);
        return ss.WaitAsync(linkedSource.Token);
    }

    public int Release() => ss.Release();

    public void CancelAll()
    {
        var currentCancelSource = Interlocked.Exchange(ref cancelSource, new CancellationTokenSource());
        currentCancelSource.Cancel();
    }
}

Всегда будет гонкасвоего рода, чтобы определить, будет ли WaitAsync отменен вызовом к CancelAll, работающему в то же время.

В этой версии просто до того, будет ли старый или новый cancelSource.Token захвачен в WaitAsync().

0 голосов
/ 28 сентября 2018

Пример потокаобезопасного класса, который защищает свой внутренний список от одновременных изменений, а также предотвращает использование класса после его удаления.

public class SampleThreadSafeDisposableClass: IDisposable
{
    bool _isDisposed;
    object _syncObj = new object();

    List<object> _list = new List<object>();

    public void Add(object obj)
    {
        lock(_syncObj)
        {
            if (_isDisposed)
                return;

            _list.Add(obj);
        }
    }       

    //This method can be Dispose/Clear/CancelAll
    public void Dispose()
    {
        lock (_syncObj)
        {
            if (_isDisposed)
                return;

            _isDisposed = true;

            _list.Clear();
        }
    }
}

Надеюсь, это поможет.

...