Я твердо верю в обучение путем переизобретения. С таким состоянием ума я решил реализовать собственный пул потоков. Поставленная мною цель была следующей:
- Чтобы иметь возможность ставить рабочие элементы в очередь в пуле потоков.
- Чтобы иметь возможность обрабатывать рабочие элементы с фиксированным количеством потоков - все они создаются одновременно.
- Функция общего рабочего потока должна знать только, как обрабатывать и не должна иметь дело с другими функциями / свойствами, такими как
IsEmpty или Count.
Мне удалось достичь вышеупомянутых целей, но я хочу проверить подход, который я использовал с экспертами в отношении стекопотока. Также хотелось бы узнать, есть ли лучшие подходы или как бы эксперт по многопоточности решил эту проблему. В следующих параграфах упоминается проблема, с которой я столкнулся, и как я ее исправил.
Пул потоков, который я создал, внутренне поддерживал очередь рабочих элементов, из которой все рабочие потоки выбирали элемент, а затем обрабатывали его. Всякий раз, когда новый элемент ставится в очередь, он сигнализирует о событии, так что любой свободный поток может его забрать и выполнить.
Я начал с autoresetevent, чтобы сигнализировать ожидающим потокам о любых новых рабочих элементах в очереди, но я столкнулся с проблемой потерянных сигнальных событий. Это происходит, когда в очередь помещается более одного элемента, и нет свободных потоков для его обработки. Общее количество элементов, которые остаются необработанными, соответствует общему количеству событий сигнала, которые были потеряны из-за перекрывающихся событий набора (сигнализации).
Чтобы решить проблему потерянных сигнальных событий, я создал обертку поверх autoresetevent и использовал ее вместо autoresetevent. Это исправлено на проблему. Вот список кодов для того же самого:
public static class CustomThreadPool
{
static CustomThreadPool()
{
for (int i = 0; i < minThreads; i++)
_threads.Add(
new Thread(ThreadFunc) { IsBackground = true }
);
_threads.ForEach((t) => t.Start());
}
public static void EnqueWork(Action action)
{
_concurrentQueue.Enqueue(action);
_enqueEvent.Set();
}
private static void ThreadFunc()
{
Action action = null;
while (true)
{
_enqueEvent.WaitOne();
_concurrentQueue.TryDequeue(out action);
action();
}
}
private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>();
private static List<Thread> _threads = new List<Thread>();
private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent();
private static object _syncObject = new object();
private const int minThreads = 4;
private const int maxThreads = 10;
public static void Test()
{
CustomThreadPool.EnqueWork(() => {
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****First*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Second*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Third*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Fourth*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Fifth*****");
});
}
}
public class CountAutoResentEvent
{
public void Set()
{
_event.Set();
lock (_sync)
_countOfSet++;
}
public void WaitOne()
{
_event.WaitOne();
lock (_sync)
{
_countOfSet--;
if (_countOfSet > 0)
_event.Set();
}
}
private AutoResetEvent _event = new AutoResetEvent(false);
private int _countOfSet = 0;
private object _sync = new object();
}
Теперь у меня есть несколько вопросов:
- Является ли мой подход полным доказательством?
- Какой механизм синхронизации лучше всего подходит для этой проблемы и почему?
- Как эксперт по многопоточности справится с этой проблемой?
Спасибо.