Стратегия Java ThreadPoolExecutor, «Прямая передача обслуживания» с очередью? - PullRequest
6 голосов
/ 08 марта 2012

Я ищу ThreadPoolExecutor, где я могу установить corePoolSize и maximumPoolSize, и получается, что очередь немедленно передаст задачу пулу потоков и, таким образом, создаст новые потоки, пока не достигнетmaximumPoolSize затем начните добавлять в очередь.

Есть ли такая вещь?Если нет, то есть ли какая-либо веская причина, по которой у него нет такой стратегии?

По сути, я хочу, чтобы задачи передавались на выполнение, и когда они достигают точки, когда они, по сути, собираются стать «худшими»Производительность из-за слишком большого количества потоков (путем установки MaximumPoolSize), он прекратит добавлять новые потоки и работать с этим пулом потоков и начнет очередь, тогда, если очередь заполнена, он отклоняет.

И когда загрузка возвращается вниз, он может начать демонтаж потоков, которые не используются обратно в corePoolSize.

Для меня это имеет больше смысла, чем «три общие стратегии», перечисленные в http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html

Ответы [ 2 ]

3 голосов
/ 08 марта 2012

Примечание: эти реализации несколько ошибочны и недетерминированы.Пожалуйста, прочитайте весь ответ и комментарии, прежде чем использовать этот код.

Как насчет создания рабочей очереди, которая отклоняет элементы, когда исполнитель меньше максимального размера пула, и начинает принимать их, когда максимумдостигнут?

Это зависит от задокументированного поведения:

"Если запрос не может быть поставлен в очередь, создается новый поток, если он не превысит MaximumPoolSize, и в этом случае задача будетбыть отклоненным. "

public class ExecutorTest
{
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;
    private static final int KEEP_ALIVE_TIME_MS = 5000;

    public static void main(String[] args)
    {
        final SaturateExecutorBlockingQueue workQueue = 
            new SaturateExecutorBlockingQueue();

        final ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_TIME_MS, 
                    TimeUnit.MILLISECONDS, 
                    workQueue);

        workQueue.setExecutor(executor);

        for (int i = 0; i < 6; i++)
        {
            final int index = i;
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }

                    System.out.println("Runnable " + index 
                            + " on thread: " + Thread.currentThread());
                }
            });
        }
    }

    public static class SaturateExecutorBlockingQueue 
        extends LinkedBlockingQueue<Runnable>
    {
        private ThreadPoolExecutor executor;

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        public boolean offer(Runnable e)
        {
            if (executor.getPoolSize() < executor.getMaximumPoolSize())
            {
                return false;
            }
            return super.offer(e);
        }
    }
}

Примечание. Ваш вопрос удивил меня, поскольку я ожидал, что вашим желаемым поведением будет поведение по умолчанию для ThreadPoolExecutor, настроенного с corePoolSize


Идея # 2

Я думаю, что у меня есть несколько лучший подход.Он основан на поведении побочных эффектов, закодированном в методе setCorePoolSize в ThreadPoolExecutor.Идея состоит в том, чтобы временно и условно увеличить размер основного пула, когда рабочий элемент ставится в очередь.При увеличении размера основного пула ThreadPoolExecutor немедленно создаст достаточно новых потоков для выполнения всех задач в очереди (queue.size ()).Затем мы немедленно уменьшаем размер основного пула, что позволяет естественному сокращению пула потоков в последующие периоды низкой активности.Этот подход все еще не является полностью детерминированным (например, размер пула может превышать максимальный размер пула), но я думаю, что почти во всех случаях он лучше, чем первая стратегия.

В частностиЯ считаю, что этот подход лучше первого, потому что:

  1. Он будет чаще использовать потоки
  2. Он не будет отклонять выполнение в результате гонки
  3. Я хотел бы еще раз упомянуть, что при первом подходе пул потоков увеличивается до максимального размера даже при очень небольшом использовании.Этот подход должен быть намного более эффективным в этом отношении.

-

public class ExecutorTest2
{
    private static final int KEEP_ALIVE_TIME_MS = 5000;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;

    public static void main(String[] args) throws InterruptedException
    {
        final SaturateExecutorBlockingQueue workQueue = 
            new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE);

