Что не так с моим пользовательским пулом потоков? - PullRequest
0 голосов
/ 19 января 2009

Я создал специальную утилиту пула потоков, но, похоже, возникла проблема, которую я не могу найти.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace iWallpaper.S3Uploader
{
public class QueueManager<T>
{
    private readonly Queue queue = Queue.Synchronized(new Queue());
    private readonly AutoResetEvent res = new AutoResetEvent(true);
    private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
    private readonly Semaphore sem = new Semaphore(1, 4);
    private readonly Thread thread;
    private Action<T> DoWork;
    private int Num_Of_Threads;

    private QueueManager()
    {
        Num_Of_Threads = 0;
        maxThread = 5;
        thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
        thread.Start();

        //   log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
    }

    public int maxThread { get; set; }

    public static FileUploadQueueManager<T> Instance
    {
        get { return Nested.instance; }
    }

    /// <summary>
    /// Executes multythreaded operation under items
    /// </summary>
    /// <param name="list">List of items to proceed</param>
    /// <param name="action">Action under item</param>
    /// <param name="MaxThreads">Maximum threads</param>
    public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        maxThread = MaxThreads;
        DoWork = action;
        foreach (T item in list)
        {
            Add(item);
        }
    }
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
    {
        ExecuteNoThread(list, action, 0);
    }
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        foreach (T wallpaper in list)
        {
            action(wallpaper);
        }
    }
    /// <summary>
    /// Default 10 threads
    /// </summary>
    /// <param name="list"></param>
    /// <param name="action"></param>
    public void Execute(IEnumerable<T> list, Action<T> action)
    {
        Execute(list, action, 10);
    }

    private void Add(T item)
    {
        lock (queue)
        {
            queue.Enqueue(item);
        }
        res.Set();
    }

    private void Worker()
    {
        while (true)
        {
            if (queue.Count == 0)
            {
                res.WaitOne();
            }

            if (Num_Of_Threads < maxThread)
            {
                var t = new Thread(Proceed);
                t.Start();
            }
            else
            {
                res_thr.WaitOne();
            }
        }
    }

    private void Proceed()
    {
        Interlocked.Increment(ref Num_Of_Threads);
        if (queue.Count > 0)
        {
            var item = (T) queue.Dequeue();

            sem.WaitOne();
            ProceedItem(item);
            sem.Release();
        }
        res_thr.Set();
        Interlocked.Decrement(ref Num_Of_Threads);
    }

    private void ProceedItem(T activity)
    {
        if (DoWork != null)
            DoWork(activity);

        lock (Instance)
        {
            Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                          thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                          Num_Of_Threads);
        }
    }

    #region Nested type: Nested

    protected class Nested
    {
        // Explicit static constructor to tell C# compiler
        // not to mark type as beforefieldinit
        internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
    }

    #endregion

}

}

Проблема здесь:

Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                      thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                      Num_Of_Threads);

В заголовке всегда есть ОДИН идентификатор потока. И программа, кажется, работает в одном потоке.

Пример использования:

        var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
        QueueManager<int>.Instance.Execute(i_list,
          i =>
          {
              Console.WriteLine("Some action under element number {0}", i);

          }, 5);

P.S .: это довольно грязно, но я все еще работаю над этим.

Ответы [ 4 ]

5 голосов
/ 19 января 2009

