Ждите отмены () на FutureTask - PullRequest
12 голосов
/ 18 мая 2011

Я хочу отменить FutureTask, который я получаю от ThreadPoolExecutor, но я хочу быть уверен, что Callable в пуле потоков остановил его работу.

Если я вызываю FutureTask # cancel (false), а затемget () (заблокировать до завершения) Я получаю исключение CancelledException.Выдается ли это исключение сразу или после того, как задача перестает выполняться?

Ответы [ 5 ]

2 голосов
/ 08 апреля 2013

Да, CancellationException бросается немедленно.Вы можете расширить FutureTask, добавив версию метода get(), которая ожидает окончания потока Callable.

public class ThreadWaitingFutureTask<T> extends FutureTask<T> {

    private final Semaphore semaphore;

    public ThreadWaitingFutureTask(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    public T getWithJoin() throws InterruptedException, ExecutionException {
        try {
            return super.get();
        }
        catch (CancellationException e) {
            semaphore.acquire();
            semaphore.release();
            throw e;
        }
    }

    private ThreadWaitingFutureTask(final Callable<T> callable, 
                final Semaphore semaphore) {
        super(new Callable<T>() {
            public T call() throws Exception {
                semaphore.acquire();
                try {
                    return callable.call();
                }
                finally {
                    semaphore.release();
                }
            }
        });
        this.semaphore = semaphore;
    }
}
2 голосов
/ 20 ноября 2013

Пример Алексея работает хорошо.Я написал вариант с конструктором, принимающим Runnable (будет возвращать ноль) и показывающим, как напрямую блокировать (объединять) функцию cancel ():

public class FutureTaskCancelWaits<T> extends FutureTask<T> {

    private final Semaphore semaphore;

    public FutureTaskCancelWaits(Runnable runnable) {
        this(Executors.callable(runnable, (T) null));
    }

    public FutureTaskCancelWaits(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // If the task was successfully cancelled, block here until call() returns
        if (super.cancel(mayInterruptIfRunning)) {
            try {
                semaphore.acquire();
                // All is well
                return true;
            } catch (InterruptedException e) {
                // Interrupted while waiting...
            } finally {
                semaphore.release();
            }
        }
        return false;
    }

    private FutureTaskCancelWaits(final Callable<T> callable, final Semaphore semaphore) {
        super(new Callable<T>() {
            public T call() throws Exception {
                semaphore.acquire();
                try {
                    return callable.call();
                } finally {
                    semaphore.release();
                }
            }
        });
        this.semaphore = semaphore;
    }
}
1 голос
/ 06 мая 2015

Этот ответ исправляет состояние гонки в коде Алексея и FooJBar, проверяя, была ли задача отменена внутри вызываемой.(Существует окно между тем, когда FutureTask.run проверяет состояние и запускает вызываемый объект, во время которого могут успешно завершиться и cancel, и getWithJoin. Однако вызываемый элемент все равно будет работать.)

Я также решил не переопределятьоригинальная отмена, так как новая отмена должна объявить InterruptedException.Новая отмена избавляет от своего бесполезного возвращаемого значения (поскольку true может означать любое из следующих слов: «задание не запущено», «задание запущено и уже нанесло значительный ущерб», «задание запущено и в конечном итоге завершится»).Прошла также проверка возвращаемого значения super.cancel, поэтому, если новая отмена вызывается несколько раз из разных потоков, они все будут ждать завершения задачи.

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Based on: /5874833/zhdite-otmeny-na-futuretask
 * 
 * @author Aleksandr Dubinsky
 */
public class FixedFutureTask<T> extends FutureTask<T> {

     /**
      * Creates a {@code FutureTask} that will, upon running, execute the given {@code Runnable}, 
      * and arrange that {@code get} will return the given result on successful completion.
      *
      * @param runnable the runnable task
      * @param result the result to return on successful completion. 
      *               If you don't need a particular result, consider using constructions of the form:
      *               {@code Future<?> f = new FutureTask<Void>(runnable, null)}
      * @throws NullPointerException if the runnable is null
      */
      public 
    FixedFutureTask (Runnable runnable, T result) {
            this (Executors.callable (runnable, result));
        }

     /**
      * Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
      *
      * @param  callable the callable task
      * @throws NullPointerException if the callable is null
      */
      public 
    FixedFutureTask (Callable<T> callable) {
            this (new MyCallable (callable));
        }

      /** Some ugly code to work around the compiler's limitations on constructors */
      private 
    FixedFutureTask (MyCallable<T> myCallable) {
            super (myCallable);
            myCallable.task = this;
        }

    private final Semaphore semaphore = new Semaphore(1);

    private static class MyCallable<T> implements Callable<T>
    {
        MyCallable (Callable<T> callable) {
                this.callable = callable;
            }

        final Callable<T> callable;
        FixedFutureTask<T> task;

