Потоки ThreadPoolExecutor, конфликтующие с другими потоками - PullRequest
3 голосов
/ 01 апреля 2011

Я работаю над улучшением для существующего приложения Java. Приложение представляет собой процессор сообщений, который ежедневно обрабатывает несколько миллионов сообщений. Он написан с использованием Core Java с потоками, а очереди реализованы с использованием классов Collection.

В этом приложении некоторые типы сообщений выполняются в одном потоке. Передо мной была поставлена ​​задача сделать эту конкретную часть приложения многопоточной, чтобы быстрее обрабатывать сообщения, поскольку у нас есть два процессора.

Поскольку мы используем Java 5, я использовал подход ThreadPoolExcecutor. Я создал процессорные потоки для каждого клиента, чтобы сообщения для определенных потоков могли обрабатываться в своем собственном потоке. Потоки процессора реализуют интерфейс Callable, поскольку это позволит мне проверить будущий объект, завершено ли предыдущее задание или нет.

В процессе инициализации я перейду по всем клиентам, создаю потоки процессора для каждого и сохраню их в карте, используя их идентификатор в качестве уникального ключа. Чтобы отслеживать ранее отправленную работу, я снова сохраняю будущий объект на другой карте, используя тот же идентификатор, что и уникальный ключ.

Ниже приведен фрагмент кода, который я использовал: В основном классе -

ThreadPoolExecutor  threadPool = null;
int poolSize = 20;
int maxPoolSize = 50;
long keepAliveTime = 10;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1000);
threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,keepAliveTime, TimeUnit.SECONDS, queue);

   ....
   ....
 for (each client...) {
   id = getId()..
   future = futuremap.get(id);
   if(!future.isDone())
      continue;
   if(future == null || future.isDone()) {
      processor = processormap.get(id);
      if(processor == null) {
         processor = new Processor(.....);
         //add to the map
         processormap.put(id,processor);
      }
      //submit the processor
      future = threadPool.submit(processor );
      futuremap.put(id,future);
 }
} 

Процессорная нить

public class MyProcessor implements Callable<String> {
        .....
        .....
    public String call() {
        ....
        ....
    }
 }

Выпуск

Приведенная выше реализация работает хорошо в моей тестовой среде. Однако в производственной среде ( Edit # 1 - Ubuntu , Linux Slackware, Java - 1.6.0_18) мы заметили, что другие потоки приложения, которые не управляются с помощью этого нового ThreadpoolExecutor страдают. их задачи откладываются на несколько часов. Это потому, что потоки, созданные ThreadPoolExecutors, берут все ресурсы или что-то еще и не дают возможности другим потокам.

Новые потоки, созданные с помощью ThreadPoolExceutor, выполняют самостоятельную задачу и не конфликтуют с другими потоками за ресурсы. то есть, нет сценария состояния гонки.

В журнале, для новых потоков, я вижу, что работает максимум 20 потоков (corepoolsize), и нет никаких исключений отклонения, т. Е. Количество отправок находится в пределах очереди.

Есть идеи, почему это происходит?

Спасибо заранее.

1 Ответ

0 голосов
/ 01 апреля 2011

Мой предыдущий опыт работы с потоками в Linux ясно показал, что он гораздо более подвержен истощению потоков, когда очень загружен, чем та же система, работающая в Windows под той же нагрузкой. Я написал тестовую программу, которая показала, что при большой загрузке ЦП несколько потоков получат гораздо больше процессорного времени, чем другие, при использовании стандартных примитивов ожидания / уведомления - IIRC, на порядок. Мое решение состояло в том, чтобы использовать блокировку повторного входа в честном режиме, чтобы округлить их.

Опять же, IIRC, добавив Thread.yield в конце каждого рабочего мероприятия, не дало положительного эффекта.

На все это может существенно повлиять то, какая из нескольких библиотек потоков используется вашим дистрибутивом Linux.

Вы можете добиться некоторого улучшения, добавив регулятор в рабочую очередь, которая доминирует в рабочей нагрузке, хотя было бы лучше, если бы он каким-то образом адаптировался к объему работы, ожидающей в других несвязанных потоках.

...