Очереди очереди для чайников - PullRequest
6 голосов
/ 28 апреля 2009

У меня есть довольно распространенный сценарий с потоками:

  • У меня есть 100 идентичных заданий для выполнения
  • Все вакансии независимы от каждого другой
  • Я хочу обработать максимум 15 рабочих мест одновременно
  • как каждая работа завершено, новая работа будет начата пока все задания не будут выполнены

Если вы предполагаете, что каждое задание будет запускать событие, когда он завершит (я использую класс BackgroundWorker), я могу придумать пару способов справиться с этим, но я не уверен, что «правильно» решение есть. Я надеялся, что некоторые из вас, гуру, смогут направить меня в правильном направлении.

РЕШЕНИЕ 1: Имейте while (продолжить) {Threading.Sleep (1000); } цикл в моей функции Main (). Код в обработчике событий Job_Completed установит continue = false, если A) не осталось заданий, которые должны быть поставлены в очередь, и B) все поставленные в очередь задания завершены. Я использовал это решение раньше, и хотя оно, кажется, работает нормально ... оно кажется мне немного странным.

РЕШЕНИЕ 2: Используйте Application.Run () в моей функции Main (). Точно так же код в обработчике событий Job_Completed будет вызывать Application.Exit (), когда A) нет заданий, которые должны быть поставлены в очередь, и B) все задания в очереди завершены.

РЕШЕНИЕ 3: Используйте ThreadPool, поставьте в очередь все 500-1000 запросов, позвольте им запускаться по 10 одновременно (SetMaxThreads) и каким-то образом дождитесь их завершения.

Во всех этих решениях основная идея состоит в том, что новое задание будет запускаться каждый раз, когда другое задание будет выполнено, пока не останется ни одного задания. Таким образом, проблема заключается не только в ожидании завершения существующих заданий, но и в ожидании, когда больше не будет отложенных заданий для запуска. Если ThreadPool - правильное решение, как правильно ждать на ThreadPool, чтобы завершить все элементы в очереди?

Я думаю, что моя главная путаница в том, что я не совсем понимаю, КАК события могут запускаться из моей функции Main (). Очевидно, они понимают, я просто не понимаю механику этого с точки зрения цикла сообщений Windows. Как правильно решить эту проблему и почему?

Ответы [ 8 ]

3 голосов
/ 28 апреля 2009

Несмотря на то, что другие ответы хороши, если вы хотите другой вариант (у вас никогда не может быть достаточно вариантов), то как об этом как об идее.

Просто поместите данные для каждого задания в структуру, которая находится в стеке FIFO.

Создать 15 тем.

Каждый поток получит следующее задание из стека, отбрасывая его.

Когда поток заканчивает обработку, получите следующее задание, если стек пуст, поток умирает или просто спит, ожидая.

Единственная сложность, которую довольно просто разрешить, заключается в том, чтобы всплывающее окно находилось в критической секции (синхронизировать чтение / всплывающее окно).

2 голосов
/ 28 апреля 2009

Re: "как-то дождаться их завершения"

ManualResetEvent - ваш друг, прежде чем вы начнете свою большую серию, создайте одного из этих щенков, в основном потоке ожидайте его, установите его в конце фоновой операции, когда работа завершена.

Другой вариант - создать потоки вручную и создать поток foreach, thread.Join ()

Вы можете использовать это (я использую это во время тестирования)

     private void Repeat(int times, int asyncThreads, Action action, Action done) {
        if (asyncThreads > 0) {

            var threads = new List<Thread>();

            for (int i = 0; i < asyncThreads; i++) {

                int iterations = times / asyncThreads; 
                if (i == 0) {
                    iterations += times % asyncThreads;                    
                }

                Thread thread = new Thread(new ThreadStart(() => Repeat(iterations, 0, action, null)));
                thread.Start();
                threads.Add(thread);
            }

            foreach (var thread in threads) {
                thread.Join();
            }

        } else {
            for (int i = 0; i < times; i++) {
                action();
            }
        }
        if (done != null) {
            done();
        }
    }

