Ожидание нескольких потоков с использованием WaitForMultipleObjects - PullRequest
10 голосов
/ 29 июля 2011

Я использую функцию WaitForMultipleObjects, чтобы дождаться завершения нескольких потоков, но я делаю что-то не так, потому что результат не является ожидаемым

см. Пример кода

type
  TForm1 = class(TForm)
    Memo1: TMemo;
    Button1: TButton;
    procedure Button1Click(Sender: TObject);
  private
  end;

  TFoo = class(TThread)
  private
    Factor: Double;
    procedure ShowData;
  protected
    procedure Execute; override;
    constructor Create(AFactor : Double);
  end;


var
  Form1: TForm1;

implementation

Uses
 Math;

{$R *.dfm}

{ TFoo }

constructor TFoo.Create(AFactor: Double);
begin
  inherited Create(False);
  Factor := AFactor;
  FreeOnTerminate := True;

end;

procedure TFoo.Execute;
const
  Max=100000000;
var
  i : Integer;
begin
  inherited;
  for i:=1 to Max do
    Factor:=Sqrt(Factor);

  Synchronize(ShowData);
end;

procedure TFoo.ShowData;
begin
  Form1.Memo1.Lines.Add(FloatToStr(Factor));
end;

procedure TForm1.Button1Click(Sender: TObject);
const
 nThreads=5;
Var
 tArr  : Array[1..nThreads]  of TFoo;
 hArr  : Array[1..nThreads]  of THandle;
 i     : Integer;
 rWait : Cardinal;
begin
  for i:=1  to nThreads do
   begin
     tArr[i]:=TFoo.Create(Pi*i);
     hArr[i]:=tArr[i].Handle;
   end;

  repeat
    rWait:= WaitForMultipleObjects(nThreads, @hArr, True, 100);
    Application.ProcessMessages;
  until rWait<>WAIT_TIMEOUT;
  //here I want to show this message when all the threads are terminated    
  Memo1.Lines.Add('Wait done');
end;

end.

это текущий вывод демонстрационного приложения

1
Wait done
1
1
1
1

, но я хочу что-то вроде этого

1
1
1
1
1
Wait done

Как я должен использовать функцию WaitForMultipleObjects, чтобы подождать, пока всепоток завершен?

Ответы [ 9 ]

10 голосов
/ 29 июля 2011

Исправлено: Удалите FreeOnTerminate.

Ваш код вызывает освобождение потоков, когда вам все еще нужны дескрипторы.Это большая ошибка, и вы можете получить нарушения доступа где-то еще в вашем коде или коды возврата ошибок, возвращаемые из ваших WaitFormMultipleObjects.

TThread.handle становится недействительным, когда TThread освобождается, и это завершает ваш цикл ожиданиярано, потому что ручка больше не действительна.Вы также можете столкнуться с нарушением прав доступа, если попытаетесь получить доступ к TThread после его освобождения в фоновом режиме, поэтому я считаю, что лучше их удалять намеренно и в известное время.

Использование дескриптора потокатак как дескриптор события работает нормально, но вы не должны использовать FreeOnTerminate для освобождения потока, когда он завершает его, поскольку это слишком рано уничтожает дескрипторы.

Я также согласен с людьми, которые сказали, что выполняют цикл ожидания занятостис Application.Processmessages довольно некрасиво.Есть и другие способы сделать это.

unit threadUnit2;

interface

uses Classes, SyncObjs,Windows, SysUtils;

type
  TFoo = class(TThread)
  private
    FFactor: Double;
    procedure ShowData;
  protected
    procedure Execute; override;
    constructor Create(AFactor : Double);
    destructor Destroy; override;
  end;

  procedure WaitForThreads;


implementation

Uses
 Forms,
 Math;

procedure Trace(msg:String);
begin
  if Assigned(Form1) then
    Form1.Memo1.Lines.Add(msg);
end;



{ TFoo }

constructor TFoo.Create(AFactor: Double);
begin
  inherited Create(False);
  FFactor := AFactor;
//  FreeOnTerminate := True;

