Используйте ограниченный набор потоков для регулярного запуска похожих задач - PullRequest
0 голосов
/ 16 августа 2011

Я задавал подобный вопрос до здесь , но после долгих раздумий и реализации тех, кто мне ответил, я обнаружил, что мой подход мог быть неправильным.

Когда я реализую решение, данное мне по этому предыдущему вопросу, появился следующий результат теста:

  1. Когда я «моделирую» несколько задач, выполняющихся одновременно на нескольких потоках из пула потоков (создавая потокиспать в случайное время от 1 до 20 секунд, например), то модель, кажется, работает нормально.Я настроил систему на опрос каждые 1 секунду, чтобы увидеть, может ли она порождать другой поток, и все выглядит нормально.Более продолжительные (спящие) потоки будут завершены позже, и потоки начнут и умрут повсюду.Если у меня заканчиваются потоки (я установил для них не более 10), он будет сидеть и ждать, пока один из них не станет доступным.
  2. Когда я, однако, заставляю систему выполнять фактическую обработку в каждом потоке (чтопотребовалось бы от 3 секунд и более), что включало бы чтение данных, генерацию XML-данных, сохранение данных, отправку электронных писем и т. п., система порождала 1, 2 или 3 потока, выполняла обработку и затем просто закрывала потоки (3 ...2 ... 1 ...) и затем произнесите 0 запущенных потоков (я везде добавил console.writelines для документирования процесса).Затем он будет висеть вокруг 0 ​​потоков, никогда не выбирая больше работы!

Поэтому я решил вновь заявить о своей проблеме в надежде, что у кого-то есть решение.До сих пор я пробовал различные решения:

  1. ThreadPool: всегда упоминается, что вам не следует перегружать ThreadPool, и задания должны быть «быстрыми», но каково определение«быстро»?Как я узнаю, насколько занят / занят ThreadPool?
  2. Потоки: всегда указывается, что потоки дороги, и вы должны обрабатывать их, начиная и заканчивая, но как мне их ограничить, я пробовал семафоры,'блокировать' объекты, публичные переменные, но это бесполезно

Итак, вот что я хотел бы сделать:

У меня та же работа, которую нужно выполнять через регулярные промежутки времени, например, gmail будет проверять свой сервер на наличие новой электронной почты каждые 5 секунд. Если есть работа, которую нужно выполнить (т.е. у вас есть новаяписьма, которые будут отправлены на ваш почтовый ящик), затем создайте асинхронную ветку и заставьте ее начать работу.Эта работа обычно занимает больше времени, чем интервал, указанный в (1), следовательно, асинхронный поток, если интервал проходит, и система снова проверяет, есть ли новая работа, и видит, что у вас есть больше работы, она порождает другой поток и делает егоначать работу. Как и в моем примере, все задания относятся к одному виду работ (проверка новой почты) и полностью независимы друг от друга, они не влияют друг на друга.Если один из них выходит из строя, остальные могут продолжать работать без проблем. Мне нужно, чтобы было ограничение на количество одновременных потоков и максимальное количество потоков, которые я могу иметь.Если я выберу «10», то система должна начать проверку заданий, как в (1), и продолжать порождать потоки, как в (1), пока не достигнет 10 потоков.Все новые попытки на интервале порождать новый поток должны просто потерпеть неудачу (ничего не делать), пока поток не будет освобожден снова.Здесь я предполагаю, что выбор будет: (а) когда он будет выпущен, уже будет некоторая работа, ожидающая очереди для нового открытого потока, или (б) в следующем интервале, проверьте, есть ли новая работа, и назначьте ее новому открытому потоку.thread. Если нет работы, то обычно система должна сидеть и ждать, не имея потоков, и, по сути, единственное, что должно быть запущено, это какой-то таймер

В настоящее время я использую пример из предыдущего вопроса для выполнения следующих действий:

  1. Я запускаю таймер, который тикает каждые 1 секунду
  2. На каждом тике I 'ThreadPool.QueueUserWorkItem (new WaitCallback (DoWork) '
  3. В DoWork II создайте экземпляр класса и вызовите различные методы, которые выполняют некоторую работу

... но это приводит к тому, что я упоминал ранее, только 3 потока отмирают, а затем ничего.

Я думаю о том, чтобы сделать следующее:

  1. Установите ThreadPool на 10 потоков
  2. Запустите таймер и в каждом тике ThreadPool.QueueUserWorkItem 'и просто продолжайте делать это, надеясь, что ThreadPool будет обрабатывать все остальное. Разве это не то, что должен делать ThreadPool?

Любая помощь будет фантастической! (Извините за связанное объяснение!)

Ответы [ 2 ]

1 голос
/ 16 августа 2011

Попробуйте взглянуть на класс Семафор . Вы можете использовать это, чтобы установить ограничение на количество потоков, которые могут одновременно обращаться к конкретному ресурсу (и когда я говорю ресурс, это может быть что угодно).

Хорошо, отредактировано для деталей:

В вашем классе, управляющем потоками, вы создаете:

Semaphore concurrentThreadsEnforcer = new Semaphore(value1, value2);

Затем каждый запускаемый вами поток будет вызывать:

concurrentThreadsEnforcer.WaitOne();

Это либо возьмет один слот из семафора и передаст его новому потоку, либо заблокирует новый поток, пока слот не станет доступным.

Всякий раз, когда ваш новый поток заканчивает свою работу, он (мне нравится персонализация) ДОЛЖЕН звонить по понятным причинам:

concurrentThreadsEnforcer.Release().

Теперь, что касается конструктора, второй параметр довольно прост: указывает, сколько параллельных потоков может получить доступ к ресурсу в любой момент времени.

Первый немного сложнее. Разница между вторым параметром и первым будет определять, сколько слотов семафоров зарезервировано для вызывающего потока . То есть все ваши вновь порожденные потоки будут иметь доступ к количеству слотов, указанному в первом параметре, а оставшиеся до значения второго параметра будут зарезервированы для исходного потока, создавшего семафор. (вызывающая тема).

В вашем случае для 10 максимальных потоков вы должны использовать:

... = new Semaphore(10, 10);

Так как я в любом случае опубликовал историю, позвольте мне рассказать подробнее.

То, как я это сделаю в новых темах, будет таким:

bool aquired = false;
try
{
    aquired = concurrentThreadsEnforcer.WaitOne();

    // Do some work here
} // Optional catch statements
finally
{
    if (aquired)
        concurrentThreadsEnforcer.Release();;
}
0 голосов
/ 16 августа 2011

Я бы использовал комбинацию BlockingCollection и Parallel.ForEach

Примерно так:

private BlockingCollection<Job> jobs = new BlockingCollection<Job>();
private Task jobprocessor;

public void StartWork() {
    timer.Start();
    jobprocessor = Task.Factory.StartNew(RunJobs);
}

public void EndWork() {
    timer.Stop();
    jobs.CompleteAdding();
    jobprocessor.Wait();
}

public void TimerTick() {
   var job = new Job();
   if (job.NeedsMoreWork())
       jobs.Add(job);
}

public void RunJobs() {
    var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };
    Parallel.ForEach(jobs.GetConsumingPartitioner(), options,
                     job => job.DoSomething());
}
...