Реализация очереди кражи работы в C / C ++? - PullRequest
32 голосов
/ 20 января 2010

Я ищу правильную реализацию очереди кражи работы в C / CPP. Я посмотрел вокруг Google, но не нашел ничего полезного.

Возможно, кто-то знаком с хорошей реализацией с открытым исходным кодом? (Я предпочитаю не применять псевдокод, взятый из оригинальных научных работ).

Ответы [ 13 ]

13 голосов
/ 31 января 2010

Реализовать «воровство работы» не сложно в теории. Вам нужен набор очередей, содержащих задачи, которые работают, выполняя комбинацию вычислений и генерируя другие задачи, чтобы выполнять больше работы. И вам нужен атомарный доступ к очередям, чтобы помещать вновь созданные задачи в эти очереди. Наконец, вам нужна процедура, которую каждая задача вызывает в конце, чтобы найти больше работы для потока, который выполнил задачу; эта процедура должна искать в рабочих очередях, чтобы найти работу.

В большинстве таких систем кражи работы предполагается, что имеется небольшое количество потоков (обычно резервируемых реальными ядрами процессора) и что для каждого потока существует ровно одна очередь работ. Затем вы сначала пытаетесь украсть работу из собственной очереди, а если она пуста, попробуйте украсть у других. Хитроумно знать, какие очереди искать; Сканирование их поочередно для работы довольно дорого и может вызвать колоссальные разногласия между потоками в поисках работы.

Пока что все это довольно общие вещи с одним, кроме двух основных исключений: 1) переключение контекстов (например, установка регистров контекста процессора, таких как «стек») не может быть задано в чистом C или C ++. Вы можете решить эту проблему, согласившись записать часть своего пакета в машинный код конкретной целевой платформы. 2) Атомарный доступ к очередям для мультипроцессора не может быть сделан исключительно в C или C ++ (без учета алгоритма Деккера), поэтому вам нужно будет кодировать те, которые используют примитивы синхронизации на ассемблере, такие как X86 LOCK XCH или Compare and Swap. Теперь код, используемый для обновления очереди после получения безопасного доступа, не очень сложен, и вы можете легко написать это в несколько строк на языке C.

Однако, я думаю, вы обнаружите, что попытка кодировать такой пакет на C и C ++ с помощью смешанного ассемблера все еще довольно неэффективна, и в конечном итоге вы все равно будете в конечном итоге кодировать все это на ассемблере. Все, что осталось, это C / C ++ совместимые точки входа: -}

Я сделал это для нашего PARLANSE языка параллельного программирования, который предлагает идею сколь угодно большого числа параллельных вычислений, живущих и взаимодействующих (синхронизирующих) в любой момент. Он реализован за кулисами на X86 точно с одним потоком на процессор, а реализация полностью на ассемблере. Код для кражи работы, вероятно, содержит в общей сложности 1000 строк, и его сложный код, потому что вы хотите, чтобы он был очень быстрым в неконфликтном случае.

Реальная ложка дегтя для C и C ++ заключается в том, сколько места в стеке вы назначаете при создании задачи, представляющей работу? Программы на последовательном C / C ++ избегают этого вопроса, просто перераспределяя огромные объемы (например, 10 МБ) одного линейного стека, и никто не заботится о том, сколько этого пространства стека потрачено впустую. Но если вы можете создавать тысячи задач и выполнять их все в определенный момент времени, вы не сможете разумно выделить 10 Мбайт для каждой из них. Так что теперь вам нужно либо статически определить, сколько стекового пространства понадобится задаче (сложный по Тьюрингу), либо вам нужно выделить куски стека (например, на вызов функции), чего не делают широко доступные компиляторы C / C ++ (например, тот, который вы, вероятно, используете). Последний выход - ограничить создание задач, чтобы в любой момент ограничить их несколькими сотнями, и объединить несколько сотен действительно огромных стеков среди текущих задач. Вы не можете сделать последнее, если задачи могут заблокировать / приостановить состояние, потому что вы достигнете своего порога. Таким образом, вы можете сделать это, только если задачи only выполняют вычисления. Это выглядит как довольно серьезное ограничение.

