Научиться реализовывать пул потоков - сигнальные события теряются при использовании autoresetevent - PullRequest
3 голосов
/ 17 декабря 2011

Я твердо верю в обучение путем переизобретения. С таким состоянием ума я решил реализовать собственный пул потоков. Поставленная мною цель была следующей:

  1. Чтобы иметь возможность ставить рабочие элементы в очередь в пуле потоков.
  2. Чтобы иметь возможность обрабатывать рабочие элементы с фиксированным количеством потоков - все они создаются одновременно.
  3. Функция общего рабочего потока должна знать только, как обрабатывать и не должна иметь дело с другими функциями / свойствами, такими как IsEmpty или Count.

Мне удалось достичь вышеупомянутых целей, но я хочу проверить подход, который я использовал с экспертами в отношении стекопотока. Также хотелось бы узнать, есть ли лучшие подходы или как бы эксперт по многопоточности решил эту проблему. В следующих параграфах упоминается проблема, с которой я столкнулся, и как я ее исправил.

Пул потоков, который я создал, внутренне поддерживал очередь рабочих элементов, из которой все рабочие потоки выбирали элемент, а затем обрабатывали его. Всякий раз, когда новый элемент ставится в очередь, он сигнализирует о событии, так что любой свободный поток может его забрать и выполнить.

Я начал с autoresetevent, чтобы сигнализировать ожидающим потокам о любых новых рабочих элементах в очереди, но я столкнулся с проблемой потерянных сигнальных событий. Это происходит, когда в очередь помещается более одного элемента, и нет свободных потоков для его обработки. Общее количество элементов, которые остаются необработанными, соответствует общему количеству событий сигнала, которые были потеряны из-за перекрывающихся событий набора (сигнализации).

Чтобы решить проблему потерянных сигнальных событий, я создал обертку поверх autoresetevent и использовал ее вместо autoresetevent. Это исправлено на проблему. Вот список кодов для того же самого:

public static class CustomThreadPool
{
    static CustomThreadPool()
    {
        for (int i = 0; i < minThreads; i++)
            _threads.Add(
                new Thread(ThreadFunc) { IsBackground = true }
                );

        _threads.ForEach((t) => t.Start());
    }

    public static void EnqueWork(Action action)
    {
        _concurrentQueue.Enqueue(action);
        _enqueEvent.Set();
    }

    private static void ThreadFunc()
    {
        Action action = null;
        while (true)
        {
            _enqueEvent.WaitOne();
            _concurrentQueue.TryDequeue(out action);
            action();
        }
    }

    private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>();
    private static List<Thread> _threads = new List<Thread>();
    private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent();
    private static object _syncObject = new object();
    private const int minThreads = 4;
    private const int maxThreads = 10;

    public static void Test()
    {
        CustomThreadPool.EnqueWork(() => {

            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****First*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Second*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Third*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Fourth*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Fifth*****");
        });
    }
}

public class CountAutoResentEvent
{
    public void Set()
    {
        _event.Set();
        lock (_sync)
            _countOfSet++;
    }

    public void WaitOne()
    {
        _event.WaitOne();
        lock (_sync)
        {
            _countOfSet--;
            if (_countOfSet > 0)
                _event.Set();
        }
    }

    private AutoResetEvent _event = new AutoResetEvent(false);
    private int _countOfSet = 0;
    private object _sync = new object();
}

Теперь у меня есть несколько вопросов:

  1. Является ли мой подход полным доказательством?
  2. Какой механизм синхронизации лучше всего подходит для этой проблемы и почему?
  3. Как эксперт по многопоточности справится с этой проблемой?

Спасибо.

1 Ответ

1 голос
/ 17 декабря 2011

Из того, что я видел, я бы сказал, что это правильно.Мне нравится, что вы использовали ConcurrentQueue и не использовали собственную синхронизированную очередь.Это беспорядок, и, скорее всего, он будет не таким быстрым, как существующий.

Я хотел бы только отметить, что ваш собственный «механизм сигнализации» на самом деле очень похож на семафор: блокировка, которая допускает более одногопоток, чтобы войти в критическую секцию.Эта функциональность уже существует в классе Семафор .

...