C # одновременный: это хорошая идея, чтобы использовать много AutoResetEvent? - PullRequest
3 голосов
/ 23 апреля 2019

Предположим, что есть много потоков, вызывающих Do(), и только один рабочий поток обрабатывает фактическое задание.

void Do(Job job)
{
    concurrentQueue.Enqueue(job);
    // wait for job done
}

void workerThread()
{
    while (true)
    {
        Job job;
        if (concurrentQueue.TryDequeue(out job))
        {
            // do job
        }
    }
}

Функция Do () должна подождать, пока работа не будет выполнена, прежде чем вернуться.Поэтому я написал следующий код:

class Task 
{
    public Job job;
    public AutoResetEvent ev;
}

void Do(Job job)
{
    using (var ev = new AutoResetEvent(false))
    {
        concurrentQueue.Enqueue(new Task { job = job, ev = ev }));
        ev.WaitOne();
    }
}

void workerThread()
{
    while (true)
    {
        Task task;
        if (concurrentQueue.TryDequeue(out task))
        {
            // do job
            task.ev.Set();
        }
    }
}

После некоторых испытаний я обнаружил, что он работает, как и ожидалось.Однако я не уверен, что это хороший способ выделить много объектов AutoResetEvents или есть лучший способ сделать это?

Ответы [ 3 ]

2 голосов
/ 23 апреля 2019

Поскольку все клиенты должны ждать выполнения одного потока, нет необходимости использовать очередь.Поэтому я предлагаю вместо этого использовать класс Monitor и, в частности, функциональность Wait / Pulse .Это немного низкий уровень и подробный.

class Worker<TResult> : IDisposable
{
    private readonly object _outerLock = new object();
    private readonly object _innerLock = new object();
    private Func<TResult> _currentJob;
    private TResult _currentResult;
    private Exception _currentException;
    private bool _disposed;

    public Worker()
    {
        var thread = new Thread(MainLoop);
        thread.IsBackground = true;
        thread.Start();
    }

    private void MainLoop()
    {
        lock (_innerLock)
        {
            while (true)
            {
                Monitor.Wait(_innerLock); // Wait for client requests
                if (_disposed) break;
                try
                {
                    _currentResult = _currentJob.Invoke();
                    _currentException = null;
                }
                catch (Exception ex)
                {
                    _currentException = ex;
                    _currentResult = default;
                }
                Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done
            }
        } // We are done
    }

    public TResult DoWork(Func<TResult> job)
    {
        TResult result;
        Exception exception;
        lock (_outerLock) // Accept only one client at a time
        {
            lock (_innerLock) // Acquire inner lock
            {
                if (_disposed) throw new InvalidOperationException();
                _currentJob = job;
                Monitor.Pulse(_innerLock); // Notify worker thread about the new job
                Monitor.Wait(_innerLock); // Wait for worker thread to process the job
                result = _currentResult;
                exception = _currentException;
                // Clean up
                _currentJob = null;
                _currentResult = default;
                _currentException = null;
            }
        }
        // Throw the exception, if occurred, preserving the stack trace
        if (exception != null) ExceptionDispatchInfo.Capture(exception).Throw();
        return result;
    }

    public void Dispose()
    {
        lock (_outerLock)
        {
            lock (_innerLock)
            {
                _disposed = true;
                Monitor.Pulse(_innerLock); // Notify worker thread to exit loop
            }
        }
    }
}

Пример использования:

var worker = new Worker<int>();
int result = worker.DoWork(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();

Вывод:

Result: 1

Обновление: Предыдущее решение не является благоприятным для ожидания, так что вот то, которое позволяет правильно ожидать.Он использует TaskCompletionSource для каждого задания, сохраненный в BlockingCollection.

class Worker<TResult> : IDisposable
{
    private BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection
        = new BlockingCollection<TaskCompletionSource<TResult>>();

    public Worker()
    {
        var thread = new Thread(MainLoop);
        thread.IsBackground = true;
        thread.Start();
    }

    private void MainLoop()
    {
        foreach (var tcs in _blockingCollection.GetConsumingEnumerable())
        {
            var job = (Func<TResult>)tcs.Task.AsyncState;
            try
            {
                var result = job.Invoke();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }
    }

    public Task<TResult> DoWorkAsync(Func<TResult> job)
    {
        var tcs = new TaskCompletionSource<TResult>(job,
            TaskCreationOptions.RunContinuationsAsynchronously);
        _blockingCollection.Add(tcs);
        return tcs.Task;
    }

    public TResult DoWork(Func<TResult> job) // Synchronous call
    {
        var task = DoWorkAsync(job);
        try { task.Wait(); } catch { } // Swallow the AggregateException
        // Throw the original exception, if occurred, preserving the stack trace
        if (task.IsFaulted) ExceptionDispatchInfo.Capture(task.Exception.InnerException).Throw();
        return task.Result;
    }

    public void Dispose()
    {
        _blockingCollection.CompleteAdding();
    }
}

Пример использования

var worker = new Worker<int>();
int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();

Вывод:

Result: 1
2 голосов
/ 23 апреля 2019

С точки зрения синхронизации это работает нормально.

Но, кажется, бесполезно делать это таким образом. Если вы хотите выполнять задания одно за другим, вы можете просто использовать блокировку:

lock (lockObject) {
  RunJob();
}

Каково ваше намерение с этим кодом?

Существует также вопрос эффективности, поскольку каждая задача создает событие ОС и ожидает его. Если вы используете более современный TaskCompletionSource, он будет использовать то же самое под капотом, если вы будете синхронно ждать этой задачи. Вы можете использовать асинхронное ожидание (await myTCS.Task;), чтобы немного повысить эффективность. Конечно, это заражает весь стек вызовов с помощью async / await. Если это довольно низкая громкость, вы не получите много.

1 голос
/ 23 апреля 2019

В общем, я думаю, что это сработает, хотя, когда вы говорите, что «многие» потоки вызывают Do (), это может плохо масштабироваться ... приостановленные потоки используют ресурсы.

Другая проблема с этим кодом заключается в том, что во время простоя у вас будет «жесткий цикл» в «workerThread», который заставит ваше приложение возвращать высокую загрузку ЦП.Возможно, вы захотите добавить этот код в «workerThread»:

if (concurrentQueue.IsEmpty) Thread.Sleep(1);

Возможно, вы также захотите ввести тайм-аут для вызова WaitOne, чтобы избежать застревания журнала.

...