Реализация списка заданий с внутренней синхронизацией - PullRequest
7 голосов
/ 15 января 2010

Я работаю над простой структурой потоков работы, которая очень похожа на описанную в id Tech 5 Challenges . На самом базовом уровне у меня есть набор списков заданий, и я хочу запланировать эти списки для нескольких потоков ЦП (используя стандартный пул потоков для фактической диспетчеризации.) Однако мне интересно, как этот сигнал / ожидание вещи внутри списка ожидания могут быть эффективно реализованы. Насколько я понимаю, токен ожидания блокирует выполнение списка, если токен сигнала не был выполнен. Это неявно означает, что все, прежде чем сигнал должен закончиться, прежде чем сигнал может быть поднят. Допустим, у нас есть такой список:

J1, J2, S, J3, W, J4

тогда отправка может идти так:

#1: J1, J2, J3
<wait for J1, J2, run other lists if possible>
#2: J4

Однако, это не так просто, как кажется, учитывая набор списков, мне пришлось бы перемещать некоторые из них между ready и waiting, а также иметь специальный код для сбора всех заданий перед сигналом. и пометить что-нибудь на них, чтобы они могли инициировать сигнал тогда и только тогда, когда они все закончили (например, это означает, что больше нельзя добавлять задания в список , пока выполняется, как следующие сигналы). получить доступ к ранее вставленным заданиям.)

Существует ли какой-либо "стандартный" способ эффективной реализации этого? Мне также интересно, как лучше спланировать выполнение списка заданий, прямо сейчас, каждое ядро ​​захватывает список заданий и планирует все задания в нем, что дает довольно хорошее масштабирование (для заданий 32 тыс. С 0,7 мс я получаю 101%, что, я думаю отчасти из-за того, что однопоточная версия запланирована на разные ядра несколько раз.)

Ответы [ 2 ]

4 голосов
/ 23 января 2010

Это относительно простой алгоритм планирования. Пару проблем на первый взгляд кажется сложным, но на самом деле это не так (сигнал / ожидание и локальность кэша). Я объясню методы, затем приведу некоторый код, который я написал, чтобы проиллюстрировать концепции, а затем дам несколько заключительных замечаний по настройке.

Алгоритмы для использования

Эффективная обработка сигнала / ожидания на первый взгляд кажется сложной, но на самом деле оказывается чрезвычайно простой. Поскольку пары сигнал / ожидание не могут быть вложенными или перекрывающимися, на самом деле могут быть удовлетворены только две из них и одна ожидает в любой момент времени. Простое удержание указателя «CurrentSignal» на самом последнем неудовлетворенном сигнале - все, что необходимо для ведения учета.

Убедиться в том, что ядра не перепрыгивают между списками слишком часто и что данный список не распределяется между слишком многими ядрами, также относительно легко: каждое ядро ​​продолжает получать задания из одного и того же списка, пока не заблокируется, а затем переключается на другой список. Чтобы все ядра не объединялись в один список, для каждого списка сохраняется WorkerCount, в котором указывается, сколько ядер его используют, и списки организованы таким образом, чтобы ядра выбирали списки с меньшим количеством рабочих.

Блокировку можно упростить, заблокировав только планировщик или список, над которым вы работаете, в любое время, но не оба одновременно.

Вы выразили некоторую обеспокоенность по поводу добавления заданий в список после того, как список уже начал выполняться. Оказывается, поддержка этого почти тривиальна: все, что нужно - это вызов из списка в планировщик, когда задание добавляется в список, который в настоящее время завершен, поэтому планировщик может планировать новое задание.

Структуры данных

Вот основные структуры данных, которые вам понадобятся:

class Scheduler
{
  LinkedList<JobList>[] Ready; // Indexed by number of cores working on list
  LinkedList<JobList> Blocked;
  int ReadyCount;
  bool Exit;

  public:
    void AddList(JobList* joblist);
    void DoWork();

  internal:
    void UpdateQueues(JobList* joblist);

    void NotifyBlockedCores();
    void WaitForNotifyBlockedCores();
}

class JobList
{
  Scheduler Scheduler;
  LinkedList<JobList> CurrentQueue;

  LinkedList<Job> Jobs;            // All jobs in the job list
  LinkedList<SignalPoint> Signals; // All signal/wait pairs in the job list,
                                      plus a dummy

  Job* NextJob;                    // The next job to schedule, if any
  int NextJobIndex;                // The index of NextJob

