Как отменить те задачи, которые занимают слишком много времени с помощью CompletionService - PullRequest
5 голосов
/ 10 марта 2011

Я отправляю некоторые задачи на будущее с использованием CompletionService, обернутого вокруг 2-х потоков FixedThreadPool ExecutorService, я устанавливаю, а затем устанавливаю цикл, равный количеству отправленных задач, и использую завершениеservice.take (), ожидая их завершения или сбоя.Проблема иногда возникает, но она никогда не заканчивается (но я не знаю почему), поэтому я изменил метод take () на опрос (300, Timeout.SECONDS), идея в том, что для выполнения одной задачи требуется более 5 минут.опрос завершится неудачно, а затем, в конце концов, выйдет из цикла, и я смогу пройти через все фьючерсы и вызвать future.cancel (true), чтобы принудительно отменить вызывающую проблему задачу.

Но когда я запускаю код, и онзависает, я вижу, что опрос не проходит постоянно каждые 5 минут, и больше задач не запускается, поэтому я предполагаю, что оба рабочих заблокированы каким-либо образом и никогда не завершаются, и никогда не позволяют запускать дополнительные задачи.Поскольку время ожидания составляет 5 минут, а для выполнения задачи по-прежнему оставалось 1000 задач, время, затраченное на прерывание цикла, было слишком длинным, поэтому я отменил задание.

Итак, я хочу выполнить прерывание / принудительную отмену текущей задачи.если не завершено в течение 5 минут, но я не вижу никакого способа сделать это.

Этот пример кода показывает упрощенную версию того, о чем я говорю

import com.jthink.jaikoz.exception.JaikozException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println("Worker TimedOut:");
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && result.get())
                        {
                            System.out.println("Worker Completed:");
                        }
                        else
                        {
                            System.out.println("Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace();
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println("Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
    {
        if(number==3)
        {
            try
            {
                Thread.sleep(50000);
            }
            catch(InterruptedException tie)
            {

            }
        }
        return true;
    }
}

Вывод

Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done

Ответы [ 3 ]

4 голосов
/ 10 марта 2011

Я думаю, что я решил это, в основном, если происходит тайм-аут, я перебираю свой список будущих объектов и нахожу первый, который еще не завершен, и принудительно отменяю.Не кажется элегантным, но, кажется, работает.

Я изменил размер пула, чтобы показать вывод, который лучше демонстрирует решение, но также работает с 2-поточным пулом.

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                System.out.println("Invocation:"+t);
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println(new Date()+":Worker Timedout:");
                    //So lets cancel the first futures we find that havent completed
                    for(Future future:futures)
                    {
                        System.out.println("Checking future");
                        if(future.isDone())
                        {
                            continue;
                        }
                        else
                        {
                            future.cancel(true);
                            System.out.println("Cancelled");
                            break;
                        }
                    }
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && !result.isCancelled() && result.get())
                        {
                            System.out.println(new Date()+":Worker Completed:");
                        }
                        else if(result.isDone() && !result.isCancelled() && !result.get())
                        {
                            System.out.println(new Date()+":Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace(System.out);
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println(new Date()+":Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
        throws InterruptedException
    {
        try
        {
            if(number==3)
            {
                Thread.sleep(50000);
            }
        }
        catch(InterruptedException ie)
        {
            System.out.println("Worker Interuppted");
            throw ie;
        }
        return true;
    }
}

Вывод

1007 *
2 голосов
/ 10 марта 2011

В вашем рабочем примере ваш Callable блокирует вызов, который поддерживает прерывание. Если ваш реальный код блокируется встроенной блокировкой (блок synchronized), вы не сможете отменить его через прерывание. Вместо этого вы можете использовать явную блокировку (java.util.concurrent.Lock), которая позволяет указать, как долго вы хотите ожидать получения блокировки. Если поток ожидает ожидания блокировки, возможно, из-за ситуации взаимоблокировки, он может прервать работу с сообщением об ошибке.

Кстати, в вашем примере ваш Callable не должен глотать InterruptedException. Вы должны либо передать его (пересобрать, либо добавить InterruptedException в строку выброса объявления вашего метода), либо в блоке catch сбросить прерванное состояние потока (с помощью Thread.currentThread().interrupt()).

1 голос
/ 02 октября 2012

Вы всегда можете позвонить future.get(timeout...)
Он вернет исключение тайм-аута, если он еще не закончился ... тогда вы можете позвонить future.cancel().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...