Одна из книг, которые я читаю, - «Искусство многопроцессорного программирования Мориса Херлихи и Нира Шавита». В нем есть очередь «без ожидания», которая (после небольшой языковой адаптации) прекрасно выполняет тестовые и логические функции в многопоточной среде. По крайней мере, нет конфликтов даже для 10 000 000 элементов, распределенных по 5 потокам и логика проверяется.
(я отредактировал очередь, чтобы она возвращала значение false, если элемент не может быть получен, вместо того, чтобы выдавать исключение. Код приведен ниже).
Однако у него есть одно предупреждение; очередь не может расти. Простая логика проверяет состояния, которые не могут расти без блокировки очереди - что несколько отрицает необходимость иметь очередь без блокировки.
Таким образом, цель состоит в том, чтобы создать очередь без блокировки (или, по крайней мере, без голодания), которая может расти.
Итак: если мы, по сути, даем каждому потоку свою собственную общую очередь способом, который не является оксюмороном (и принимая высокую вероятность того, что это было решено и лучше для целей обучения на практике):
WaitFreeQueue<Queue<int>> queues = new WaitFreeQueue<Queue<int>>(threadCount);
// Dequeue a queue, enqueue an item, enqueue the queue.
// Dequeue a queue, dequeue an item, enqueue the queue.
И очередь без ожидания (предыдущий код включен в комментарии на случай, если я внесу какие-либо критические изменения):
/// <summary>
/// A wait-free queue for use in threaded environments.
/// Significantly adapted from "The Art of Multiprocessor Programming by Maurice Herlihy and Nir Shavit".
/// </summary>
/// <typeparam name="T">The type of item in the queue.</typeparam>
public class WaitFreeQueue<T>
{
/// <summary>
/// The index to dequeue from.
/// </summary>
protected int head;
/// <summary>
/// The index to queue to.
/// </summary>
protected int tail;
/// <summary>
/// The array to queue in.
/// </summary>
protected T[] items;
/// <summary>
/// The number of items queued.
/// </summary>
public int Count
{
get { return tail - head; }
}
/// <summary>
/// Creates a new wait-free queue.
/// </summary>
/// <param name="capacity">The capacity of the queue.</param>
public WaitFreeQueue(int capacity)
{
items = new T[capacity];
head = 0; tail = 0;
}
/// <summary>
/// Attempts to enqueue an item.
/// </summary>
/// <param name="value">The item to enqueue.</param>
/// <returns>Returns false if there was no room in the queue.</returns>
public bool Enqueue(T value)
{
if (tail - head == items.Length)
// throw new IndexOutOfRangeException();
return false;
items[tail % items.Length] = value;
System.Threading.Interlocked.Increment(ref tail);
return true;
// tail++;
}
/// <summary>
/// Attempts to dequeue an item.
/// </summary>
/// <param name="r">The variable to dequeue to.</param>
/// <returns>Returns true if there was an available item to dequeue.</returns>
public bool Dequeue(out T r)
{
if (tail - head == 0)
// throw new InvalidOperationException("No more items.");
{ r = default(T); return false; }
r = items[head % items.Length];
System.Threading.Interlocked.Increment(ref head);
// head++;
// return r;
return true;
}
}
Итак: это сработает? Если нет, то почему? Если да, то есть ли еще какие-нибудь проблемы?
Спасибо.