end;

destructor TFoo.Destroy;
begin
  inherited;
end;

procedure TFoo.Execute;
const
  Max=100000000;
var
  i : Integer;
begin
  inherited;
  for i:=1 to Max do
    FFactor:=Sqrt(FFactor);


  Synchronize(ShowData);
end;


procedure TFoo.ShowData;
begin

  Trace(FloatToStr(FFactor));
end;

procedure WaitForThreads;
const
 nThreads=5;
Var
 tArr  : Array[1..nThreads]  of TFoo;
 hArr  : Array[1..nThreads]  of THandle;
 i     : Integer;
 rWait : Cardinal;
begin
  for i:=1  to nThreads do
   begin
     tArr[i]:=TFoo.Create(Pi*i);
     hArr[i]:=tArr[i].handle; // Event.Handle;
   end;

  repeat
    rWait:= WaitForMultipleObjects(nThreads, @hArr[1],{waitAll} True, 150);
    Application.ProcessMessages;
  until rWait<>WAIT_TIMEOUT;
  Sleep(0);
  //here I want to show this message when all the threads are terminated
  Trace('Wait done');

  for i:=1  to nThreads do
   begin
     tArr[i].Free;
   end;

end;

end.
3 голосов
/ 29 июля 2011

Вот что происходит.

  1. Ваш код возвращает WAIT_FAILED из WaitForMultipleObjects.
  2. Вызов GetLastError приводит к коду ошибки 6, Дескрипторнедопустимо.
  3. Единственные дескрипторы, которые вы передаете WaitForMultipleObjects, - это дескрипторы потока, поэтому один из дескрипторов потока недопустим.
  4. Единственный способ, которым один из потоков обрабатываетможет стать недействительным, если он был закрыт.
  5. Как указали другие, вы закрываете дескрипторы, устанавливая FreeOnTerminate.

Мораль этой истории состоит в проверке вашеговерните значения правильно из всех функций, и пусть GetLastError приведет вас к основной причине проблемы.

3 голосов
/ 29 июля 2011

Если вы действительно хотите узнать, как работает многопоточность, вы на правильном пути - изучите код и задавайте вопросы, как здесь. Однако, если вы просто хотите использовать многопоточность в своем приложении, вы можете сделать это намного проще с OmniThreadLibrary при условии, что вы используете хотя бы Delphi 2009.

uses
  Math,
  OtlTask,
  OtlParallel;

function Calculate(factor: real): real;
const
  Max = 100000000;
var
  i: integer;
begin
  Result := factor;
  for i := 1 to Max do
    Result := Sqrt(Result);
end;

procedure TForm35.btnClick(Sender: TObject);
const
  nThreads = 5;
begin
  Parallel.ForEach(1, nThreads).Execute(
    procedure (const task: IOmniTask; const value: integer)
    var
      res: real;
    begin
      res := Calculate(Pi*value);
      task.Invoke(
        procedure begin
          Form35.Memo1.Lines.Add(FloatToStr(res));
        end
      );
    end
  );
  Memo1.Lines.Add('All done');
end;
2 голосов
/ 29 июля 2011

Я не мог упустить эту возможность, чтобы создать рабочий пример запуска нескольких потоков и использования обмена сообщениями для сообщения результатов обратно в графический интерфейс.

Потоки, которые будут запущены, объявлены как:

type
  TWorker = class(TThread)
  private
    FFactor: Double;
    FResult: Double;
    FReportTo: THandle;
  protected
    procedure Execute; override;
  public
    constructor Create(const aFactor: Double; const aReportTo: THandle);

    property Factor: Double read FFactor;
    property Result: Double read FResult;
  end;

Конструктор просто устанавливает закрытые члены и устанавливает FreeOnTerminate на False . Это важно, так как это позволит основному потоку запросить экземпляр результата. Метод execute выполняет свои вычисления, а затем отправляет сообщение дескриптору, которое он получил в своем конструкторе, чтобы сказать, что сделано.

procedure TWorker.Execute;
const
  Max = 100000000;