Использование:

// Do something 100 times in 15 background threads, wait for them all to finish.
Repeat(100, 15, DoSomething, null)
1 голос
/ 28 апреля 2009

Я бы использовал ThreadPool.

Прежде чем начать работу, создайте ManualResetEvent и счетчик int. Добавьте каждое задание в ThreadPool, увеличивая счетчик каждый раз.

В конце каждого задания уменьшайте счетчик, а когда он достигнет нуля, вызовите Set () для события.

В вашем основном потоке вызовите WaitOne () , чтобы дождаться завершения всех заданий.

1 голос
/ 28 апреля 2009

Когда вы ставите рабочий элемент в очередь потока, вы должны получить обратно ручку ожидания. Поместите их все в массив, и вы можете передать его в качестве аргумента функции WaitAll().

1 голос
/ 28 апреля 2009

Я бы просто использовал библиотеку параллельных заданий.

Вы можете сделать это как один простой цикл Parallel.For с вашими задачами, и он автоматически справится с этим довольно чисто. Если вы не можете дождаться C # 4 и реализации Microsoft, временный обходной путь - просто скомпилировать и использовать Mono реализацию TPL . (Я лично предпочитаю реализацию MS, особенно более новые бета-версии, но Mono-версия сегодня функциональна и распространяется.)

0 голосов
/ 22 июня 2017

Microsoft Reactive Framework отлично подходит для этого:

Action[] jobs = new Action[100];

var subscription =
    jobs
        .ToObservable()
        .Select(job => Observable.Start(job))
        .Merge(15)
        .Subscribe(
            x => Console.WriteLine("Job Done."),
            () => Console.WriteLine("All Jobs Done."))

Готово.

Просто NuGet "System.Reactive".

0 голосов
/ 28 апреля 2009

ThreadPool может быть путь. Метод SetMaxThreads сможет ограничить количество выполняемых потоков. Однако это ограничивает максимальное количество потоков для процесса / домена приложений. Я бы не советовал использовать SetMaxThreads, если процесс запущен как служба.

private static ManualResetEvent manual = new ManualResetEvent(false);
private static int count = 0;

public void RunJobs( List<JobState> states )
{
     ThreadPool.SetMaxThreads( 15, 15 );

     foreach( var state in states )
     {
          Interlocked.Increment( count );
          ThreadPool.QueueUserWorkItem( Job, state );
     }

    manual.WaitOne();
}

private static void Job( object state )
{
    // run job
    Interlocked.Decrement( count );
    if( Interlocked.Read( count ) == 0 ) manual.Set();
}
0 голосов
/ 28 апреля 2009

Вот псевдокод того, как я бы подошел к нему (это не использует ThreadPool, поэтому у кого-то может быть лучший ответ:)

main
{
    create queue of 100 jobs
    create new array of 15 threads
    start threads, passing each the job queue
    do whatever until threads are done
}

thread(queue)
{
    while(queue isn't empty)
    {
        lock(queue) { if queue still isn't empty dequeue a thing }
        process the thing
    }

    queue is empty so exit thread
}

РЕДАКТИРОВАТЬ: Если ваша проблема заключается в том, как определить, когда потоки закончены, и вы используете обычные потоки C # (не потоки ThreadPooled), вы можете вызывать Thread.Join () для каждого потока с необязательным таймаутом, и он будет вернуть только после завершения потока. Если вы хотите отслеживать, сколько потоков сделано, не зацикливаясь на одном, вы можете циклически просматривать их следующим образом:

for(int i = 0; allThreads.Count > 0; i++)
{
    var thisThread = allThreads[i % threads.Count];
    if(thisThread.Join(timeout)) // something low, maybe 100 ms or something
        allThreads.Remove(thisThread);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...