Как повторно использовать темы в .NET 3.5 - PullRequest
8 голосов
/ 29 апреля 2011

У меня есть подпрограмма, которая обрабатывает большие блоки информации. Чтобы использовать весь процессор, он разделяет работу на отдельные потоки. После того, как все потоки завершены, он заканчивается. Я читал, что создание и удаление потоков требует много накладных расходов, поэтому я попытался использовать пул потоков, но на самом деле он работает медленнее, чем создание собственных потоков. Как я могу создавать свои собственные потоки, когда программа запускается, а затем снова использовать их? Я видел, как некоторые люди говорят, что это не может быть сделано, но пул потоков делает это так, это должно быть возможно, верно?

Вот часть кода, который запускает новые потоки / использует пул потоков:

//initialization for threads
Thread[] AltThread = null;
if (NumThreads > 1)
    AltThread = new Thread[pub.NumThreads - 1];

do
{
    if (NumThreads > 1)
    {   //split the matrix up into NumThreads number of even-sized blocks and execute on separate threads
        int ThreadWidth = DataWidth / NumThreads;
        if (UseThreadPool) //use threadpool threads
        {
            for (int i = 0; i < NumThreads - 1; i++)
            {
                ThreadPool.QueueUserWorkItem(ComputePartialDataOnThread, 
                    new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) });
            }
            //get number of threads available after queue
            System.Threading.Thread.Sleep(0);
            int StartThreads, empty, EndThreads;
            ThreadPool.GetAvailableThreads(out StartThreads, out empty);
            ComputePartialData(ThisEngine, 0, ThreadWidth);

            //wait for all threads to finish
            do
            {
                ThreadPool.GetAvailableThreads(out EndThreads, out empty);
                System.Threading.Thread.Sleep(1);
            } while (StartThreads - EndThreads > 0);
        }
        else //create new threads each time (can we reuse these?)
        {
            for (int i = 0; i < NumThreads - 1; i++)
            {
                AltThread[i] = new Thread(ComputePartialDataOnThread);
                AltThread[i].Start(new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) });
            }
            ComputePartialData(ThisEngine, 0, ThreadWidth);

            //wait for all threads to finish
            foreach (Thread t in AltThread)
                t.Join(1000);
            foreach (Thread t in AltThread)
                if (t.IsAlive) t.Abort();
        }
    }
}

ComputePartialDataOnThread просто распаковывает информацию и вызывает ComputePartialData. Данные, которые будут обработаны, распределяются между потоками (они не пытаются читать / записывать одни и те же места). AltEngine [] - это отдельный механизм вычислений для каждого потока.

Операция выполняется на 10-20% с использованием пула потоков.

Ответы [ 3 ]

13 голосов
/ 29 апреля 2011

Это звучит как довольно распространенное требование, которое может быть решено с помощью многопоточной очереди производитель-потребитель.Потоки остаются «живыми» и получают сигнал на выполнение работы, когда новая работа добавляется в очередь.Работа представляется делегатом (в вашем случае ComputePartialDataOnThread), а данные, передаваемые делегату, - это то, что стоит в очереди (в вашем случае это параметры для ComputePartialDataOnThread).Полезной особенностью является то, что реализация управления рабочими потоками и фактические алгоритмы являются отдельными.Вот очередь ПК:

public class SuperQueue<T> : IDisposable where T : class
{
    readonly object _locker = new object();
    readonly List<Thread> _workers;
    readonly Queue<T> _taskQueue = new Queue<T>();
    readonly Action<T> _dequeueAction;

    /// <summary>
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class.
    /// </summary>
    /// <param name="workerCount">The worker count.</param>
    /// <param name="dequeueAction">The dequeue action.</param>
    public SuperQueue(int workerCount, Action<T> dequeueAction)
    {
        _dequeueAction = dequeueAction;
        _workers = new List<Thread>(workerCount);

        // Create and start a separate thread for each worker
        for (int i = 0; i < workerCount; i++)
        {
            Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("SuperQueue worker {0}",i )};
            _workers.Add(t);
            t.Start();
        }
    }

    /// <summary>
    /// Enqueues the task.
    /// </summary>
    /// <param name="task">The task.</param>
    public void EnqueueTask(T task)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(task);
            Monitor.PulseAll(_locker);
        }
    }

    /// <summary>
    /// Consumes this instance.
    /// </summary>
    void Consume()
    {
        while (true)
        {
            T item;
            lock (_locker)
            {
                while (_taskQueue.Count == 0) Monitor.Wait(_locker);
                item = _taskQueue.Dequeue();
            }
            if (item == null) return;

            // run actual method
            _dequeueAction(item);
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        // Enqueue one null task per worker to make each exit.
        _workers.ForEach(thread => EnqueueTask(null));

        _workers.ForEach(thread => thread.Join());

    }
}

Как говорилось в предыдущих постерах, есть много встроенных структур (посмотрите на TPL), которые используют Threadpool, на который вы, возможно, захотите взглянуть, прежде чем создавать свою собственную очередь.

2 голосов
/ 29 апреля 2011

Таким образом, обычный способ сделать это состоит в том, чтобы точка входа каждого потока по существу выполняла что-то похожее (это всего лишь алгоритм, а не код C #, извините):

  1. Проверьте, можете ли выесть работа, которую нужно сделать
  2. Работать, если найден
  3. Ожидать сигнал

С другой стороны, когда у вас есть больше работы для вашего потока, добавьте его в очередьработы, а затем ваша нить по существу используется повторно.Это очень похоже на то, как можно реализовать пул потоков самостоятельно (если вы находитесь во время выполнения, вы можете сделать некоторые другие вещи, чтобы выручить вас, но это не так уж и сложно).

0 голосов
/ 29 апреля 2011

Вот поток, в котором говорится об этом: Пользовательский класс пула потоков / очереди.

...