Как узнать, когда CompletionService завершает выдачу результатов? - PullRequest
5 голосов
/ 03 апреля 2012

Я хочу использовать CompletionService для обработки результатов из ряда потоков по мере их завершения . У меня есть служба в цикле для получения объектов Future, которые она предоставляет, когда они становятся доступными, но я не знаю лучшего способа определить, когда все потоки завершены (и, таким образом, выйти из цикла):

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class Bar {

    final static int MAX_THREADS = 4;
    final static int TOTAL_THREADS = 20;

    public static void main(String[] args) throws Exception{

        final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
        final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);

        for (int i=0; i<TOTAL_THREADS; i++){
            service.submit(new MyCallable(i));
        }

        int finished = 0;
        Future<Integer> future = null;
        do{
            future = service.take();
            int result = future.get();
            System.out.println("  took: " + result);
            finished++;             

        }while(finished < TOTAL_THREADS);

        System.out.println("Shutting down");
        threadPool.shutdown();
    }


    public static class MyCallable implements Callable<Integer>{

        final int id;

        public MyCallable(int id){
            this.id = id;
            System.out.println("Submitting: " + id);
        }

        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println("finished: " + id);
            return id;
        }
    }
}

Я пытался проверить состояние ThreadPoolExecutor, но я знаю, что методы getCompletedTaskCount и getTaskCount являются лишь приблизительными и на них нельзя полагаться. Есть ли лучший способ убедиться, что я получил все фьючерсы из CompletionService, чем сам их подсчитывать?


Редактировать: и ссылка, предоставленная Nobeh, и эта ссылка предполагают, что подсчет количества отправленных задач, а затем многократный вызов метода take () - это путь. Я просто удивлен, что нет способа спросить CompletionService или его исполнителя, что осталось вернуть.

Ответы [ 3 ]

4 голосов
/ 12 июня 2014

См. http://www.javaspecialists.eu/archive/Issue214.html для приличного предложения о том, как расширить ExecutorCompletionService, чтобы сделать то, что вы ищете. Я вставил соответствующий код ниже для вашего удобства. Автор также предлагает сделать сервис реализующим Iterable, что, я думаю, было бы неплохо.

FWIW, я согласен с вами, что это действительно должно быть частью стандартной реализации, но, увы, это не так.

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V> extends ExecutorCompletionService<V> {
  private final AtomicLong submittedTasks = new AtomicLong();
  private final AtomicLong completedTasks = new AtomicLong();

  public CountingCompletionService(Executor executor) {
    super(executor);
  }

  public CountingCompletionService(
      Executor executor, BlockingQueue<Future<V>> queue) {
    super(executor, queue);
  }

  public Future<V> submit(Callable<V> task) {
    Future<V> future = super.submit(task);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> submit(Runnable task, V result) {
    Future<V> future = super.submit(task, result);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> take() throws InterruptedException {
    Future<V> future = super.take();
    completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll() {
    Future<V> future = super.poll();
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    Future<V> future = super.poll(timeout, unit);
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public long getNumberOfCompletedTasks() {
    return completedTasks.get();
  }

  public long getNumberOfSubmittedTasks() {
    return submittedTasks.get();
  }

  public boolean hasUncompletedTasks() {
    return completedTasks.get() < submittedTasks.get();
  }
}
2 голосов
/ 03 апреля 2012

Ответ на эти вопросы дает вам ответ?

  • Создают ли ваши асинхронные задачи другие задачи, отправленные в CompletionService?
  • Является ли service единственным объектом, который должен обрабатывать задачи, созданные в вашем приложении?

На основании справочной документации , CompletionService действует на основе подхода потребителя / производителя и использует внутренний Executor. Таким образом, до тех пор, пока вы создаете задачи в одном месте и потребляете их в другом месте, CompletionService.take() будет обозначать, если есть еще какие-либо результаты для выдачи.

Я считаю этот вопрос также помогает вам.

1 голос
/ 20 апреля 2018

Код ниже вдохновлен ответом @ Mark, но мне удобнее использовать:

package com.example;

import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionIterator<T> implements Iterator<T>, AutoCloseable {

    private AtomicInteger count = new AtomicInteger(0);

    private CompletionService<T> completer;

    private ExecutorService executor = Executors.newWorkStealingPool(100);

    public CompletionIterator() {
        this.completer = new ExecutorCompletionService<>(executor);
    }

    public void submit(Callable<T> task) {
        completer.submit(task);
        count.incrementAndGet();
      }

    @Override
    public boolean hasNext() {
        return count.decrementAndGet() > 0;
    }

    @Override
    public T next() {
        try {
            return completer.take().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        try {
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            executor = null;
            completer = null;
            count = null;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

Вот как его можно использовать:

try(CompletionIterator service = new CompletionIterator()) {
  service.submit(task1);
  service.submit(task2);
  // all tasks must be submitted before iterating, to avoid race condition
  for (Future<Integer> future : service) {
    System.out.printf("Job %d is done%n", future.get());
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...