Лучший подход для очереди с синхронизацией потоков - PullRequest
3 голосов
/ 07 декабря 2009

У меня есть очередь, в которой я могу поставить в очередь разные потоки, поэтому я могу обеспечить две вещи:

  1. Запрос обрабатывается один за другим.
  2. Запрос обрабатывается в поступающем заказе

Второй момент важен. В противном случае достаточно простого критического раздела. У меня разные группы запросов, и только внутри одной группы эти пункты должны быть выполнены. Запросы из разных групп могут выполняться одновременно.

Это выглядит так:

FTaskQueue.Enqueu('MyGroup');
try
  Do Something (running in context of some thread)
finally
  FTaskQueue.Dequeu('MyGroup');
end;

РЕДАКТИРОВАТЬ : Я удалил фактическую реализацию, потому что она скрывает проблему, которую я хочу решить

Мне это нужно, потому что у меня есть веб-сервер на основе Indy, который принимает запросы http. Сначала я нахожу основную сессию для запроса. Затем запрос (код) выполняется для этого сеанса. Я могу получить несколько запросов на один и тот же сеанс (читай, я могу получать новые запросы, пока первый еще обрабатывается), и они должны выполняться по одному в правильном порядке поступления. Поэтому я ищу общую очередь синхронизации, которую можно использовать в таких ситуациях, чтобы запросы могли быть поставлены в очередь. Я не контролирую потоки, и каждый запрос может выполняться в другом потоке.

Каков наилучший (обычный) подход к решению проблемы такого рода? Проблема в том, что Enqueue и Dequeue должны быть атомарными операциями, чтобы правильный порядок сохранялся. Моя текущая реализация имеет существенное узкое место, но она работает.

РЕДАКТИРОВАТЬ : Сильфон - это проблема атомных операций постановки / снятия с очереди

Вы обычно делаете что-то вроде этого:

procedure Enqueue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoEnqueue;
  finally 
    LeaveCriticalSection(FCritSec);
  end;

  BlockTheCurrentThread; // here the thread blocks itself
end;

procedure Dequeue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoDequeue;
    UnblockTheNextThread; // here the thread unblocks another thread
  finally 
    LeaveCriticalSection(FCritSec);
  end;
end;

Теперь проблема в том, что это не атомное. Если у вас уже есть один поток в очереди, а другой приходит и вызывает Enqueue, может случиться так, что второй поток просто покинет критическую секцию и попытается заблокировать себя. Теперь планировщик потока возобновит работу первого потока, который попытается разблокировать следующий (второй) поток. Но второй поток еще не заблокирован, поэтому ничего не происходит. Теперь второй поток продолжается и блокируется, но это не правильно, потому что он не будет разблокирован. Если блокировка находится внутри критической секции, то критическая секция никогда не закрывается, и мы заходим в тупик.

Ответы [ 3 ]

9 голосов
/ 07 декабря 2009

Другой подход:

Пусть каждый поток запроса имеет событие ручного сброса, которое изначально не установлено. Администратор очередей - это простой объект, который поддерживает потокобезопасный список таких событий. Оба метода Enqueue() и Dequeue() принимают событие потока запроса в качестве параметра.

type
  TRequestManager = class(TObject)
  strict private
    fCritSect: TCriticalSection;
    fEvents: TList<TEvent>;
  public
    constructor Create;
    destructor Destroy; override;

    procedure Enqueue(ARequestEvent: TEvent);
    procedure Dequeue(ARequestEvent: TEvent);
  end;

{ TRequestManager }

constructor TRequestManager.Create;
begin
  inherited Create;
  fCritSect := TCriticalSection.Create;
  fEvents := TList<TEvent>.Create;
end;

destructor TRequestManager.Destroy;
begin
  Assert((fEvents = nil) or (fEvents.Count = 0));
  FreeAndNil(fEvents);
  FreeAndNil(fCritSect);
  inherited;
end;

procedure TRequestManager.Dequeue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(fEvents.Count > 0);
    Assert(fEvents[0] = ARequestEvent);
    fEvents.Delete(0);
    if fEvents.Count > 0 then
      fEvents[0].SetEvent;
  finally
    fCritSect.Release;
  end;
end;

procedure TRequestManager.Enqueue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(ARequestEvent <> nil);
    if fEvents.Count = 0 then
      ARequestEvent.SetEvent
    else
      ARequestEvent.ResetEvent;
    fEvents.Add(ARequestEvent);
  finally
    fCritSect.Release;
  end;
end;

Каждый поток запросов вызывает Enqueue() в администраторе очередей и затем ждет, пока его собственное событие не станет сигнальным. Затем он обрабатывает запрос и вызывает Dequeue():

{ TRequestThread }

type
  TRequestThread = class(TThread)
  strict private
    fEvent: TEvent;
    fManager: TRequestManager;
  protected
    procedure Execute; override;
  public
    constructor Create(AManager: TRequestManager);
  end;

constructor TRequestThread.Create(AManager: TRequestManager);
begin
  Assert(AManager <> nil);
  inherited Create(TRUE);
  fEvent := TEvent.Create(nil, TRUE, FALSE, '');
  fManager := AManager;
  Resume;
end;

procedure TRequestThread.Execute;
begin
  fManager.Enqueue(fEvent);
  try
    fEvent.WaitFor(INFINITE);
    OutputDebugString('Processing request');
    Sleep(1000);
    OutputDebugString('Request processed');
  finally
    fManager.Dequeue(fEvent);
  end;
end;

{ TForm1 }

procedure TForm1.Button1Click(Sender: TObject);
var
  i: integer;
begin
  for i := 1 to 10 do
    TRequestThread.Create(fRequestManager);
end;

Администратор очередей блокирует список событий как в Enqueue(), так и в Dequeue(). Если список пуст в Enqueue(), он устанавливает событие в параметре, иначе он сбрасывает событие. Затем он добавляет событие в список. Таким образом, первый поток может продолжить выполнение запроса, все остальные блокируются. В Dequeue() событие удаляется из верхней части списка, и устанавливается следующее событие (если оно есть).

Таким образом, последний поток запроса приведет к разблокировке следующего потока запроса, полностью без приостановки или возобновления потоков. Это решение также не требует никаких дополнительных потоков или окон, все, что нужно - это отдельный объект события на поток запроса.

2 голосов
/ 07 декабря 2009

Я отвечу с учетом дополнительной информации из вашего комментария.

Если у вас есть несколько потоков, которые необходимо сериализовать, вы можете использовать механизм сериализации, предоставляемый Windows бесплатно. Пусть каждая очередь будет потоком со своим окном и стандартным циклом сообщений. Используйте SendMessage() вместо PostThreadMessage(), и Windows позаботится о том, чтобы блокировать потоки отправки до тех пор, пока сообщение не будет обработано, и убедиться, что поддерживается правильный порядок выполнения. Используя поток с собственным окном для каждой группы запросов, вы обеспечиваете одновременную обработку нескольких групп.

Это простое решение, которое будет работать только в том случае, если сам запрос может быть обработан в другом контексте потока, чем тот, в котором он возник, что во многих случаях не должно быть проблемой.

0 голосов
/ 08 декабря 2009

Вы пробовали объект TThreadList, предоставленный Delphi?

Это потокобезопасный и он управляет замками для вас. Вы управляете списком "вне" цепочки внутри вашей основной ветки.

Когда запросы запрашивают новое задание, вы добавляете его в список. Когда поток завершается, с помощью события OnTerminate вы можете вызвать следующий поток в списке.

...