Как сделать блок метода submit () ThreadPoolExecutor, если он насыщен? - PullRequest
96 голосов
/ 04 января 2010

Я хочу создать ThreadPoolExecutor таким образом, чтобы при достижении максимального размера и заполнении очереди метод submit() блокировал при попытке добавления новых задач. Нужно ли реализовывать пользовательский RejectedExecutionHandler для этого или существует существующий способ сделать это с помощью стандартной библиотеки Java?

Ответы [ 19 ]

0 голосов
/ 25 января 2015

Я считаю, что есть довольно элегантный способ решить эту проблему, используя java.util.concurrent.Semaphore и делегируя поведение Executor.newFixedThreadPool. Новая служба executor выполнит новую задачу только тогда, когда для этого есть поток. Блокировка управляется семафором с количеством разрешений, равным количеству потоков. Когда задача завершена, она возвращает разрешение.

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}
0 голосов
/ 03 ноября 2017

Вот решение, которое, кажется, работает очень хорошо. Это называется NotifyingBlockingThreadPoolExecutor .

Демонстрационная программа.

Редактировать: с этим кодом существует проблема , метод await () содержит ошибки. Вызов shutdown () + awaitTermination (), кажется, работает нормально.

0 голосов
/ 12 декабря 2013

Вам следует взглянуть на эту ссылку (notifying-blocking-thread-pool) , которая обобщает несколько решений и, наконец, дает элегантное уведомление с уведомлением.

0 голосов
/ 04 января 2010

Создайте собственную очередь блокировки, которая будет использоваться Исполнителем, с требуемым поведением блокировки, всегда возвращая доступную оставшуюся емкость (гарантируя, что исполнитель не будет пытаться создать больше потоков, чем его основной пул, или инициировать отклонение обработчик).

Я верю, что это поможет вам найти блокирующее поведение, которое вы ищете. Обработчик отклонения никогда не будет соответствовать требованиям, так как это означает, что исполнитель не может выполнить задачу. Я мог предположить, что в обработчике вы получаете некоторую форму «ожидания ожидания». Это не то, что вы хотите, вы хотите очередь для исполнителя, которая блокирует звонящего ...

0 голосов
/ 02 сентября 2016

Я нашел эту политику отклонения в клиенте эластичного поиска. Он блокирует поток вызывающих в очереди блокировки. Код ниже-

 static class ForceQueuePolicy implements XRejectedExecutionHandler 
 {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
        {
            try 
            {
                executor.getQueue().put(r);
            } 
            catch (InterruptedException e) 
            {
                //should never happen since we never wait
                throw new EsRejectedExecutionException(e);
            }
        }

        @Override
        public long rejected() 
        {
            return 0;
        }
}
0 голосов
/ 02 октября 2016

У меня недавно была потребность добиться чего-то похожего, но на ScheduledExecutorService.

Я должен был также убедиться, что я обработал задержку, передаваемую методу, и убедиться, что задача передается для выполнения в то время, как ожидает вызывающий объект, или просто терпит неудачу, таким образом выбрасывая RejectedExecutionException.

Другие методы из ScheduledThreadPoolExecutor для внутреннего выполнения или отправки задачи вызывают #schedule, который все равно в свою очередь вызывает переопределенные методы.

import java.util.concurrent.*;

public class BlockingScheduler extends ScheduledThreadPoolExecutor {
    private final Semaphore maxQueueSize;

    public BlockingScheduler(int corePoolSize,
                             ThreadFactory threadFactory,
                             int maxQueueSize) {
        super(corePoolSize, threadFactory, new AbortPolicy());
        this.maxQueueSize = new Semaphore(maxQueueSize);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
        return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
        return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long period,
                                                     TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable t) {
        super.afterExecute(runnable, t);
        try {
            if (t == null && runnable instanceof Future<?>) {
                try {
                    ((Future<?>) runnable).get();
                } catch (CancellationException | ExecutionException e) {
                    t = e;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                System.err.println(t);
            }
        } finally {
            releaseQueueUsage();
        }
    }

    private long beforeSchedule(Runnable runnable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
            return 0;
        }
    }

    private long beforeSchedule(Callable callable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
            return 0;
        }
    }

    private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
        final long beforeAcquireTimeStamp = System.currentTimeMillis();
        maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
        final long afterAcquireTimeStamp = System.currentTimeMillis();
        return afterAcquireTimeStamp - beforeAcquireTimeStamp;
    }

    private void releaseQueueUsage() {
        maxQueueSize.release();
    }
}

У меня есть код здесь, будем благодарны за любые отзывы. https://github.com/AmitabhAwasthi/BlockingScheduler

0 голосов
/ 16 апреля 2014

Чтобы избежать проблем с решением @FixPoint. Можно использовать ListeningExecutorService и выпустить семафор onSuccess и onFailure внутри FutureCallback.

0 голосов
/ 07 ноября 2017

Мне не всегда нравится CallerRunsPolicy, тем более что он позволяет отклоненной задаче «пропускать очередь» и выполняться перед задачами, которые были отправлены ранее. Более того, выполнение задачи в вызывающем потоке может занять гораздо больше времени, чем ожидание появления первого слота.

Я решил эту проблему с помощью пользовательского RejectedExecutionHandler, который просто на некоторое время просто блокирует вызывающий поток, а затем снова пытается отправить задачу:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

Этот класс можно просто использовать в исполнителе пула потоков как RejectedExecutinHandler, как и любой другой, например:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

Единственный недостаток, который я вижу, это то, что вызывающий поток может быть заблокирован немного дольше, чем это строго необходимо (до 250 мс). Более того, поскольку этот исполнитель фактически вызывается рекурсивно, очень долгое ожидание появления потока (часы) может привести к переполнению стека.

Тем не менее мне лично нравится этот метод. Он компактен, прост для понимания и хорошо работает.

0 голосов
/ 26 июля 2014

Недавно я обнаружил, что этот вопрос имеет ту же проблему. OP не говорит об этом явно, но мы не хотим использовать RejectedExecutionHandler, который выполняет задачу в потоке отправителя, потому что это приведет к неполному использованию рабочих потоков, если эта задача долго выполняется.

Чтение всех ответов и комментариев, в частности, некорректного решения с помощью семафора или использования afterExecute. Я внимательно посмотрел код ThreadPoolExecutor , чтобы узнать, есть ли выход. Я был поражен, увидев, что есть более 2000 строк (закомментированных) кода, некоторые из которых заставляют меня чувствовать головокружение . Учитывая довольно простое требование, которое у меня на самом деле есть - один производитель, несколько потребителей, пусть производитель блокирует, когда ни один из потребителей не может взять на себя работу - я решил найти свое собственное решение. Это не ExecutorService, а просто Executor. И он не адаптирует количество потоков к рабочей нагрузке, а содержит только фиксированное количество потоков, что также соответствует моим требованиям. Вот код Не стесняйтесь разглагольствовать об этом: -)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}
...