Как я могу запустить поток SQL-запроса, а затем выполнить другую работу, прежде чем получить результаты? - PullRequest
6 голосов
/ 20 октября 2010

У меня есть программа, которая выполняет ограниченную форму многопоточности.Он написан на Delphi и использует libmysql.dll (C API) для доступа к серверу MySQL.Программа должна обработать длинный список записей, занимая ~ 0,1 с на запись.Думайте об этом как одна большая петля.Весь доступ к базе данных осуществляется рабочими потоками, которые либо предварительно выбирают следующие записи, либо записывают результаты, поэтому основному потоку не нужно ждать.

В верхней части этого цикла мы сначала ждем поток предварительной выборки,получить результаты, а затем сделать так, чтобы поток предварительной выборки выполнил запрос для следующей записи.Идея состоит в том, что поток предварительной выборки немедленно отправит запрос и будет ждать результатов, пока основной поток завершит цикл.

Это часто так работает.Но обратите внимание, что нет ничего, чтобы гарантировать, что поток предварительной выборки запускается сразу.Я обнаружил, что часто запрос не отправлялся до тех пор, пока основной поток не зациклился и не начал ждать предварительной выборки.

Я исправил это, вызвав sleep (0) сразу после запуска потока предварительной выборки.Таким образом, основной поток сдает остаток своего временного интервала, надеясь, что поток предварительной выборки теперь будет запущен, отправляя запрос.Затем этот поток будет находиться в режиме ожидания во время ожидания, что позволяет основному потоку снова запускаться.
Конечно, в ОС запущено гораздо больше потоков, но на самом деле это сработало в некоторой степени.

Я действительно хочу, чтобы основной поток отправил запрос, а затем рабочий поток дождался результатов.Используя libmysql.dll, я вызываю

result := mysql_query(p.SqlCon,pChar(p.query));

в рабочем потоке.Вместо этого я хотел бы, чтобы основной поток вызывал что-то вроде

mysql_threadedquery(p.SqlCon,pChar(p.query),thread);

, что бы выполнить задачу, как только данные вышли.

Кто-нибудь знает что-нибудь подобное?

Это действительно проблема планирования, поэтому я мог бы попробовать запустить поток предварительной выборки с более высоким приоритетом, а затем уменьшить его приоритет после отправки запроса.Но, опять же, у меня нет вызова mysql, который отделяет отправку запроса от получения результатов.

Возможно, он есть, и я просто не знаю об этом.Просветите меня, пожалуйста.

Добавлен вопрос:

Кто-нибудь думает, что эту проблему можно решить, запустив поток предварительной выборки с более высоким приоритетом, чем основной поток?Идея состоит в том, что предварительная выборка немедленно выгрузит основной поток и отправит запрос.Тогда он будет спать в ожидании ответа сервера.Тем временем основной поток будет работать.

Добавлено: Подробная информация о текущей реализации

Эта программа выполняет вычисления для данных, содержащихся в БД MySQL.Есть 33M предметов с добавлением каждую секунду.Программа работает непрерывно, обрабатывая новые элементы, а иногда и повторно анализируя старые элементы.Он получает список элементов для анализа из таблицы, поэтому в начале прохода (текущий элемент) он знает следующий идентификатор элемента, который ему понадобится.

Поскольку каждый элемент независим, это идеальная цельдля многопроцессорной обработки.Самый простой способ сделать это - запустить несколько экземпляров программы на нескольких машинах.Программа высоко оптимизирована за счет профилирования, переписывания и редизайна алгоритма.Тем не менее, один экземпляр использует 100% ядра процессора, когда не требуется данных.Я запускаю 4-8 копий на двух четырехъядерных рабочих станциях.Но с такой скоростью они должны тратить время на ожидание на сервере MySQL.(Оптимизация схемы Сервер / БД - это еще одна тема.)

Я реализовал многопоточность в этом процессе исключительно во избежание блокировки вызовов SQL.Вот почему я назвал это «ограниченная многопоточность».У рабочего потока есть одна задача: отправить команду и дождаться результатов.(ОК, две задачи.)

Оказывается, есть 6 задач блокировки, связанных с 6 таблицами.Два из них читают данные, а остальные 4 записывают результаты.Они достаточно похожи, чтобы их можно было определить общей структурой задач.Указатель на эту задачу передается менеджеру пула потоков, который назначает поток для выполнения работы.Основной поток может проверить состояние задачи через структуру Задачи.