Для PARLANSE мы создали компилятор, который распределяет записи активации в куче для каждого вызова функции.

13 голосов
/ 26 января 2010

Нет бесплатного обеда.

Пожалуйста, посмотрите оригинал работы краже бумаги . Этот документ трудно понять. Я знаю, что статья содержит теоретическое доказательство, а не псевдокод. Тем не менее, просто нет такой гораздо более простой версии, чем TBB. Если таковые имеются, это не даст оптимальной производительности. Сама кража работы влечет за собой некоторые накладные расходы, поэтому оптимизация и приемы очень важны. Особенно, очереди должны быть потокобезопасными. Реализация синхронизации с высокой степенью масштабируемости и минимальными издержками является сложной задачей.

Мне действительно интересно, зачем тебе это нужно. Я думаю, что правильная реализация означает что-то вроде TBB и Cilk. Опять же, воровство трудновыполнимо.

13 голосов
/ 20 января 2010

Взгляните на многопоточные блоки Intel.

http://www.threadingbuildingblocks.org/

2 голосов
/ 31 января 2010

Если вы ищете отдельную реализацию класса очередей рабочих столов в C ++, построенную на pthread или boost :: thread, то удачи, насколько мне известно, нет.

Однако, как говорили другиеCilk, TBB и Microsoft PPL имеют встроенные реализации рабочего процесса.

Вопрос в том, хотите ли вы использовать очередь рабочего процесса или внедрить ее?Если вы просто хотите использовать один из них, то вышеприведенные варианты являются хорошими отправными точками, для этого достаточно просто запланировать «задачу» в любом из них.

Как сказал BlueRaja, это сделает task_group & structured_task_group в PPL, также обратите внимание, что эти классы также доступны в последней версии Intel TBB.Параллельные циклы (parallel_for, parallel_for_each) также реализованы с помощью рабочего стола.

Если вам нужно смотреть на источник, а не использовать реализацию, TBB - это OpenSource, а Microsoft отправляет источники для своего CRT, так что вы можете приступить к изучению.

В блоге Джо Даффи также можно найти реализацию C # (но это C # и модель памяти другая).

-Rick

2 голосов
/ 27 января 2010

Существует инструмент, позволяющий сделать это очень элегантным способом. Это действительно эффективный способ распараллелить вашу программу за очень короткое время.

Проект Cilk

HPC Challenge Award

Наша заявка на участие в конкурсе HPC Challenge Премия класса 2 получила награду 2006 года за «Лучшее сочетание элегантности и Спектакль''. Награда была сделана в SC'06 в Тампе 14 ноября 2006 года.

1 голос
/ 31 декабря 2018

Эта библиотека с открытым исходным кодом https://github.com/cpp-taskflow/cpp-taskflow поддерживает рабочие пулы потоков с декабря 2018 года.

Взгляните на класс WorkStealingQueue, который реализует очередь на кражу работы, как описано в статье «Динамическая круговая блокировка на работу», SPAA, 2015.

1 голос
/ 17 ноября 2015

OpenMP вполне может поддерживать кражу работы, хотя его называют рекурсивным параллелизмом

Сообщение на форуме OpenMP

Спецификация OpenMP определяет конструкции задач (которые могут быть вложенными, поэтому очень подходят для рекурсивного параллелизма), но не определяет детали того, как они реализованы. Реализации OpenMP, включая gcc, обычно используют некоторую форму кражи работы для задач, хотя точный алгоритм (и результирующая производительность) могут отличаться!

См. #pragma omp task и #pragma omp taskwait

Обновление

Глава 9 книги Параллелизм C ++ в действии описывает, как реализовать «кражу работы для потоков пула». Я не читал / не реализовывал это сам, но это не выглядит слишком сложно.

1 голос
/ 20 января 2014

