Serial Task Executor;эта тема безопасна? - PullRequest
1 голос
/ 08 сентября 2010

У меня есть класс, который я создал, чтобы разрешить асинхронное последовательное выполнение задач, используя ThreadPool в качестве средства выполнения.Идея состоит в том, что у меня будет несколько экземпляров, выполняющих последовательные задачи в фоновом режиме, но я не хочу иметь отдельный выделенный поток для каждого экземпляра.Я хотел бы проверить, действительно ли этот класс поточно-ориентирован.Это довольно кратко, поэтому я подумал, что я буду управлять этим здесь экспертами, на случай, если я упущу что-то очевидное.Я пропустил несколько вспомогательных перегрузок для различных типов действий.

/// <summary>
/// This class wraps ThreadPool.QueueUserWorkItem, but providing guaranteed ordering of queued tasks for this instance.
/// Only one task in the queue will execute at a time, with the order of execution matching the order of addition.
/// This is designed as a lighter-weight alternative to using a dedicated Thread for processing of sequential tasks.
/// </summary>
public sealed class SerialAsyncTasker
{
    private readonly Queue<Action> mTasks = new Queue<Action>();
    private bool mTaskExecuting;

    /// <summary>
    /// Queue a new task for asynchronous execution on the thread pool.
    /// </summary>
    /// <param name="task">Task to execute</param>
    public void QueueTask(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");

        lock (mTasks)
        {
            bool isFirstTask = (mTasks.Count == 0);
            mTasks.Enqueue(task);

            //Only start executing the task if this is the first task
            //Additional tasks will be executed normally as part of sequencing
            if (isFirstTask && !mTaskExecuting)
                RunNextTask();
        }
    }

    /// <summary>
    /// Clear all queued tasks.  Any task currently executing will continue to execute.
    /// </summary>
    public void Clear()
    {
        lock (mTasks)
        {
            mTasks.Clear();
        }
    }

    /// <summary>
    /// Wait until all currently queued tasks have completed executing.
    /// If no tasks are queued, this method will return immediately.
    /// This method does not prevent the race condition of a second thread 
    /// queueing a task while one thread is entering the wait;
    /// if this is required, it must be synchronized externally.
    /// </summary>
    public void WaitUntilAllComplete()
    {
        lock (mTasks)
        {
            while (mTasks.Count > 0 || mTaskExecuting)
                Monitor.Wait(mTasks);
        }
    }

    private void RunTask(Object state)
    {
        var task = (Action)state;
        task();
        mTaskExecuting = false;
        RunNextTask();
    }

    private void RunNextTask()
    {
        lock (mTasks)
        {
            if (mTasks.Count > 0)
            {
                mTaskExecuting = true;
                var task = mTasks.Dequeue();
                ThreadPool.QueueUserWorkItem(RunTask, task);
            }
            else
            {
                //If anybody is waiting for tasks to be complete, let them know
                Monitor.PulseAll(mTasks);
            }
        }
    }
}

ОБНОВЛЕНИЕ: я исправил код, чтобы исправить основные ошибки, любезно указанные Симоном.Сейчас он проходит модульные тесты, но я все еще приветствую наблюдения.

Ответы [ 5 ]

2 голосов
/ 08 сентября 2010

Не делай этого. (Или, по крайней мере, избегайте создания своих собственных вещей.)

Используйте System.Threading.Tasks (новое в .NET 4.0). Создайте свою задачу [] (размер зависит от количества желаемых параллельных задач) и дайте им возможность читать рабочие элементы из BlockingCollection в ожидании CancellationToken . Ваша реализация WaitForAll вызовет ваш токен и вызовет Task.WaitAll (Task []) , который будет блокироваться, пока все ваши задачи не будут выполнены.

1 голос
/ 25 февраля 2011

Он встроен в 4.0

Как: создать планировщик задач, ограничивающий степень параллелизма

Вы также можете использовать собственный планировщик для достижения функциональностипланировщик по умолчанию не обеспечивает, например, строгий порядок выполнения «первым пришел - первым вышел» (FIFO).В следующем примере показано, как создать собственный планировщик задач.Этот планировщик позволяет указать степень параллелизма.

1 голос
/ 08 сентября 2010

Вот мой второй ответ, предполагающий, что вы не можете использовать .NET 4.0 (и хотите получить комментарии к существующему коду).

QueueTask ставит в очередь первую задачу, получая isFirstTask = true, и запускает новый поток.Тем не менее, другой поток может поставить в очередь что-то в процессе обработки первого потока, и Count == 0 => isFirstTask = true, и еще один поток создается.

Кроме того, WaitUntilAllComplete будет зависать бесконечно, если выполнение задачи выдаетисключение (которое может не обязательно вызывать сбой всего, в зависимости от обработки исключения), в результате чего оно пропускает вызов RunNextTask ().

А ваш WaitUntilAllComplete просто ждет, пока не останется больше задач постановки в очередь, а не тех, которые в настоящее время выполняются, фактически выполняются (их можно просто поставить в очередь в ThreadPool) или завершить.

0 голосов
/ 15 июля 2013
public class ParallelExcecuter
{
    private readonly BlockingCollection<Task> _workItemHolder;

    public ParallelExcecuter(int maxDegreeOfParallelism)
    {
        _workItemHolder = new BlockingCollection<Task>(maxDegreeOfParallelism);
    }

    public void Submit(Action action)
    {
        _workItemHolder.Add(Task.Run(action).ContinueWith(t =>
        {
            _workItemHolder.Take();
        }));

    }

    public void WaitUntilWorkDone()
    {
        while (_workItemHolder.Count < 0)
        {
            Monitor.Wait(_workItemHolder);
        }
    }
}
0 голосов
/ 09 сентября 2010

Я вижу несколько проблем с вашим классом SerialAsyncTasker, но, похоже, вы хорошо разбираетесь в них, поэтому я не буду вдаваться в подробности по этой теме (я могу отредактировать свой ответ с более подробной информацией позже) , В комментариях вы указали, что не можете использовать функции .NET 4.0 и не можете использовать обратный порт Reactive Extensions. Я предлагаю использовать шаблон производитель-потребитель с одним потребителем в выделенном потоке. Это идеально соответствует вашему требованию асинхронного выполнения задач последовательно.

Примечание. Вам придется укрепить код, чтобы обеспечить корректное завершение работы, обработку исключений и т. Д.

public class SerialAsyncTasker
{
  private BlockingCollection<Action> m_Queue = new BlockingCollection<Action>();

  public SerialAsyncTasker()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          Action task = m_Queue.Take();
          task();
        }
      });
    thread.IsBackground = true;
    thread.Start();
  }

  public void QueueTask(Action task)
  {
    m_Queue.Add(task);
  }
}

Жаль, что вы не можете использовать BlockingCollection из загрузки .NET 4.0 BCL или Reactive Extension, но не беспокойтесь. На самом деле это не так уж сложно реализовать самостоятельно. Вы можете использовать очередь блокировки Стивена Туба в качестве отправной точки и просто переименовать несколько вещей.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take()
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0) Monitor.Wait(m_Queue);
            return m_Queue.Dequeue();
        }
    }

    public void Add(T value)
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(value);
            Monitor.Pulse(m_Queue);
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...