Управление динамическим количеством потоков - PullRequest
7 голосов
/ 27 января 2012

Прежде всего, я все еще знакомлюсь с многопоточностью и не знаю много терминологии. Мне нужно убедиться, что я делаю это правильно, потому что это чувствительный предмет.

Технические характеристики

Я создаю компонент, который будет содержать динамическое число потоков. Каждый из этих потоков повторно используется для выполнения ряда запросов. Я могу предоставить все необходимые детали потоку, как я его создаю и до того, как я его выполню, а также предоставить обработчики событий. Как только он будет выполнен, я почти закончу с одним запросом и добавлю другой запрос. Запросы поступают в эти потоки из другого автономного фонового потока, который постоянно обрабатывает очередь запросов. Таким образом, эта система имеет два списка: 1) список записей запросов и 2) список указателей потоков.

Я использую потомков класса TThread (по крайней мере, я знаком с этим методом потоков). Я получаю обратную связь от потоков, синхронизируя триггеры событий, которые я назначил при создании потоков. Потоки загружают и сохраняют данные в фоновом режиме, а по завершении они сбрасывают себя, готовые обработать следующий запрос.

Задача

Теперь проблема начинается с решения, как обрабатывать событие изменения количества разрешенных потоков (через свойство компонента ActiveThreads: TActiveThreadRange, который TActiveThreadRange = 1..20). Следовательно, одновременно может быть создано от 1 до 20 потоков. Но когда, скажем, приложение, использующее этот компонент, изменит это свойство с 5 на 3. В настоящее время уже создано 5 потоков, и я не хочу принудительно освобождать этот поток, если он окажется занятым. Мне нужно подождать, пока это не будет сделано, прежде чем освободить его. А с другой стороны, если свойство изменяется с 3 на 5, то мне нужно создать 2 новых потока. Мне нужно знать правильный подход, чтобы «отслеживать» эти темы в этом сценарии.

Возможность

Вот несколько возможных способов «отслеживать» эти темы ...

  • Храните TList, содержащий каждый созданный поток - легко управлять
  • Создание оболочки TList или потомка, содержащего каждый созданный поток - проще в управлении, но больше работы
  • Храните array, содержащий каждый созданный поток. Было бы лучше, чем TList?
  • Создать упаковщик массива, содержащий каждый созданный поток

Но вернемся к моей первоначальной проблеме - что делать с существующими занятыми потоками, когда свойство ActiveThreads уменьшается? Создание их не проблема, но выпуск их становится запутанным. Я обычно делаю темы, которые освобождают себя, но я впервые делаю одну, которая используется повторно. Мне просто нужно знать правильный метод уничтожения этих потоков без прерывания их задач.

Обновление

Основываясь на отзывах, я приобрел и начал внедрять OmniThreadLibrary (а также давно необходимый FastMM). Я также немного изменил свой подход - способ, которым я могу создавать эти многопоточные процессы без управления ими и без другого потока для обработки очереди ...

  • 1 мастер-метод для создания нового процесса
    • function NewProcess(const Request: TProcessRequest): TProcessInfo;
    • TProcessRequest - это запись со спецификациями того, что должно быть сделано (имя файла, параметры и т. Д.)
    • TProcessInfo - это запись, которая передает некоторую информацию о состоянии.
  • Введите обработчик событий для события, «выполняемого» с его задачей при создании нового процесса. Когда компонент получит это сообщение, он проверит очередь.
    • Если команда поставлена ​​в очередь, она будет сравнивать ограничение активного процесса с текущим счетчиком процесса
    • > Если превышен лимит, просто остановитесь, и следующий завершенный процесс выполнит ту же проверку
    • > Если в пределах лимита, запустить другой новый процесс (после того, как будет выполнен предыдущий процесс)
    • Если в очереди нет команд, просто остановите
  • Каждый процесс может умереть сам по себе после того, как он выполнил свою задачу (без поддержки потоковых потоков)
  • Мне не придется беспокоиться о другом таймере или потоке для непрерывного циклачерез
    • Вместо этого каждый процесс уничтожает себя и проверяет наличие новых запросов, прежде чем сделать это

Другое обновление

Я фактически вернулся к использованию TThread, так как OTL очень неудобно использовать.Мне нравится хранить вещи в своих собственных классах.

Ответы [ 3 ]

5 голосов
/ 27 января 2012

Как объясняется @NGLN и т. Д., Вам нужно объединить несколько потоков и признать, что самый простой способ управлять номерами потоков - это отделить фактическое количество потоков от желаемого.Добавить потоки в пул легко - просто создайте еще несколько экземпляров (передавая входную очередь задачи «производитель-потребитель» в качестве параметра, чтобы поток знал, что ожидать).Если желаемое количество потоков меньше существующего в настоящее время, вы можете поставить в очередь достаточно «ядовитых таблеток», чтобы убить лишние потоки.