Я просмотрел ваш код, и вот пару проблем, которые я видел.

  1. Вы блокируете объект очереди, даже если это синхронизированная очередь. Это не нужно
  2. Вы непоследовательно блокируете объект очереди. Он должен быть либо заблокирован для каждого доступа, либо не заблокирован и зависит от режима синхронизации.
  3. Метод Proceed не является потокобезопасным. Эти две строки - проблема

        if (queue.Count > 0) {
          var item = (T)queue.Dequeue();
        ...
        }
    

    Использование синхронизированной очереди только гарантирует безопасность отдельных обращений. Таким образом, и метод .Count, и метод .Dequeue не будут связываться с внутренней структурой очереди. Однако представьте себе сценарий, когда два потока запускают эти строки кода одновременно с очередью отсчета 1

    • Тема1: if (...) -> true
    • Тема 2: if (...) -> true
    • Thread1: dequeue -> sucess
    • Thread2: dequeue -> терпит неудачу, потому что очередь пуста
  4. Между Рабочим и Приступом существует состояние гонки, которое может привести к тупику. Следующие две строки кода должны быть переключены.

    Код:

        res_thr.Set()
        Interlocked.Decrement(ref Num_Of_Threads);

    Первая строка разблокирует метод Worker. Если он работает достаточно быстро, он вернется через просмотр, заметьте, что Num_Of_Threads

  5. Свойство maxThread count кажется бесполезным после 4. Объект sem инициализируется для приема только 4 максимальных одновременных записей. Весь код, который фактически выполняет элемент, должен проходить через этот семафор. Таким образом, вы фактически ограничили максимальное количество одновременных элементов до 4, независимо от того, как установлено максимальное значение maxThread.
4 голосов
/ 19 января 2009

Написание надежного многопоточного кода не тривиально. Существует множество пулов потоков, к которым вы можете обратиться за справкой, но также обратите внимание, что Parallel Extensions (доступные как CTP или более поздние в .NET 4.0) включают множество дополнительных конструкций потоков вне коробка (в TPL / CCR). Например, Parallel.For / Parallel.ForEach, которые касаются кражи работы и эффективной обработки доступных ядер.

Пример предварительно свернутого пула потоков см. В статье CustomThreadPool Джона Скита .

2 голосов
/ 19 января 2009

Я думаю, что вы можете просто вещи значительно.

Вот измененная форма (я не проверял модификации) пула потоков, который я использую:

Единственная синхронизация. Примитив, который вам нужен - это монитор, заблокированный в пуле потоков. Вам не нужен семафор или события сброса.

internal class ThreadPool
{
    private readonly Thread[] m_threads;
    private readonly Queue<Action> m_queue;
    private bool m_shutdown;
    private object m_lockObj;


    public ThreadPool(int numberOfThreads)
    {
        Util.Assume(numberOfThreads > 0, "Invalid thread count!");
        m_queue = new Queue<Action>();
        m_threads = new Thread[numberOfThreads];
        m_lockObj = new object();

        lock (m_lockObj)
        {
            for (int i = 0; i < numberOfWriteThreads; ++i)
            {
                m_threads[i] = new Thread(ThreadLoop);
                m_threads[i].Start();
            }
        }

    }

    public void Shutdown()
    {
        lock (m_lockObj)
        {
            m_shutdown = true;
            Monitor.PulseAll(m_lockObj);

            if (OnShuttingDown != null)
            {
                OnShuttingDown();
            }
        }
        foreach (var thread in m_threads)
        {
            thread.Join();
        }
    }
    public void Enqueue(Action a)
    {
        lock (m_lockObj)
        {
            m_queue.Enqueue(a);
            Monitor.Pulse(m_lockObj);
        }
    }

    private void ThreadLoop()
    {
        Monitor.Enter(m_lockObj);

        while (!m_shutdown)
        {
            if (m_queue.Count == 0)
            {
                Monitor.Wait(m_lockObj);
            }
            else
            {
                var a = m_queue.Dequeue();
                Monitor.Pulse(m_lockObj);
                Monitor.Exit(m_lockObj);
                try
                {
                    a();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("An unhandled exception occured!\n:{0}", ex.Message, null);
                }
                Monitor.Enter(m_lockObj);
            }
        }

        Monitor.Exit(m_lockObj);
    }
}
1 голос
/ 19 января 2009

Вы, вероятно, должны использовать встроенный пул потоков. При запуске вашего кода я заметил, что вы раскручиваете кучу потоков, но поскольку число очередей <1, вы просто выходите, это продолжается до тех пор, пока очередь не заполнится, а затем ваш следующий поток обрабатывает все. Это очень дорогой процесс. Вы должны только раскручивать темы, если у вас есть чем заняться. </p>

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...