Есть ли ExecutorService, который использует текущий поток? - PullRequest
81 голосов
/ 05 июля 2011

Что мне нужно, так это совместимый способ настроить использование пула потоков или нет. В идеале остальная часть кода не должна быть затронута вообще. Я мог бы использовать пул потоков с 1 потоком, но это не совсем то, что я хочу. Есть идеи?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);

// es.execute / es.submit / new ExecutorCompletionService(es) etc

Ответы [ 6 ]

76 голосов
/ 01 февраля 2013

Вы можете использовать Guava's MoreExecutors.newDirectExecutorService() или MoreExecutors.directExecutor(), если вам не нужен ExecutorService.

Если включить Guava слишком тяжелый-вес, вы можете реализовать что-то почти так же хорошо:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
  private final CountDownLatch signal = new CountDownLatch(1);

  private SameThreadExecutorService() {
    super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  @Override public void shutdown() {
    super.shutdown();
    signal.countDown();
  }

  public static ExecutorService getInstance() {
    return SingletonHolder.instance;
  }

  private static class SingletonHolder {
    static ExecutorService instance = createInstance();    
  }

  private static ExecutorService createInstance() {
    final SameThreadExecutorService instance
        = new SameThreadExecutorService();

    // The executor has one worker thread. Give it a Runnable that waits
    // until the executor service is shut down.
    // All other submitted tasks will use the RejectedExecutionHandler
    // which runs tasks using the  caller's thread.
    instance.submit(new Runnable() {
        @Override public void run() {
          boolean interrupted = false;
          try {
            while (true) {
              try {
                instance.signal.await();
                break;
              } catch (InterruptedException e) {
                interrupted = true;
              }
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }});
    return Executors.unconfigurableScheduledExecutorService(instance);
  }
}
60 голосов
/ 05 июля 2011

Вот действительно простая реализация Executor (не ExecutorService, заметьте), которая использует только текущий поток. Похищение этого из «Параллелизма Java на практике» (обязательное чтение).

public class CurrentThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService является более сложным интерфейсом, но может обрабатываться с помощью того же подхода.

51 голосов
/ 19 января 2015

Стиль Java 8:

Executor e = Runnable::run;

12 голосов
/ 05 декабря 2012

Я написал ExecutorService на основе AbstractExecutorService.

/**
 * Executes all submitted tasks directly in the same thread as the caller.
 */
public class SameThreadExecutorService extends AbstractExecutorService {

    //volatile because can be viewed by other threads
    private volatile boolean terminated;

    @Override
    public void shutdown() {
        terminated = true;
    }

    @Override
    public boolean isShutdown() {
        return terminated;
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
        shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
        return terminated;
    }

    @Override
    public List<Runnable> shutdownNow() {
        return Collections.emptyList();
    }

    @Override
    public void execute(Runnable theCommand) {
        theCommand.run();
    }
}
5 голосов
/ 05 июля 2011

Вы можете использовать RejectedExecutionHandler для запуска задачи в текущем потоке.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        r.run();
    }
});

Вам нужен только один из них.

4 голосов
/ 28 августа 2017

Мне пришлось использовать тот же «CurrentThreadExecutorService» для целей тестирования, и, хотя все предложенные решения были хорошими (в частности, упомянутое способ Гуавы ), я придумал нечто похожее на то, что предложил Питер Лоури здесь .

Как упомянул Аксель Циглер здесь , к сожалению, решение Питера на самом деле не сработает из-за проверки, введенной в ThreadPoolExecutor для конструктора maximumPoolSizeпараметр (т.е. maximumPoolSize не может быть <=0).

Чтобы обойти это, я сделал следующее:

private static ExecutorService currentThreadExecutorService() {
    CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
        @Override
        public void execute(Runnable command) {
            callerRunsPolicy.rejectedExecution(command, this);
        }
    };
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...