  SignalPoint* CurrentSignal;      // First signal not fully satisfied

  int WorkerCount;                 // # of cores executing in this list

  public:
    void AddJob(Job* job);
    void AddSignal();
    void AddWait();

  internal:
    void Ready { get; }
    void GetNextReadyJob(Job& job, int& jobIndex);
    void MarkJobCompleted(Job job, int jobIndex);
}
class SignalPoint
{
  int SignalJobIndex = int.MaxValue;
  int WaitJobIndex = int.MaxValue;
  int IncompleteCount = 0;
}

Обратите внимание, что сигнальные точки для данного списка заданий удобнее всего хранить отдельно от фактического списка заданий.

Реализация планировщика

Планировщик отслеживает списки заданий, назначает их ядрам и выполняет задания из списков заданий.

AddList добавляет задание в планировщик. Он должен быть помещен в очередь «Готов» или «Заблокирован» в зависимости от того, есть ли у него какая-либо работа (т. Е. Были ли еще добавлены какие-либо задания), поэтому просто вызовите UpdateQueues.

void Scheduler.AddList(JobList* joblist)
{
  joblist.Scheduler = this;
  UpdateQueues(joblist);
}

UpdateQueues централизует логику обновления очереди. Обратите внимание на алгоритм выбора новой очереди, а также уведомление для незанятых ядер, когда работа становится доступной:

void Scheduler.UpdateQueues(JobList* joblist)
{
  lock(this)
  {
    // Remove from prior queue, if any
    if(joblist.CurrentQueue!=null)
    {
      if(joblist.CurrentQueue!=Blocked) ReadyCount--;
      joblist.CurrentQueue.Remove(joblist);
    }

    // Select new queue
    joblist.CurrentQueue = joblist.Ready ? Ready[joblist.WorkerCount] : Blocked;

    // Add to new queue
    joblist.CurrentQueue.Add(joblist);
    if(joblist.CurrentQueue!=Blocked)
      if(++ReadyCount==1) NotifyBlockedCores();
  }
}

DoWork - это обычная работа планировщика, за исключением: 1. Он выбирает JobList с наименьшим количеством работников, 2. Он обрабатывает задания из заданного списка заданий до тех пор, пока не может больше этого делать, и 3. Он сохраняет jobIndex, а также задание, чтобы список заданий мог легко обновлять состояние завершения (подробности реализации).

void Scheduler.DoWork()
{
  while(!Exit)
  {
    // Get a job list to work on
    JobList *list = null;
    lock(this)
    {
      for(int i=0; i<Ready.Length; i++)
        if(!Ready[i].Empty)
        {
          list = Ready[i].First;
          break;
        }
      if(list==null)  // No work to do
      {
        WaitForNotifyBlockedCores();
        continue;
      }
      list.WorkerCount++;
      UpdateQueues(list);
    }

    // Execute jobs in the list as long as possible
    while(true)
    {
      int jobIndex;
      Job job;
      if(!GetNextReadyJob(&job, &jobIndex)) break;

      job.Execute();

      list.MarkJobCompleted(job, jobIndex);
    }

    // Release the job list
    lock(this)
    {
      list.WorkerCount--;
      UpdateQueues(list);
    }
  }
}

Реализация JobList

JobList отслеживает, как сигнал / ожидание перемежаются с заданиями, и отслеживает, какие пары сигнал / ожидание уже завершили все до того, как их точка сигнала.

Конструктор создает фиктивную сигнальную точку для добавления заданий. Эта сигнальная точка становится реальной сигнальной точкой (и добавляется новый манекен) всякий раз, когда добавляется новый «сигнал».

JobList.JobList()
{
  // Always have a dummy signal point at the end
  Signals.Add(CurrentSignal = new SignalPoint());
}

AddJob добавляет работу в список. Он отмечен как неполный в SignalPoint. Когда задание фактически выполняется, IncompleteCount того же SignalPoint уменьшается. Также необходимо сообщить планировщику, что все могло измениться, поскольку новое задание может быть немедленно выполнено. Обратите внимание, что планировщик вызывается после снятия блокировки «this», чтобы избежать тупика.

void JobList.AddJob(Job job)
{
  lock(this)
  {
    Jobs.Add(job);
    Signals.Last.IncompleteCount++;
    if(NextJob == null)
      NextJob = job;
  }
  if(Scheduler!=null)
    Scheduler.UpdateQueues(this);
}