Это делает код основного потока очень простым.Когда ему нужно выполнить задачу 1, он ждет, пока задача 1 не будет занята, помещает команду SQL в задачу 1 и передает ее.Когда Task1 больше не занят, он содержит результаты (если есть).

4 задачи, которые пишут результаты, тривиальны.В главном потоке есть задача записи записей, пока он переходит к следующему элементу.Когда закончите с этим элементом, убедитесь, что предыдущая запись завершена, прежде чем начинать другую.

2 потока чтения менее тривиальны.Ничего не получится, если передать чтение потоку, а затем ждать результатов.Вместо этого эти задачи предварительно выбирают данные для следующего элемента.Таким образом, основной поток, приходя к этой задаче блокировки, проверяет, выполнена ли предварительная выборка;При необходимости ожидает завершения предварительной выборки, затем получает данные из Задачи.Наконец, он перезапускает задачу с идентификатором NEXT Item.

Идея состоит в том, чтобы задача предварительной выборки немедленно выдавала запрос и ожидала сервера MySQL.Затем основной поток может обработать текущий элемент и ко времени запуска следующего элемента необходимые данные уже находятся в задаче предварительной выборки.

Таким образом, многопоточность, пул потоков, синхронизация, структуры данных и т. Д.все сделано.И это все работает.У меня осталась проблема планирования.

Проблема планирования такова: весь выигрыш в скорости происходит при обработке текущего элемента, пока сервер выбирает следующий элемент.Мы выполняем задачу предварительной выборки перед обработкой текущего элемента, но как мы можем гарантировать, что она запускается?Планировщик ОС не знает, что для задачи предварительной выборки важно сразу же выполнить запрос, а затем он ничего не делает, кроме как ждет.

Планировщик ОС пытается быть «честным» и разрешать каждой задачебежать за назначенный интервал времени.Мой худший случай таков: основной поток получает свой фрагмент и выдает предварительную выборку, затем завершает текущий элемент и должен ждать следующего элемента.Ожидание освобождает оставшуюся часть времени, поэтому планировщик запускает поток предварительной выборки, который выдает запрос, а затем ожидает.Теперь обе темы ждут.Когда сервер сообщает, что запрос завершен, поток предварительной выборки перезапускается и запрашивает результаты (набор данных), а затем переходит в спящий режим.Когда сервер предоставляет результаты, поток предварительной выборки просыпается, помечает задачу как выполненную и завершается.Наконец, основной поток перезапускается и получает данные из завершенного задания.

Чтобы избежать этого наихудшего планирования, мне нужен какой-то способ убедиться, что запрос предварительной выборки выполняется до того, как основной поток продолжит работу с текущим элементом.,До сих пор я думал о трех способах сделать это:

  1. Сразу после выполнения задачи предварительной выборки основной поток вызывает Sleep (0).Это должно освободить оставшуюся часть времени.Затем я надеюсь , что планировщик запускает поток предварительной выборки, который выдаст запрос и затем подождет.Затем планировщик должен перезапустить основной поток (я надеюсь.) Как бы плохо это ни звучало, на самом деле это работает лучше, чем ничего.

  2. Я мог бы выдать поток предварительной выборки с более высоким приоритетом, чемосновная нить.Это должно привести к тому, что планировщик запустит его сразу, даже если он должен выгружать основной поток.Это также может иметь нежелательные последствия.Для фонового рабочего потока кажется неестественным получение более высокого приоритета.

  3. Возможно, я могу выполнить запрос асинхронно.То есть отдельная отправка запроса от получения результатов.Таким образом, основной поток мог отправить предварительную выборку с помощью mysql_send_query (без блокировки) и продолжить работу с текущим элементом.Затем, когда ему понадобится следующий элемент, он вызовет mysql_read_query, который будет блокироваться, пока не будут доступны данные.

Обратите внимание, что решение 3 даже не использует рабочий поток.Это выглядит как лучший ответ, но требует переписать некоторый низкоуровневый код.В настоящее время я ищу примеры такого асинхронного доступа клиент-сервер.

Я также хотел бы получить какие-либо опытные мнения об этих подходах.Я что-то пропустил или я делаю что-то не так?Обратите внимание, что это все рабочий код.Я не спрашиваю, как это сделать, но как это сделать лучше / быстрее.

Ответы [ 4 ]

4 голосов
/ 25 октября 2010

Тем не менее, один экземпляр использует 100% ядра ЦП, когда не требуется данных. Я запускаю 4-8 копий на двух четырехъядерных рабочих станциях.