var
  i : Integer;
begin
  inherited;

  FResult := FFactor;
  for i := 1 to Max do
    FResult := Sqrt(FResult);

  PostMessage(FReportTo, UM_WORKERDONE, Self.Handle, 0);
end;

Объявления для пользовательского сообщения UM_WORKERDONE объявляются как:

const
  UM_WORKERDONE = WM_USER + 1;

type
  TUMWorkerDone = packed record
    Msg: Cardinal;
    ThreadHandle: Integer;
    unused: Integer;
    Result: LRESULT;
  end;

Форма, запускающая потоки, добавляется в объявление:

  private
    FRunning: Boolean;
    FThreads: array of record
      Instance: TThread;
      Handle: THandle;
    end;
    procedure StartThreads(const aNumber: Integer);
    procedure HandleThreadResult(var Message: TUMWorkerDone); message UM_WORKERDONE;

Функция FRunning используется для предотвращения нажатия кнопки во время работы. FThreads используется для хранения указателя экземпляра и дескриптора созданных потоков.

Процедура запуска потоков имеет довольно простую реализацию:

procedure TForm1.StartThreads(const aNumber: Integer);
var
  i: Integer;
begin
  Memo1.Lines.Add(Format('Starting %d worker threads', [aNumber]));
  SetLength(FThreads, aNumber);
  for i := 0 to aNumber - 1 do
  begin
    FThreads[i].Instance := TWorker.Create(pi * (i+1), Self.Handle);
    FThreads[i].Handle := FThreads[i].Instance.Handle;
  end;
end;

Самое интересное в реализации HandleThreadResult:

procedure TForm1.HandleThreadResult(var Message: TUMWorkerDone);
var
  i: Integer;
  ThreadIdx: Integer;
  Thread: TWorker;
  Done: Boolean;
begin
  // Find thread in array
  ThreadIdx := -1;
  for i := Low(FThreads) to High(FThreads) do
    if FThreads[i].Handle = Cardinal(Message.ThreadHandle) then
    begin
      ThreadIdx := i;
      Break;
    end;

  // Report results and free the thread, nilling its pointer so we can detect
  // when all threads are done.
  if ThreadIdx > -1 then
  begin
    Thread := TWorker(FThreads[i].Instance);
    Memo1.Lines.Add(Format('Thread %d returned %f', [ThreadIdx, Thread.Result]));
    FreeAndNil(FThreads[i].Instance);
  end;

  // See whether all threads have finished.
  Done := True;
  for i := Low(FThreads) to High(FThreads) do
    if Assigned(FThreads[i].Instance) then
    begin
      Done := False;
      Break;
    end;
  if Done then
    Memo1.Lines.Add('Work done');
end;

Наслаждайтесь ...

2 голосов
/ 29 июля 2011

Всякий раз, когда вы ждете, и это связано с сообщением, вы должны использовать MsgWait ... и указать маску для обработки ожидаемого сообщения

repeat
    rWait:= MsgWaitForMultipleObjects(nThreads, @hArr[1], True, INFINITE, QS_ALLEVENTS);
    Application.ProcessMessages;
 until (rWait<>WAIT_TIMEOUT) and (rWait <> (WAIT_OBJECT_0 + nThreads));

nThreads

2 голосов
/ 29 июля 2011

Не передавайте такой короткий период ожидания, как последний параметр.

Согласно MSDN

dwMilliseconds [in] Интервал времени ожидания в миллисекундах. Функция возвращает значение, если интервал истек, даже если условия, заданные параметром bWaitAll, не выполнены. Если dwMilliseconds равно нулю, функция проверяет состояния указанных объектов и немедленно возвращает. Если dwMilliseconds имеет значение INFINITE, интервал ожидания функции никогда не истекает.

Обратите особое внимание на второе предложение. Вы говорите, что нужно подождать все ручки, но время ожидания истекло через 100 мс. Вместо этого передайте INFINITE в качестве последнего параметра и используйте WAIT_OBJECT_0 вместо WAIT_TIMEOUT в качестве теста выхода.