          @Override public T
        call() throws Exception {

                task.semaphore.acquire();
                try 
                {
                    if (task.isCancelled())
                        return null;

                    return callable.call();
                }
                finally 
                {
                    task.semaphore.release();
                }
            }
    }

     /**
      * Waits if necessary for the computation to complete or finish cancelling, and then retrieves its result, if available.
      *
      * @return the computed result
      * @throws CancellationException if the computation was cancelled
      * @throws ExecutionException if the computation threw an exception
      * @throws InterruptedException if the current thread was interrupted while waiting
      */
      @Override public T 
    get() throws InterruptedException, ExecutionException, CancellationException {

            try 
            {
                return super.get();
            }
            catch (CancellationException e) 
            {
                semaphore.acquire();
                semaphore.release();
                throw e;
            }
        }

     /**
      * Waits if necessary for at most the given time for the computation to complete or finish cancelling, and then retrieves its result, if available.
      *
      * @param timeout the maximum time to wait
      * @param unit the time unit of the timeout argument
      * @return the computed result
      * @throws CancellationException if the computation was cancelled
      * @throws ExecutionException if the computation threw an exception
      * @throws InterruptedException if the current thread was interrupted while waiting
      * @throws CancellationException
      * @throws TimeoutException if the wait timed out
      */
      @Override public T
    get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException {

            try 
            {
                return super.get (timeout, unit);
            }
            catch (CancellationException e) 
            {
                semaphore.acquire();
                semaphore.release();
                throw e;
            }
        }

     /**
      * Attempts to cancel execution of this task and waits for the task to complete if it has been started.
      * If the task has not started when {@code cancelWithJoin} is called, this task should never run.
      * If the task has already started, then the {@code mayInterruptIfRunning} parameter determines
      * whether the thread executing this task should be interrupted in an attempt to stop the task.
      *
      * <p>After this method returns, subsequent calls to {@link #isDone} will
      * always return {@code true}.  Subsequent calls to {@link #isCancelled}
      * will always return {@code true} if this method returned {@code true}.
      *
      * @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted; 
      *                              otherwise, in-progress tasks are allowed to complete
      * @throws InterruptedException if the thread is interrupted
      */
      public void
    cancelAndWait (boolean mayInterruptIfRunning) throws InterruptedException {

            super.cancel (mayInterruptIfRunning);

            semaphore.acquire();
            semaphore.release();
        }
}
1 голос
/ 18 мая 2011

Он выбрасывается, как только его отменяют.

Нет простого способа узнать, началось ли оно и закончено ли.Вы можете создать оболочку для запуска, чтобы проверить ее состояние.

final AtomicInteger state = new AtomicInteger();
// in the runnable
state.incrementAndGet();
try {
    // do work
} finally {
   state.decrementAdnGet();
}
0 голосов
/ 12 октября 2018

CompletionSerivce более мощный, чем только FutureTask, и во многих случаях он более подходит.Я получаю некоторые идеи, чтобы решить эту проблему.Кроме того, его подкласс ExecutorCompletionService проще, чем FutureTask, просто включает в себя несколько строк кода.Это легко читать.Поэтому я модифицирую класс, чтобы получить частично вычисленный результат.Для меня удовлетворительное решение, в конце концов, выглядит просто и понятно.

CompletionService может гарантировать, что FutureTask уже выполнен, мы получаем из метода take или poll.Зачем?Поскольку класс QueueingFuture, его метод run вызываются только, другие методы, такие как cancel, не вызывались.Другими словами, он завершается нормально.

Демонстрационный код:

CompletionService<List<DeviceInfo>> completionService =
                new MyCompletionService<>(Executors.newCachedThreadPool());   
        Future task = completionService.submit(yourTask);
    try {
        LogHelper.i(TAG, "result 111: " );
        Future<List<DeviceInfo>> result = completionService.take();
        LogHelper.i(TAG, "result: " + result.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

Это код класса:

import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/**
*  This is a CompletionService like java.util.ExecutorCompletionService, but we can get partly computed result
 *  from our FutureTask which returned from submit, even we cancel or interrupt it.
 *  Besides, CompletionService can ensure that the FutureTask is done when we get from take or poll method.
 */
public class MyCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion.
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

    private static class DoneFutureTask<V> extends FutureTask<V> {
        private Object outcome;

        DoneFutureTask(Callable<V> task) {
            super(task);
        }

        DoneFutureTask(Runnable task, V result) {
            super(task, result);
        }

        @Override
        protected void set(V v) {
            super.set(v);
            outcome = v;
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            try {
                return super.get();
            } catch (CancellationException e) {
                return (V)outcome;
            }
        }

    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
            return new DoneFutureTask<V>(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
            return new DoneFutureTask<V>(task, result);
    }

    /**
     * Creates an MyCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public MyCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an MyCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public MyCompletionService(Executor executor,
                               BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
...