Самая близкая реализация этого алгоритма кражи работ, который я нашел, - это что-то под названием Wool от Karl-Filip Faxén. src / отчет / сравнение

1 голос
/ 25 января 2010

Класс structd_task_group PPL использует для своей работы очередь на кражу работы Если вам нужен WSQ для многопоточности, я бы порекомендовал это.
Если вы действительно ищете источник, я не знаю, указан ли код в ppl.h или есть предварительно скомпилированный объект; Мне придется проверить, когда я вернусь домой сегодня вечером.

0 голосов
/ 30 декабря 2014

Я перенес этот проект C на C ++.

Оригинал Steal может испытывать грязное чтение при расширении массива. Я попытался исправить ошибку, но в конце концов сдался, потому что мне фактически не нужен динамически растущий стек. Вместо попытки выделить место метод Push просто возвращает false. Затем вызывающий абонент может выполнить ожидание вращения, то есть while(!stack->Push(value)){}.

#pragma once
#include <atomic>

  // A lock-free stack.
  // Push = single producer
  // Pop = single consumer (same thread as push)
  // Steal = multiple consumer

  // All methods, including Push, may fail. Re-issue the request
  // if that occurs (spinwait).

  template<class T, size_t capacity = 131072>
  class WorkStealingStack {

  public:
    inline WorkStealingStack() {
      _top = 1;
      _bottom = 1;
    }

    WorkStealingStack(const WorkStealingStack&) = delete;

    inline ~WorkStealingStack()
    {

    }

    // Single producer
    inline bool Push(const T& item) {
      auto oldtop = _top.load(std::memory_order_relaxed);
      auto oldbottom = _bottom.load(std::memory_order_relaxed);
      auto numtasks = oldbottom - oldtop;

      if (
        oldbottom > oldtop && // size_t is unsigned, validate the result is positive
        numtasks >= capacity - 1) {
        // The caller can decide what to do, they will probably spinwait.
        return false;
      }

      _values[oldbottom % capacity].store(item, std::memory_order_relaxed);
      _bottom.fetch_add(1, std::memory_order_release);
      return true;
    }

    // Single consumer
    inline bool Pop(T& result) {

      size_t oldtop, oldbottom, newtop, newbottom, ot;

      oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
      ot = oldtop = _top.load(std::memory_order_acquire);
      newtop = oldtop + 1;
      newbottom = oldbottom - 1;

      // Bottom has wrapped around.
      if (oldbottom < oldtop) {
        _bottom.store(oldtop, std::memory_order_relaxed);
        return false;
      }

      // The queue is empty.
      if (oldbottom == oldtop) {
        _bottom.fetch_add(1, std::memory_order_release);
        return false;
      }

      // Make sure that we are not contending for the item.
      if (newbottom == oldtop) {
        auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
        if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
          _bottom.fetch_add(1, std::memory_order_release);
          return false;
        }
        else {
          result = ret;
          _bottom.store(newtop, std::memory_order_release);
          return true;
        }
      }

      // It's uncontended.
      result = _values[newbottom % capacity].load(std::memory_order_acquire);
      return true;
    }

    // Multiple consumer.
    inline bool Steal(T& result) {
      size_t oldtop, newtop, oldbottom;

      oldtop = _top.load(std::memory_order_acquire);
      oldbottom = _bottom.load(std::memory_order_relaxed);
      newtop = oldtop + 1;

      if (oldbottom <= oldtop)
        return false;

      // Make sure that we are not contending for the item.
      if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
        return false;
      }

      result = _values[oldtop % capacity].load(std::memory_order_relaxed);
      return true;
    }

  private:

    // Circular array
    std::atomic<T> _values[capacity];
    std::atomic<size_t> _top; // queue
    std::atomic<size_t> _bottom; // stack
  };

Full Gist (включая модульные тесты). Я проводил тесты только на сильной архитектуре (x86 / 64), поэтому, если использовать слабую архитектуру, ваш пробег может варьироваться, если вы попытаетесь использовать это например Неон / PPC.

...