        final ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_TIME_MS, 
                    TimeUnit.MILLISECONDS, 
                    workQueue);

        workQueue.setExecutor(executor);

        for (int i = 0; i < 60; i++)
        {
            final int index = i;
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }

                    System.out.println("Runnable " + index 
                            + " on thread: " + Thread.currentThread()
                            + " poolSize: " + executor.getPoolSize());
                }
            });
        }

        executor.shutdown();

        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public static class SaturateExecutorBlockingQueue 
        extends LinkedBlockingQueue<Runnable>
    {
        private final int corePoolSize;
        private final int maximumPoolSize;
        private ThreadPoolExecutor executor;

        public SaturateExecutorBlockingQueue(int corePoolSize, 
                int maximumPoolSize)
        {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
        }

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        public boolean offer(Runnable e)
        {
            if (super.offer(e) == false)
            {
                return false;
            }
            // Uncomment one or both of the below lines to increase
            // the likelyhood of the threadpool reusing an existing thread 
            // vs. spawning a new one.
            //Thread.yield();
            //Thread.sleep(0);
            int currentPoolSize = executor.getPoolSize();
            if (currentPoolSize < maximumPoolSize 
                    && currentPoolSize >= corePoolSize)
            {
                executor.setCorePoolSize(currentPoolSize + 1);
                executor.setCorePoolSize(corePoolSize);
            }
            return true;
        }
    }
}
2 голосов
/ 24 августа 2012

Мы нашли решение этой проблемы с помощью следующего кода:

Эта очередь является гибридной SynchronousQueue / LinkedBlockingQueue.

public class OverflowingSynchronousQueue<E> extends LinkedBlockingQueue<E> {
  private static final long serialVersionUID = 1L;

  private SynchronousQueue<E> synchronousQueue = new SynchronousQueue<E>();

  public OverflowingSynchronousQueue() {
    super();
  }

  public OverflowingSynchronousQueue(int capacity) {
    super(capacity);
  }

  @Override
  public boolean offer(E e) {
    // Create a new thread or wake an idled thread
    return synchronousQueue.offer(e);
  }

  public boolean offerToOverflowingQueue(E e) {
    // Add to queue
    return super.offer(e);
  }

  @Override
  public E take() throws InterruptedException {
    // Return tasks from queue, if any, without blocking
    E task = super.poll();
    if (task != null) {
      return task;
    } else {
      // Block on the SynchronousQueue take
      return synchronousQueue.take();
    }
  }

  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // Return tasks from queue, if any, without blocking
    E task = super.poll();
    if (task != null) {
      return task;
    } else {
      // Block on the SynchronousQueue poll
      return synchronousQueue.poll(timeout, unit);
    }
  }

}

Чтобы это работало, нам нужно обернуть RejectedExecutionHandler для вызова «offerToOverflowingQueue» при отклонении задачи.

public class OverflowingRejectionPolicyAdapter implements RejectedExecutionHandler {

  private OverflowingSynchronousQueue<Runnable> queue;
  private RejectedExecutionHandler adaptedRejectedExecutionHandler;

  public OverflowingRejectionPolicyAdapter(OverflowingSynchronousQueue<Runnable> queue,
                                           RejectedExecutionHandler adaptedRejectedExecutionHandler)
  {
    super();
    this.queue = queue;
    this.adaptedRejectedExecutionHandler = adaptedRejectedExecutionHandler;
  }

  @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!queue.offerToOverflowingQueue(r)) {
      adaptedRejectedExecutionHandler.rejectedExecution(r, executor);
    }
  }
}

Вот как мы создаем ThreadPoolExecutor

public static ExecutorService newSaturatingThreadPool(int corePoolSize,
                                                        int maxPoolSize,
                                                        int maxQueueSize,
                                                        long keepAliveTime,
                                                        TimeUnit timeUnit,
                                                        String threadNamePrefix,
                                                        RejectedExecutionHandler rejectedExecutionHandler)
  {
  OverflowingSynchronousQueue<Runnable> queue = new OverflowingSynchronousQueue<Runnable>(maxQueueSize);
  OverflowingRejectionPolicyAdapter rejectionPolicyAdapter = new OverflowingRejectionPolicyAdapter(queue,
                                                                                                     rejectedExecutionHandler);
  ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
                                                         maxPoolSize,
                                                         keepAliveTime,
                                                         timeUnit,
                                                         queue,
                                                         new NamedThreadFactory(threadNamePrefix),
                                                         rejectionPolicyAdapter);
  return executor;
}
...