AddSignal и AddWait добавляют сигналы и ждут в списке заданий. Обратите внимание, что AddSignal фактически создает новую SignalPoint, а AddWait просто заполняет информацию о точке ожидания в ранее созданном SignalPoint.

void JobList.AddSignal()
{
  lock(this)
  {
    Signals.Last.SignalJobIndex = Jobs.Count;  // Reify dummy signal point
    Signals.Add(new SignalPoint());            // Create new dummy signal point
  }
}


void JobList.AddWait()
{
  lock(this)
  {
    Signals.Last.Previous.WaitJobIndex = Jobs.Count;
  }
}

Свойство Ready определяет, готов ли список для назначенных ему дополнительных ядер. Может быть два или три ядра, работающие над списком без того, чтобы список был «готов», если следующее задание ожидает сигнала, прежде чем оно может начаться.

bool JobList.Ready
{
  get
  {
    lock(this)
    {
      return NextJob!=null &&
        (CurrentSignal==Signals.Last ||
         NextJobIndex < CurrentSignal.WaitJobIndex);
    }
  }
}

GetNextReadyJob очень прост: если мы готовы, просто верните следующую работу в списке.

void JobList.GetNextReadyJob(Job& job, int& jobIndex)
{
  lock(this)
  {
    if(!Ready) return false;
    jobIndex = list.NextJobIndex++;
    job = list.NextJob; list.NextJob = job.Next;
    return true;

  }
}

MarkJobCompleted, пожалуй, самый интересный из всех. Из-за структуры сигналов и ожиданий текущее задание находится либо перед CurrentSignal, либо между CurrentSignal и CurrentSignal.Next (если оно находится после последнего фактического сигнала, оно будет посчитано как находящееся между CurrentSignal и фиктивной SignalPoint в конце ). Нам нужно уменьшить количество незавершенных работ. Возможно, нам также понадобится перейти к следующему сигналу, если этот счетчик обнуляется. Конечно, мы никогда не передаем фиктивную SignalPoint в конце.

Обратите внимание, что этот код не имеет вызова Scheduler.UpdateQueue, потому что мы знаем, что планировщик будет вызывать GetNextReadyJob в течение секунды, и если он вернет false, он все равно будет вызывать UpdateQueue.

void JobList.MarkJobCompleted(Job job, int jobIndex)
{
  lock(this)
  {
    if(jobIndex >= CurrentSignal.SignalJobIndex)
      CurrentSignal.Next.IncompleteCount--;
    else
    {
      CurrentSignal.IncompleteCount--;
      if(CurrentSignal.IncompleteCount==0)
        if(CurrentSignal.WaitJobIndex < int.MaxValue)
          CurrentSignal = CurrentSignal.Next;
    }
  }
}

Настройка на основе длины списка, оценки длины задания и т. Д.

Приведенный выше код не обращает никакого внимания на длину списков заданий, поэтому, если имеется сотня крошечных списков заданий и один огромный, каждое ядро ​​может взять отдельный крошечный список заданий, а затем все собраться на огромном, что приводит к неэффективности. Эту проблему можно решить, сделав Ready [] массивом приоритетных очередей с приоритетом (joblist.Jobs.Count - joblist.NextJobIndex), но с приоритетом, который фактически обновляется только в обычных ситуациях UpdateQueue для повышения эффективности.

Это может стать еще сложнее, если создать эвристику, учитывающую количество и интервал комбинаций сигнал / ожидание для определения приоритета. Эту эвристику лучше всего настроить, используя распределение продолжительности работы и использование ресурсов.

Если известны длительности отдельных заданий или для них имеются хорошие оценки, то эвристика может использовать расчетную оставшуюся продолжительность вместо просто длины списка.

Заключительные замечания

Это довольно стандартное решение проблемы, которую вы представляете. Вы можете использовать алгоритмы, которые я дал, и они будут работать, включая блокировку, но вы не сможете скомпилировать код, который я написал выше, по нескольким причинам:

  1. Это безумное сочетание синтаксиса C ++ и C #. Первоначально я начал писать на C #, а затем изменил синтаксис на стиль C ++, так как думал, что это более вероятно, что вы будете использовать для такого проекта. Но я оставил довольно много C # -измов. К счастью, нет LINQ; -).

  2. Детали LinkedList немного помахали руками. Я предполагаю, что список может выполнять «Первый», «Последний», «Добавить» и «Удалить», а элементы списка могут выполнять «Предыдущий» и «Следующий». Но я не использовал реальный API для какого-либо реального класса связанного списка, о котором я знаю.

  3. Я не скомпилировал и не протестировал его. Я гарантирую, что где-то есть ошибка или два.