У меня здесь концептуальная проблема. В вашей ситуации я бы либо создал многопроцессорное решение, в котором каждый процесс делал бы все в одном потоке, либо я создал бы многопоточное решение, ограниченное одним экземпляром на любой конкретной машине. Как только вы решите работать с несколькими потоками и примете дополнительную сложность и вероятность трудно исправляемых ошибок, вы должны максимально использовать их. Использование одного процесса с несколькими потоками позволяет использовать различное количество потоков для чтения и записи в базу данных и для обработки ваших данных. Количество потоков может даже измениться во время выполнения вашей программы, и соотношение базы данных и потоков обработки также может измениться. Этот вид динамического разделения работы возможен только в том случае, если вы можете управлять всеми потоками из одной точки программы, что невозможно для нескольких процессов.

Я реализовал многопоточность только для того, чтобы избежать блокировки вызовов SQL.

С несколькими процессами не было бы реальной необходимости делать это. Если ваши процессы связаны с вводом-выводом в течение некоторого времени, они не потребляют ресурсы процессора, поэтому вам, вероятно, просто нужно запустить их больше, чем у вашей машины есть ядра. Но тогда у вас есть проблема, чтобы узнать, сколько процессов порождать, и это может снова измениться со временем, если машина выполняет другую работу. Потоковое решение в одном процессе может быть адаптировано к изменяющейся среде относительно простым способом.

Итак, потоки, пул потоков, синхронизация, структуры данных и т. Д. Все готово. И это все работает. У меня осталась проблема с расписанием.

Который вы должны оставить для ОС. Просто есть один процесс с необходимыми объединенными потоками. Примерно так:

  • Ряд потоков читает записи из базы данных и добавляет их в очередь производителя-потребителя с верхней границей, которая находится где-то между N и 2 * N где N - количество ядер процессора в системе. Эти потоки будут блокироваться в полной очереди, и они могут иметь повышенный приоритет, так что они будут запланированы для запуска, как только в очереди появится больше места, и они станут разблокированными. Поскольку в большинстве случаев они будут блокироваться при вводе-выводе, их более высокий приоритет не должен быть проблемой.
    Я не знаю, какое это количество потоков, вам нужно измерить.

  • Количество потоков обработки, вероятно, по одному на ядро ​​процессора в системе. Они будут получать рабочие элементы из очереди, упомянутой в предыдущем пункте, в блоке этой очереди, если он пуст. Обработанные рабочие элементы должны идти в другую очередь.

  • Количество потоков, которые берут обработанные рабочие элементы из второй очереди и записывают данные обратно в базу данных. Вероятно, должна быть верхняя граница и для второй очереди, чтобы она не записывала обработанные данные обратно в базу данных и не приводила к тому, что обработанные данные накапливались и заполняли все пространство памяти вашего процесса.

Необходимо определить количество потоков, но все планирование будет выполняться планировщиком ОС. Главное - иметь достаточно потоков, чтобы использовать все ядра ЦП, и необходимое количество вспомогательных потоков, чтобы они были заняты и обрабатывали свои выводы. Если эти потоки происходят из пулов, вы также можете изменять их номера во время выполнения.

Библиотека Omni Thread имеет решение для задач, пулов задач, очередей потребителя производителей и всего остального, что вам потребуется для реализации этого. В противном случае вы можете написать свои собственные очереди, используя мьютексы.

Проблема планирования заключается в следующем: весь выигрыш в скорости происходит при обработке текущего элемента, пока сервер выбирает следующий элемент.Мы выполняем задачу предварительной выборки перед обработкой текущего элемента, но как мы можем гарантировать, что он запускается?

Придавая ему более высокий приоритет.

Планировщик ОС незнать, что для задачи предварительной выборки важно сразу выполнить запрос

Он будет знать, имеет ли поток более высокий приоритет.

Планировщик ОС пытается«справедливый» и разрешить выполнение каждой задачи в течение назначенного интервала времени.

Только для потоков с одинаковым приоритетом.Никакой поток с более низким приоритетом не получит никакого фрагмента ЦП, в то время как поток с более высоким приоритетом в том же процессе работает.
[Редактировать: Это не совсем верно, больше информации в конце.Однако это достаточно близко к истине, чтобы гарантировать, что сетевые потоки с более высоким приоритетом отправляют и получают данные как можно скорее.]

  1. Сразу после выполнения задачи предварительной выборки,Основной поток вызывает Sleep (0).

