Асинхронный процессор задач - PullRequest
0 голосов
/ 24 апреля 2020

Я занимаюсь разработкой асинхронного процессора задач. Мне нужен высокопроизводительный процессор, поэтому используемые примитивы синхронизации должны быть как можно более низкого уровня. Процессор должен содержать поток, который спит, когда нет задач, и просыпается, когда задачи появляются. Обработка задач и добавление задач должны выполняться в разных потоках.

Я попытался реализовать с AutoResetEvent, но он имеет условие гонки:

public class Processor
{
    ConcurrentQueue<Action> _workItemQueue = new ConcurrentQueue<Action>();
    AutoResetEvent _newWorkItemAutoResetEvent = new AutoResetEvent(false);
    private bool _disposed;
    Thread _thread;

    public void Do(Action action)
    {
        _workItemQueue.Enqueue(action);
        _newWorkItemAutoResetEvent.Set();
    }

    public Processor()
    {
        _workerThread = new Thread(() =>
        {
            while (!_disposed)
            {
                _newWorkItemAutoResetEvent.WaitOne(); // 
                while (_workItemQueue.TryDequeue(out Action action))
                {
                    action();
                }
                // at this "bad" moment another thread calls Do method. 
                // New action has been enqueued, but when we call
                // _newWorkIteManualAutoEvent.WaitOne() we fall asleep.
            }
        });
        _thread.Start();
    }
}

Затем я попытался реализовать с ManualResetEvent:

public class Processor
{
    ConcurrentQueue<Action> _workItemQueue = new ConcurrentQueue<Action>();
    ManualResetEventSlim _newWorkItemManualResetEvent = new ManualResetEventSlim(false);
    private bool _disposed;
    Thread _thread;

    public void Do(Action action)
    {
        _workItemQueue.Enqueue(action);
        _newWorkItemManualResetEvent.Set();
    }

    public Processor()
    {
        _workerThread = new Thread(() =>
        {
            while (!_disposed)
            {
                _newWorkItemManualResetEvent.WaitOne();
                _newWorkItemManualResetEvent.Reset();

                while (_workItemQueue.TryDequeue(out Action action))
                {
                    action();
                }
            }
        });
        _thread.Start();
    }
}

Я не Я не вижу никаких условий гонки в реализации с ManualResetEvent.

ВОПРОС: Я прав? Или мне нужен другой примитив синхронизации? Я думаю о графе up Событие (реверс CountdownEvent). Он сигнализируется, когда его счет больше нуля, и не сигнализируется, когда его счет равен нулю. Count up Счет событий соответствует количеству задач, которые должны быть выполнены.

1 Ответ

1 голос
/ 24 апреля 2020

Удобная BlockingCollection справится с большей частью этого за вас.

Что-то вроде:

public sealed class Processor : IDisposable
{
    //set a max queue depth to provide back pressure to the request rate
    BlockingCollection<Action> _workItemQueue = new BlockingCollection<Action>(32);
    private bool _disposed = false;
    private Thread _workerThread;
    private CancellationTokenSource _cancelTokenSource = new CancellationTokenSource();

    public void Do(Action action)
    {
        _workItemQueue.Add(action);
    }

    public void Dispose()
    {
        if (!_disposed)
        {
            _workItemQueue.CompleteAdding();
            _cancelTokenSource.Cancel();
            _disposed = true;
        }
    }

    public Processor()
    {
        _workerThread = new Thread(() =>
        {
            while (!_workItemQueue.IsCompleted)
            {
                if (_workItemQueue.TryTake(out Action action, 1000*2,_cancelTokenSource.Token))
                {
                    action();
                }
            }

        });

        _workerThread.Start();
    }
}
...