Не сохраняйте список указателей потоков - это загрузкахлопоты по микроуправлению, которые просто не нужны (и, вероятно, пойдут не так).Все, что вам нужно сохранить, это счетчик количества желаемых потоков в пуле, чтобы вы знали, какие действия нужно предпринять, когда что-то меняет свойство poolDepth.

Триггеры событий лучше всего загружаются в задания, которыевыдан в пул - спускайте их все из некоторого класса TpooledTask, который принимает событие в качестве параметра конструктора и сохраняет его в каком-то FonComplete TNotifyEvent.Поток, выполняющий задачу, может вызвать FonComplete, когда он выполнит задание (с параметром отправителя TpooledTask) - вам не нужно знать, какой поток выполнял задачу.

Пример:

    unit ThreadPool;

    interface

    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls, contnrs, syncobjs;


    type

    TpooledTask=class(TObject)
    private
      FonComplete:TNotifyEvent;
    protected
      Fparam:TObject;
      procedure execute; virtual; abstract;
    public
      constructor create(onComplete:TNotifyEvent;param:TObject);
    end;

    TThreadPool=class(TObjectQueue)
    private
      access:TcriticalSection;
      taskCounter:THandle;
      threadCount:integer;
    public
      constructor create(initThreads:integer);
      procedure addTask(aTask:TpooledTask);
    end;

    TpoolThread=class(Tthread)
    private
      FmyPool:TThreadPool;
    protected
      procedure Execute; override;
    public
      constructor create(pool:TThreadPool);
    end;

    implementation

    { TpooledTask }

    constructor TpooledTask.create(onComplete: TNotifyEvent; param: TObject);
    begin
      FonComplete:=onComplete;
      Fparam:=param;
    end;

    { TThreadPool }

    procedure TThreadPool.addTask(aTask: TpooledTask);
    begin
      access.acquire;
      try
        push(aTask);
      finally
        access.release;
      end;
      releaseSemaphore(taskCounter,1,nil); // release one unit to semaphore
    end;

    constructor TThreadPool.create(initThreads: integer);
    begin
      inherited create;
      access:=TcriticalSection.create;
      taskCounter:=createSemaphore(nil,0,maxInt,'');
      while(threadCount<initThreads) do
      begin
        TpoolThread.create(self);
        inc(threadCount);
      end;
    end;

    { TpoolThread }

    constructor TpoolThread.create(pool: TThreadPool);
    begin
      inherited create(true);
      FmyPool:=pool;
      FreeOnTerminate:=true;
      resume;
    end;

procedure TpoolThread.execute;
var thisTask:TpooledTask;
begin
  while (WAIT_OBJECT_0=waitForSingleObject(FmyPool.taskCounter,INFINITE)) do
  begin
    FmyPool.access.acquire;
    try
      thisTask:=TpooledTask(FmyPool.pop);
    finally
      FmyPool.access.release;
    end;
    thisTask.execute;
    if assigned(thisTask.FonComplete) then thisTask.FonComplete(thisTask);
  end;
end;

end.
4 голосов
/ 27 января 2012

Вы можете внедрить сообщение FreeNotify в очередь запросов, и когда рабочий поток получит это сообщение, освободите себя. В вашем примере, когда вы уменьшаете количество потоков с 5 до 3, просто поместите 2 сообщения FreeNotify в свою очередь, и 2 рабочих потока будут свободны.

3 голосов
/ 27 января 2012

О вашей проблеме с декрементом активных тем: извините, но вы просто должны решить для себя.Либо немедленно освободите ненужные потоки (что завершает их в самый короткий момент), либо дайте им работать до тех пор, пока они не будут завершены (что завершит их после завершения всей работы).Это твой выбор.Конечно, вы должны отделить переменную для желаемого числа от фактического числа потоков.Проблема обновления фактического числа потоков переменной (может быть просто List.Count) для обоих абсолютно одинакова, так как любое решение потребует некоторого времени.

И по управлению несколькими потоками: вы можете изучить этот ответ , который хранит потоки в TList.Это требует небольшой доработки для вашего конкретного списка пожеланий, но, пожалуйста, кричите в случае необходимости помощи с этим.Кроме того, есть, конечно, больше возможных реализаций, которые могут быть получены из использования TThread по умолчанию.И обратите внимание, что существуют другие (многопоточные) библиотеки, но у меня никогда не было необходимости их использовать.

...