Вызов Sleep() - это плохой способ заставить потоки выполняться в определенном порядке.Установите приоритет потока в соответствии с приоритетом выполняемой ими работы и используйте примитивы ОС, чтобы блокировать потоки с более высоким приоритетом, если они не должны запускаться.

Я мог бы выдать поток предварительной выборки с более высоким приоритетом, чемосновная нить.Это должно привести к тому, что планировщик запустит его сразу, даже если он должен выгружать основной поток.Это также может иметь нежелательные последствия.Для фонового рабочего потока кажется неестественным получение более высокого приоритета.

В этом нет ничего противоестественного.Это предполагаемый способ использования потоков.Вы только должны убедиться, что потоки с более высоким приоритетом блокируются рано или поздно, и любой поток, который идет в ОС для ввода / вывода (файл или сеть), блокирует.В схеме, которую я набросал выше, потоки с высоким приоритетом также будут блокировать очереди.

Я мог бы выполнить запрос асинхронно.

Я бы туда не пошел.Эта методика может быть необходима, когда вы пишете сервер для множества одновременных подключений, а поток для каждого подключения чрезмерно дорог, но в противном случае блокировка доступа к сети в многопоточном решении должна работать нормально.

Редактировать:

Спасибо Jeroen Pluimers за то, что выглядишь поближе к этому.Поскольку информация в ссылках, которые он дал в своем комментарии, показывает мое утверждение

Никакой поток с более низким приоритетом не получит какой-либо фрагмент ЦП, в то время как поток с более высоким приоритетом в том же процессе работает.

не соответствует действительности.Потоки с более низким приоритетом, которые не работали в течение длительного времени, получают случайное повышение приоритета и действительно рано или поздно получат долю ЦП, даже если потоки с более высоким приоритетом работают.Для получения дополнительной информации об этом см., В частности, «Приоритет инверсии и планировщик Windows NT» .

Чтобы проверить это, я создал простую демонстрацию с Delphi:

type
  TForm1 = class(TForm)
    Label1: TLabel;
    Label2: TLabel;
    Label3: TLabel;
    Label4: TLabel;
    Label5: TLabel;
    Label6: TLabel;
    Timer1: TTimer;
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Timer1Timer(Sender: TObject);
  private
    fLoopCounters: array[0..5] of LongWord;
    fThreads: array[0..5] of TThread;
  end;

var
  Form1: TForm1;

implementation

{$R *.DFM}

// TTestThread

type
  TTestThread = class(TThread)
  private
    fLoopCounterPtr: PLongWord;
  protected
    procedure Execute; override;
  public
    constructor Create(ALowerPriority: boolean; ALoopCounterPtr: PLongWord);
  end;

constructor TTestThread.Create(ALowerPriority: boolean;
  ALoopCounterPtr: PLongWord);
begin
  inherited Create(True);
  if ALowerPriority then
    Priority := tpLower;
  fLoopCounterPtr := ALoopCounterPtr;
  Resume;
end;

procedure TTestThread.Execute;
begin
  while not Terminated do
    InterlockedIncrement(PInteger(fLoopCounterPtr)^);
end;

// TForm1

procedure TForm1.FormCreate(Sender: TObject);
var
  i: integer;
begin
  for i := Low(fThreads) to High(fThreads) do
//    fThreads[i] := TTestThread.Create(True, @fLoopCounters[i]);
    fThreads[i] := TTestThread.Create(i >= 4, @fLoopCounters[i]);
end;

procedure TForm1.FormDestroy(Sender: TObject);
var
  i: integer;
begin
  for i := Low(fThreads) to High(fThreads) do begin
    if fThreads[i] <> nil then
      fThreads[i].Terminate;
  end;
  for i := Low(fThreads) to High(fThreads) do
    fThreads[i].Free;
end;

procedure TForm1.Timer1Timer(Sender: TObject);
begin
  Label1.Caption := IntToStr(fLoopCounters[0]);
  Label2.Caption := IntToStr(fLoopCounters[1]);
  Label3.Caption := IntToStr(fLoopCounters[2]);
  Label4.Caption := IntToStr(fLoopCounters[3]);
  Label5.Caption := IntToStr(fLoopCounters[4]);
  Label6.Caption := IntToStr(fLoopCounters[5]);
end;

Это создает 6 потоков (на моем 4-ядерном компьютере), либо все с более низким приоритетом, либо 4 с нормальным и 2 с более низким приоритетом.В первом случае все 6 потоков работают, но с разной долей процессорного времени:

6 threads with lower priority

Во втором случае 4 потока работают с примерно равной долей процессорного времени, нодругие два потока также получают небольшую долю ЦП:

