Вот статья от Delphi3000 , показывающая, как использовать IOCP для создания пула потоков.Я не автор этого кода, но информация об авторе находится в исходном коде.
Я публикую здесь комментарии и код:
Каждый должен понимать, что такое поток, принципы потоков и так далее.Для тех, кто в этом нуждается, простая функция потока состоит в том, чтобы отделить обработку от одного потока к другому, чтобы обеспечить параллельное и параллельное выполнение.Основной принцип работы потоков так же прост: выделенная память, на которую ссылаются потоки, должна быть распределена для обеспечения безопасности доступа.Есть ряд других принципов, но это действительно тот, о котором нужно заботиться.
И еще ..
Потоково-безопасная очередь позволит нескольким потокам добавлять и удалять, выдвигать и извлекать значенияв очередь и из нее безопасно в порядке очереди.Имея эффективную и хорошо написанную очередь, вы можете иметь очень полезный компонент для разработки многопоточных приложений, от помощи в обеспечении многопоточного ведения журналов до асинхронной обработки запросов.
Пул потоков - это просто поток или несколько потоковкоторые чаще всего используются для управления очередью запросов.Например, веб-сервер, который должен иметь непрерывную очередь запросов, требующих обработки, использует пулы потоков для управления запросами http, или сервер COM + или DCOM использует пул потоков для обработки запросов rpc.Это сделано для того, чтобы уменьшить влияние обработки одного запроса на другой, скажем, если вы выполнили 3 запроса синхронно, а первый запрос занял 1 минуту, вторые два запроса не будут выполнены в течение по крайней мере 1 минуты, добавив сверхусобственное время для обработки, и для большинства клиентов это неприемлемо.
Так как это сделать ..
Начиная с очереди !!
Delphi предоставляет объект TQueue, который доступен, но, к сожалению, не безопасен для потоков и не слишком эффективен, но люди должны взглянуть на файл Contnrs.pas, чтобы увидеть, как Borland пишет там стеки и очереди.Для очереди требуются только две основные функции: добавление, удаление, отправка и удаление.Add / push добавит значение, указатель или объект в конец очереди.А remove / pop удалит и вернет первое значение в очереди.
Вы можете получить из объекта TQueue и переопределить защищенные методы и добавить в критические разделы, это поможет вам, но я бы хотелхочу, чтобы моя очередь ожидала, пока в очереди появятся новые запросы, и переведет поток в состояние покоя, пока он ожидает новых запросов.Это можно сделать, добавив мьютексы или сигнальные события, но есть более простой способ.Windows api предоставляет очередь завершения ввода-вывода, которая предоставляет нам потокобезопасный доступ к очереди и состояние покоя при ожидании нового запроса в очереди.
Реализация пула потоков
Пул потоков будет очень простым и будет управлять x желаемым количеством потоков и передавать каждый запрос очереди на событие, предоставляемое для обработки.Редко возникает необходимость в реализации класса TThread и вашей логики для реализации и инкапсуляции в событии execute класса, поэтому можно создать простой класс TSimpleThread, который будет выполнять любой метод в любом объекте в контексте другого потока.Как только люди это поймут, все, что вам нужно, - это выделить память.
Вот как это реализовано.
Реализация TThreadQueue и TThreadPool
(* Implemented for Delphi3000.com Articles, 11/01/2004
Chris Baldwin
Director & Chief Architect
Alive Technology Limited
http://www.alivetechnology.com
*)
unit ThreadUtilities;
uses Windows, SysUtils, Classes;
type
EThreadStackFinalized = class(Exception);
TSimpleThread = class;
// Thread Safe Pointer Queue
TThreadQueue = class
private
FFinalized: Boolean;
FIOQueue: THandle;
public
constructor Create;
destructor Destroy; override;
procedure Finalize;
procedure Push(Data: Pointer);
function Pop(var Data: Pointer): Boolean;
property Finalized: Boolean read FFinalized;
end;
TThreadExecuteEvent = procedure (Thread: TThread) of object;
TSimpleThread = class(TThread)
private
FExecuteEvent: TThreadExecuteEvent;
protected
procedure Execute(); override;
public
constructor Create(CreateSuspended: Boolean; ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
end;
TThreadPoolEvent = procedure (Data: Pointer; AThread: TThread) of Object;
TThreadPool = class(TObject)
private
FThreads: TList;
FThreadQueue: TThreadQueue;
FHandlePoolEvent: TThreadPoolEvent;
procedure DoHandleThreadExecute(Thread: TThread);
public
constructor Create( HandlePoolEvent: TThreadPoolEvent; MaxThreads: Integer = 1); virtual;
destructor Destroy; override;
procedure Add(const Data: Pointer);
end;
implementation
{ TThreadQueue }
constructor TThreadQueue.Create;
begin
//-- Create IO Completion Queue
FIOQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
FFinalized := False;
end;
destructor TThreadQueue.Destroy;
begin
//-- Destroy Completion Queue
if (FIOQueue <> 0) then
CloseHandle(FIOQueue);
inherited;
end;
procedure TThreadQueue.Finalize;
begin
//-- Post a finialize pointer on to the queue
PostQueuedCompletionStatus(FIOQueue, 0, 0, Pointer($FFFFFFFF));
FFinalized := True;
end;
(* Pop will return false if the queue is completed *)
function TThreadQueue.Pop(var Data: Pointer): Boolean;
var
A: Cardinal;
OL: POverLapped;
begin
Result := True;
if (not FFinalized) then
//-- Remove/Pop the first pointer from the queue or wait
GetQueuedCompletionStatus(FIOQueue, A, Cardinal(Data), OL, INFINITE);
//-- Check if we have finalized the queue for completion
if FFinalized or (OL = Pointer($FFFFFFFF)) then begin
Data := nil;
Result := False;
Finalize;
end;
end;
procedure TThreadQueue.Push(Data: Pointer);
begin
if FFinalized then
Raise EThreadStackFinalized.Create('Stack is finalized');
//-- Add/Push a pointer on to the end of the queue
PostQueuedCompletionStatus(FIOQueue, 0, Cardinal(Data), nil);
end;
{ TSimpleThread }
constructor TSimpleThread.Create(CreateSuspended: Boolean;
ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
begin
FreeOnTerminate := AFreeOnTerminate;
FExecuteEvent := ExecuteEvent;
inherited Create(CreateSuspended);
end;
procedure TSimpleThread.Execute;
begin
if Assigned(FExecuteEvent) then
FExecuteEvent(Self);
end;
{ TThreadPool }
procedure TThreadPool.Add(const Data: Pointer);
begin
FThreadQueue.Push(Data);
end;
constructor TThreadPool.Create(HandlePoolEvent: TThreadPoolEvent;
MaxThreads: Integer);
begin
FHandlePoolEvent := HandlePoolEvent;
FThreadQueue := TThreadQueue.Create;
FThreads := TList.Create;
while FThreads.Count < MaxThreads do
FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
end;
destructor TThreadPool.Destroy;
var
t: Integer;
begin
FThreadQueue.Finalize;
for t := 0 to FThreads.Count-1 do
TThread(FThreads[t]).Terminate;
while (FThreads.Count > 0) do begin
TThread(FThreads[0]).WaitFor;
TThread(FThreads[0]).Free;
FThreads.Delete(0);
end;
FThreadQueue.Free;
FThreads.Free;
inherited;
end;
procedure TThreadPool.DoHandleThreadExecute(Thread: TThread);
var
Data: Pointer;
begin
while FThreadQueue.Pop(Data) and (not TSimpleThread(Thread).Terminated) do begin
try
FHandlePoolEvent(Data, Thread);
except
end;
end;
end;
end.
Как вы можете видеть, это довольно просто, и с этим вы можете очень легко реализовать любую очередь запросов через потоки и действительно любые требования, которые требуютс помощью этого объекта можно выполнить многопоточность и сэкономить вам много времени и усилий.
Вы можете использовать это, чтобы ставить запросы из одного потока в несколько потоков или ставить запросы из нескольких потоков в один поток, что делает этодовольно хорошее решение.
Вот несколько примеров использования этих объектов.
Потоковое ведение журнала
Чтобы разрешить нескольким потокам асинхронную запись вфайл журнала.
uses Windows, ThreadUtilities,...;
type
PLogRequest = ^TLogRequest;
TLogRequest = record
LogText: String;
end;
TThreadFileLog = class(TObject)
private
FFileName: String;
FThreadPool: TThreadPool;
procedure HandleLogRequest(Data: Pointer; AThread: TThread);
public
constructor Create(const FileName: string);
destructor Destroy; override;
procedure Log(const LogText: string);
end;
implementation
(* Simple reuse of a logtofile function for example *)
procedure LogToFile(const FileName, LogString: String);
var
F: TextFile;
begin
AssignFile(F, FileName);
if not FileExists(FileName) then
Rewrite(F)
else
Append(F);
try
Writeln(F, DateTimeToStr(Now) + ': ' + LogString);
finally
CloseFile(F);
end;
end;
constructor TThreadFileLog.Create(const FileName: string);
begin
FFileName := FileName;
//-- Pool of one thread to handle queue of logs
FThreadPool := TThreadPool.Create(HandleLogRequest, 1);
end;
destructor TThreadFileLog.Destroy;
begin
FThreadPool.Free;
inherited;
end;
procedure TThreadFileLog.HandleLogRequest(Data: Pointer; AThread: TThread);
var
Request: PLogRequest;
begin
Request := Data;
try
LogToFile(FFileName, Request^.LogText);
finally
Dispose(Request);
end;
end;
procedure TThreadFileLog.Log(const LogText: string);
var
Request: PLogRequest;
begin
New(Request);
Request^.LogText := LogText;
FThreadPool.Add(Request);
end;
Поскольку это ведение журнала в файл, он будет обрабатывать все запросы вплоть до одного потока, но вы могли бы делать расширенные уведомления по электронной почте с большим числом потоков, илиеще лучше, процесс профилирования с тем, что происходит, или шаги в вашей программе, которые я покажу в другой статье, поскольку эта статья уже довольно длинная.
Пока я оставлю вас в этом, наслаждайтесь .. Оставьтепрокомментируйте, если есть что-то, с чем люди застряли.
Крис