Есть ли очевидные проблемы с этой цепочкой логики / кода? - PullRequest
2 голосов
/ 26 июня 2011

Одна из книг, которые я читаю, - «Искусство многопроцессорного программирования Мориса Херлихи и Нира Шавита». В нем есть очередь «без ожидания», которая (после небольшой языковой адаптации) прекрасно выполняет тестовые и логические функции в многопоточной среде. По крайней мере, нет конфликтов даже для 10 000 000 элементов, распределенных по 5 потокам и логика проверяется.

(я отредактировал очередь, чтобы она возвращала значение false, если элемент не может быть получен, вместо того, чтобы выдавать исключение. Код приведен ниже).

Однако у него есть одно предупреждение; очередь не может расти. Простая логика проверяет состояния, которые не могут расти без блокировки очереди - что несколько отрицает необходимость иметь очередь без блокировки.

Таким образом, цель состоит в том, чтобы создать очередь без блокировки (или, по крайней мере, без голодания), которая может расти.

Итак: если мы, по сути, даем каждому потоку свою собственную общую очередь способом, который не является оксюмороном (и принимая высокую вероятность того, что это было решено и лучше для целей обучения на практике):

        WaitFreeQueue<Queue<int>> queues = new WaitFreeQueue<Queue<int>>(threadCount);

        // Dequeue a queue, enqueue an item, enqueue the queue.

        // Dequeue a queue, dequeue an item, enqueue the queue.

И очередь без ожидания (предыдущий код включен в комментарии на случай, если я внесу какие-либо критические изменения):

/// <summary>
/// A wait-free queue for use in threaded environments.
/// Significantly adapted from "The Art of Multiprocessor Programming by Maurice Herlihy and Nir Shavit".
/// </summary>
/// <typeparam name="T">The type of item in the queue.</typeparam>
public class WaitFreeQueue<T>
{
    /// <summary>
    /// The index to dequeue from.
    /// </summary>
    protected int head;
    /// <summary>
    /// The index to queue to.
    /// </summary>
    protected int tail;
    /// <summary>
    /// The array to queue in.
    /// </summary>
    protected T[] items;


    /// <summary>
    /// The number of items queued.
    /// </summary>
    public int Count
    {
        get { return tail - head; }
    }


    /// <summary>
    /// Creates a new wait-free queue.
    /// </summary>
    /// <param name="capacity">The capacity of the queue.</param>
    public WaitFreeQueue(int capacity)
    {
        items = new T[capacity];
        head = 0; tail = 0;
    }


    /// <summary>
    /// Attempts to enqueue an item.
    /// </summary>
    /// <param name="value">The item to enqueue.</param>
    /// <returns>Returns false if there was no room in the queue.</returns>
    public bool Enqueue(T value)
    {
        if (tail - head == items.Length)
            // throw new IndexOutOfRangeException();
            return false;
        items[tail % items.Length] = value;
        System.Threading.Interlocked.Increment(ref tail);
        return true;
        // tail++;
    }


    /// <summary>
    /// Attempts to dequeue an item.
    /// </summary>
    /// <param name="r">The variable to dequeue to.</param>
    /// <returns>Returns true if there was an available item to dequeue.</returns>
    public bool Dequeue(out T r)
    {
        if (tail - head == 0)
        // throw new InvalidOperationException("No more items.");
        { r = default(T); return false; }
        r = items[head % items.Length];
        System.Threading.Interlocked.Increment(ref head);
        // head++;
        // return r;
        return true;
    }
}

Итак: это сработает? Если нет, то почему? Если да, то есть ли еще какие-нибудь проблемы?

Спасибо.

Ответы [ 2 ]

5 голосов
/ 26 июня 2011

Пытаться писать многопоточный код без блокировок сложно, и вы должны либо оставить его людям, которые знают его лучше, чем вы или я (и используйте, например, ConcurrentQueue<T>), либо вообще не писать его (и использоватьблокирует), если возможно.

При этом существует несколько проблем с вашим кодом:

  1. Это не очередь.Если threadCount равно 2, и вы ставите в очередь элементы 1, 2 и 3, один за другим, а затем удаляете из очереди, вы получаете элемент 2!
  2. Сначала вы не можете использовать значениеполе и затем назовите Interlocked.Increment() на нем, как вы делаете.Представьте себе что-то вроде этого:

    1. В потоке 1: items[tail % items.Length] = value;
    2. В потоке 2: items[tail % items.Length] = value;
    3. В потоке 2: Interlocked.Increment(ref head);
    4. В потоке 1: Interlocked.Increment(ref head);

    Теперь обе нити поставлены в одну и ту же позицию, и позиция после этого не изменилась.Это неправильно.

2 голосов
/ 26 июня 2011

Код, который вы разместили, не является потокобезопасным ни для постановки в очередь, ни для удаления из очереди.

Ставить

  • У вас пустая очередь
  • head = tail = 0
  • У вас есть 2 темы, которые пытаются поставить в очередь.
  • Первый поток выполняет Enqueue и прерывается после items[tail % items.Length] = value;, но до выполнения взаимосвязанного приращения. Теперь он записал значение в item[0], но не увеличил tail
  • Второй поток вводит Enqueue и выполняет метод. Теперь tail по-прежнему равен 0, и поэтому значение, записанное предыдущим потоком, заменяется, поскольку второй поток также записывает значение в item[0].
  • Обе темы заканчивают выполнение.
  • Количество предметов будет правильным, но вы потеряете данные.

Dequeue

  • Предположим, у вас есть два элемента в очереди в item[0] и items[1]
  • head = 0 и tail = 2
  • У вас есть два потока, пытающихся удалить из очереди.
  • Первый поток попадает в Dequeue и прерывается после r = items[head % items.Length];, но до выполнения взаимосвязанных приращений.
  • Теперь второй поток входит в Dequeue и возвращает тот же элемент, что и head по-прежнему 0
  • После этого ваша очередь будет пуста, но вы прочтете один элемент дважды, а один элемент совсем не будет.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...