CompletionSerivce более мощный, чем только FutureTask, и во многих случаях он более подходит.Я получаю некоторые идеи, чтобы решить эту проблему.Кроме того, его подкласс ExecutorCompletionService проще, чем FutureTask, просто включает в себя несколько строк кода.Это легко читать.Поэтому я модифицирую класс, чтобы получить частично вычисленный результат.Для меня удовлетворительное решение, в конце концов, выглядит просто и понятно.
CompletionService может гарантировать, что FutureTask уже выполнен, мы получаем из метода take
или poll
.Зачем?Поскольку класс QueueingFuture
, его метод run
вызываются только, другие методы, такие как cancel
, не вызывались.Другими словами, он завершается нормально.
Демонстрационный код:
CompletionService<List<DeviceInfo>> completionService =
new MyCompletionService<>(Executors.newCachedThreadPool());
Future task = completionService.submit(yourTask);
try {
LogHelper.i(TAG, "result 111: " );
Future<List<DeviceInfo>> result = completionService.take();
LogHelper.i(TAG, "result: " + result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
Это код класса:
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
/**
* This is a CompletionService like java.util.ExecutorCompletionService, but we can get partly computed result
* from our FutureTask which returned from submit, even we cancel or interrupt it.
* Besides, CompletionService can ensure that the FutureTask is done when we get from take or poll method.
*/
public class MyCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
/**
* FutureTask extension to enqueue upon completion.
*/
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
protected void done() { completionQueue.add(task); }
}
private static class DoneFutureTask<V> extends FutureTask<V> {
private Object outcome;
DoneFutureTask(Callable<V> task) {
super(task);
}
DoneFutureTask(Runnable task, V result) {
super(task, result);
}
@Override
protected void set(V v) {
super.set(v);
outcome = v;
}
@Override
public V get() throws InterruptedException, ExecutionException {
try {
return super.get();
} catch (CancellationException e) {
return (V)outcome;
}
}
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
return new DoneFutureTask<V>(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
return new DoneFutureTask<V>(task, result);
}
/**
* Creates an MyCompletionService using the supplied
* executor for base task execution and a
* {@link LinkedBlockingQueue} as a completion queue.
*
* @param executor the executor to use
* @throws NullPointerException if executor is {@code null}
*/
public MyCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
/**
* Creates an MyCompletionService using the supplied
* executor for base task execution and the supplied queue as its
* completion queue.
*
* @param executor the executor to use
* @param completionQueue the queue to use as the completion queue
* normally one dedicated for use by this service. This
* queue is treated as unbounded -- failed attempted
* {@code Queue.add} operations for completed tasks cause
* them not to be retrievable.
* @throws NullPointerException if executor or completionQueue are {@code null}
*/
public MyCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}