4 threads with normal, 2 threads with lower priority

Но доля времени ЦП очень очень мала, намного ниже процента того, что получают другие потоки.

И вернемся к вашему вопросу: жизнеспособным решением должна быть программа, использующая несколько потоков с пользовательским приоритетом, соединенные через очереди производителя-потребителя.В обычном случае потоки базы данных будут блокироваться большую часть времени либо в сетевых операциях, либо в очередях.А планировщик Windows позаботится о том, чтобы даже поток с более низким приоритетом не умер от голода полностью.

1 голос
/ 20 октября 2010

Я не знаю ни одного уровня доступа к базе данных, который бы разрешал это.

Причина в том, что каждый поток имеет свое собственное " локальное хранилище потока " (ключевое слово threadvar в Delphiдругие языки имеют эквиваленты, это используется во многих инфраструктурах).
Когда вы запускаете вещи в одном потоке, а продолжаете в другом, вы смешиваете эти локальные хранилища, вызывая разного рода хаос.

Лучшее, что вы можете сделать, это:

  1. передать запрос и параметры потоку, который будет обрабатывать это (используйте для этого стандартные механизмы синхронизации потоков Delphi)
  2. естьфактический поток запросов выполняет запрос
  3. возвращает результаты в основной поток (для этого используйте стандартные механизмы синхронизации потока Delphi)

Ответы на этот вопрос объясняет синхронизацию потока более подробно.

Редактировать: (при предполагаемой медлительности запуска чего-либо в другом потоке)

"Сразу"относительный термин: это зависит от того, как вы выполняете синхронизацию потоков, и может быть очень быстрым (т. е. менее миллисекунды).
Создание нового потока может занять некоторое время.
Решение состоит в том, чтобы иметь пул потоковдостаточно больших рабочих потоков, чтобы эффективно обслуживать разумное количество запросов.
Таким образом, если система еще не слишком загружена, у вас будет рабочий поток, готовый начать обслуживание вашего запроса почти сразу.

Я сделал это (даже перекрестный процесс) в большом аудиоприложении, которое требовало отклика с низкой задержкой, и оно работает как чудо.
Процесс аудиосервера выполняется с высоким приоритетом в ожидании запросов.Когда он простаивает, он не потребляет ЦП, но когда он получает запрос, он отвечает очень быстро.

Ответы на этот вопрос об изменениях с большими улучшениями и thisВопрос о межпотоковом взаимодействии предоставляет несколько интересных советов о том, как заставить работать это асинхронное поведение.
Найдите слова AsyncCalls, OmniThread и thread.

- jeroen

1 голос
/ 25 октября 2010

Я даю второй ответ, для вашей второй части вопроса: ваша Проблема планирования Это облегчает различение обоих ответов.

Прежде всего, вы должны прочитать Последствия алгоритма планирования: сон не всегда помогает , который является частью Raymond Chen blog ' The Old New Вещь ».
Сон против опроса также является хорошим чтением.
В основном все эти делают хорошее чтение.

Если я правильно понимаю вашу проблему планирования, у вас есть 3 вида потоков:

  1. Основной поток: гарантирует, что потокам извлечения всегда есть над чем работать
  2. Fetch Threads: (привязка к базе данных) выборка данных для потоков Threading
  3. Обработка потоков: (связанный с процессором) процесс извлекает данные

Единственный способ сохранить работу 3 - это получить 2 данных как можно больше.
Единственный способ сохранить 2 выборки - это предоставить 1 достаточно записей для выборки.

Вы можете использовать очереди для передачи данных между 1 и 2 и между 2 и 3.

Ваша проблема теперь двоякая:

  • нахождение баланса между количеством потоков в категории 2 и 3
  • убедившись, что у 2 всегда есть работа

Я думаю, что вы решили первое.
Последнее сводится к тому, что очередь между 1 и 2 никогда не бывает пустой.

Несколько трюков:

  • Вы можете использовать Sleep (1) (см. Статью блога) как простой способ «заставить» 2 работать
  • Никогда не позволяйте протекторам выходить из своего исполнения: создание и уничтожение потоков стоит дорого
  • тщательно выбирайте объекты синхронизации (часто называемые объектами IPC) ( Kudzu имеет хорошую статью на них)

- Йерун

0 голосов
/ 20 октября 2010

Вам просто нужно использовать стандартный механизм синхронизации потоков в потоке Delphi.

Проверьте справку IDE для класса TEvent и связанных с ним методов.

...