1 голос
/ 31 июля 2011

Вы могли бы реорганизовать свой код так, чтобы он ожидал только одного объекта вместо многих.

Я бы хотел познакомить вас с маленьким помощником, который обычно помогает мне в подобных случаях.На этот раз его зовут IFooMonitor :

IFooMonitor = interface
  function WaitForAll(ATimeOut: Cardinal): Boolean;
  procedure ImDone;
end;

TFoo и IFooMonitor будут друзьями:

TFoo = class(TThread)
strict private
  FFactor: Double;
  FMonitor: IFooMonitor;
  procedure ShowData;
protected
  procedure Execute; override;
public
  constructor Create(const AMonitor: IFooMonitor; AFactor: Double);
end;

constructor TFoo.Create(const ACountDown: ICountDown; AFactor: Double);
begin
  FCountDown := ACountDown;
  FFactor := AFactor;
  FreeOnTerminate := True;
  inherited Create(False);// <- call inherited constructor at the end!
end;

Когда TFoo завершит свою работу, он расскажет об этом своему новому другу:

procedure TFoo.Execute;
const
  Max = 100000000;
var
  i: Integer;
begin
  for i := 1 to Max do
    FFactor := Sqrt(FFactor);

  Synchronize(ShowData);

  FMonitor.ImDone(); 
end;

Теперь мы можем реорганизовать обработчик событий, чтобы он выглядел так:

procedure TForm1.Button1Click(Sender: TObject);
const
  nThreads = 5;
var
 i: Integer;
 monitor: IFooMonitor;
begin
  monitor := TFooMonitor.Create(nThreads); // see below for the implementation.

  for i := 1 to nThreads do
    TFoo.Create(monitor, Pi*i);

  while not monitor.WaitForAll(100) do
    Application.ProcessMessages;

  Memo1.Lines.Add('Wait done');
end;

И вот как мы можем реализовать IFooMonitor :

uses
  SyncObjs;

TFooMonitor = class(TInterfacedObject, IFooMonitor)
strict private
  FCounter: Integer;
  FEvent: TEvent;
  FLock: TCriticalSection;
private
  { IFooMonitor }
  function WaitForAll(ATimeOut: Cardinal): Boolean;
  procedure ImDone;
public
  constructor Create(ACount: Integer);
  destructor Destroy; override;   
end;

constructor TFooMonitor.Create(ACount: Integer);
begin
  inherited Create;
  FCounter := ACount;
  FEvent := TEvent.Create(nil, False, False, '');
  FLock := TCriticalSection.Create;
end;

procedure TFooMonitor.ImDone;
begin
  FLock.Enter;
  try
    Assert(FCounter > 0);
    Dec(FCounter);
    if FCounter = 0 then
      FEvent.SetEvent;
  finally
    FLock.Leave
  end;
end;

destructor TFooMonitor.Destroy;
begin
  FLock.Free;
  FEvent.Free;
  inherited;
end;

function TFooMonitor.WaitForAll(ATimeOut: Cardinal): Boolean;
begin
  Result := FEvent.WaitFor(ATimeOut) = wrSignaled 
end;
1 голос
/ 29 июля 2011

Я добавил следующие строки в конец процедуры:

memo1.Lines.add(intToHex(rWait, 2));
if rWait = $FFFFFFFF then
  RaiseLastOSError;

Оказывается, что WaitForMultipleObjects завершается с ошибкой «Отказано в доступе», скорее всего, потому что некоторые, но не все потоки заканчиваютсяи вычищать себя между итерациями.

У вас здесь есть проблема.Вам нужно поддерживать работающую рассылку сообщений, иначе синхронизация звонков не будет работать, поэтому вы не можете передать INFINITE, как предложил Кен.Но если вы делаете то, что делаете в настоящее время, вы сталкиваетесь с этой проблемой.

Решение состоит в том, чтобы переместить вызов WaitForMultipleObjects и код вокруг него в отдельный поток.Он должен ждать INFINITE, затем, когда он закончится, он должен каким-то образом сигнализировать потоку UI, чтобы он знал, что это сделано.(Например, когда вы нажимаете кнопку, отключаете кнопку, а затем, когда поток монитора завершает свою работу, она снова включает кнопку.)