Итог: Вы должны рассматривать приведенный выше код как псевдокод, даже если он выглядит как настоящий Маккой.

Наслаждайтесь!

1 голос
/ 25 января 2010

Если у вас есть доступ к рабочей краже в вашей среде (например, Cilk, если вы находитесь в C, или fork / join Framework Дуга Ли в Java), вы можете легко получить простое и чистое решение (по сравнению с низкоуровневыми специальными попытками, которые вам, вероятно, придется делать, если вы не можете использовать что-то подобное), которые обеспечивают автоматическую балансировку нагрузки и хорошие местонахождение данных.

Вот высокоуровневое описание решения: вы запускаете один поток на ядро. Каждому из них присваивается список до тех пор, пока они не будут исчерпаны (множество способов сделать это - это задача очень хороших параллельных механизмов очередей, и именно поэтому вы хотели бы избегать самостоятельных решений, если это возможно). Каждый работник проходит по строкам списков один за другим: - Поддерживаются две очереди: одна для этих заданий до токена signal и одна или после него. - При обнаружении задания оно разветвляется и добавляется в соответствующую очередь (в зависимости от того, видели ли мы токен signal или нет) - При обнаружении токена wait мы объединяем все задания перед сигналом (это семантика, которую вы опишите, если я правильно понял). Обратите внимание, что в коде я использую helpJoin(), это означает, что поток действительно поможет (вытолкнув разветвленные задачи и выполняя их до тех пор, пока соединение не может продолжаться)

«Форк» означает помещение задачи в локальную очередь потока, которая либо будет выполнена самим потоком позже, либо может быть украдена другим потоком, который ищет какую-то работу.

В целях иллюстрации, здесь приведено рабочее моделирование ~ 80 строк этого сценария с использованием вышеупомянутого Java-фреймворка. Он создает столько потоков, сколько доступных ядер и несколько списков, и начинает их выполнение. Обратите внимание, насколько прост метод run (), хотя он все еще обладает преимуществами балансировки нагрузки, и потоки в основном выполняют задачи из своего собственного списка, если только они не заканчиваются и не запускают кражу, чтобы получить их. Конечно, если вы не находитесь на Java или C, вам придется найти похожую платформу, но тот же набор основных идей также упростит ваш код независимо от языка.

import java.util.*;
import java.util.concurrent.*;
import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveTask;

public class FJTest {
    public static void main(String[] args) throws Exception {
        Iterable<List<TaskType>> lists = createLists(10);

        ForkJoinPool pool = new ForkJoinPool();

        for (final List<TaskType> list : lists) {
            pool.submit(new Runnable() {
                public void run() {
                    List<ForkJoinTask> beforeSignal = new ArrayList<ForkJoinTask>();
                    List<ForkJoinTask> afterSignal = new ArrayList<ForkJoinTask>();
                    boolean signaled = false;
                    for (TaskType task : list) {
                        switch (task) {
                            case JOB:
                                ForkJoinTask job = new Job();
                                if (signaled == false)
                                    beforeSignal.add(job);
                                else
                                    afterSignal.add(job);
                                job.fork();
                                break;
                            case SIGNAL:
                                signaled = true;
                                break;
                            case WAIT:
                                signaled = false;
                                for (ForkJoinTask t : beforeSignal) {
                                    t.helpJoin();
                                }
                                beforeSignal = afterSignal;
                                afterSignal = new ArrayList<ForkJoinTask>();
                        }
                    }
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    private static Iterable<List<TaskType>> createLists(int size) {
        List<List<TaskType>> tasks = new ArrayList<List<TaskType>>();
        for (int i = 0; i < size; i++) {
            tasks.add(createSomeList());
        }
        return tasks;
    }

    private static List<TaskType> createSomeList() {
        return Arrays.asList(
                TaskType.JOB,
                TaskType.JOB,
                TaskType.SIGNAL,
                TaskType.JOB,
                TaskType.WAIT,
                TaskType.JOB);
    }

}

enum TaskType {
    JOB, SIGNAL, WAIT;
}
class Job extends RecursiveTask<Void> {
    @Override
    protected Void compute() {
        long x = 1;
        for (long i = 1; i < 200000001; i++) {
            x = i * x;
        }
        System.out.println(x); //just to use x
        return null;
    }
}
...