1 голос
/ 29 июля 2011

Существует одно условие, которое удовлетворяет вашему условию «до» в цикле повторения, которое вы игнорируете, WAIT_FAILED:

until rWait<>WAIT_TIMEOUT; 
Memo1.Lines.Add('Wait done');

Поскольку время ожидания несколько ограничено, один (или более) изпотоки заканчивают работу и освобождают себя, делая один (или более) дескриптор недопустимым для следующего WaitForMultipleObjects, что заставляет его возвращать 'WAIT_FAILED', в результате чего отображается сообщение 'Done done'.

Для каждой итерации в повторениипетля, вы должны удалить ручки готовых нитей из вашего hArr.Кроме того, не забывайте проверять «WAIT_FAILED» в любом случае.

редактирование: Ниже приведен пример кода, показывающий, как это можно сделать.Отличие этого подхода от сохранения потоков состоит в том, что он не оставляет неиспользуемые объекты ядра и RTL.Это не имеет значения для рассматриваемого примера, но для большого количества потоков, занимающихся длительным бизнесом, это может быть предпочтительным.

В коде WaitForMultipleObjects вызывается с передачей false для параметра bWaitAll.чтобы иметь возможность удалить дескриптор потока без использования дополнительного вызова API, чтобы выяснить, является ли он недействительным или нет.Но это допускает иное, так как код также должен иметь возможность обрабатывать потоки, заканчивающиеся вне ожидающего вызова.

procedure TForm1.Button1Click(Sender: TObject);

const
  nThreads=5;

Var
  tArr  : Array[1..nThreads]  of TFoo;
  hArr  : Array[1..nThreads]  of THandle;
  i     : Integer;
  rWait : Cardinal;

  hCount: Integer;  // total number of supposedly running threads
  Flags: DWORD;     // dummy variable used in a call to find out if a thread handle is valid

  procedure RemoveHandle(Index: Integer); // Decrement valid handle count and leave invalid handle out of range
  begin
    if Index <> hCount then
      hArr[Index] := hArr[hCount];
    Dec(hCount);
  end;

begin
  Memo1.Clear;

  for i:=1  to nThreads do
   begin
     tArr[i]:=TFoo.Create(Pi*i);
     hArr[i]:=tArr[i].Handle;
   end;
   hCount := nThreads;

  repeat
    rWait:= WaitForMultipleObjects(hCount, @hArr, False, 100);

    case rWait of

      // one of the threads satisfied the wait, remove its handle
      WAIT_OBJECT_0..WAIT_OBJECT_0 + nThreads - 1: RemoveHandle(rWait + 1);

      // at least one handle has become invalid outside the wait call, 
      // or more than one thread finished during the previous wait,
      // find and remove them
      WAIT_FAILED:
        begin
          if GetLastError = ERROR_INVALID_HANDLE then
          begin
            for i := hCount downto 1 do 
              if not GetHandleInformation(hArr[i], Flags) then // is handle valid?
                RemoveHandle(i);
          end
          else
            // the wait failed because of something other than an invalid handle
            RaiseLastOSError;
        end;

      // all remaining threads continue running, process messages and loop.
      // don't process messages if the wait returned WAIT_FAILED since we didn't wait at all
      // likewise WAIT_OBJECT_... may return soon
      WAIT_TIMEOUT: Application.ProcessMessages; 
    end;

  until hCount = 0;  // no more valid thread handles, we're done

  Memo1.Lines.Add('Wait done');
end;

Обратите внимание, что это ответ на вопрос так, как он задан.Я бы предпочел использовать событие TThreads OnTerminate, чтобы уменьшить счетчик и вывести сообщение «Wait done», когда оно достигнет «0».Это или, как другие рекомендовали, перемещение ожидания в отдельный поток, было бы проще и, вероятно, чище, и избавило бы от необходимости Application.ProcessMessages.

...