Когда я должен использовать CompletionService поверх ExecutorService? - PullRequest
70 голосов
/ 06 февраля 2011

Я только что нашел CompletionService в этом сообщении в блоге .Тем не менее, это на самом деле не демонстрирует преимущества CompletionService по сравнению со стандартным ExecutorService.Один и тот же код может быть написан с любым.Итак, когда полезна услуга CompletionService?

Можете ли вы привести пример короткого кода, чтобы сделать его кристально чистым?Например, этот пример кода просто показывает, где не требуется CompletionService (= эквивалент ExecutorService)

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

Ответы [ 10 ]

141 голосов
/ 08 февраля 2011

Опуская много деталей:

  • ExecutorService = входящая очередь + рабочие потоки
  • CompletionService = входящая очередь + рабочие потоки + очередь вывода
90 голосов
/ 07 апреля 2011

С ExecutorService, после того как вы отправили задачи для запуска, вам нужно вручную написать код для эффективного получения результатов выполненных задач.

С CompletionService это в значительной степени автоматизировано. Разница не очень очевидна в представленном вами коде, потому что вы отправляете только одну задачу. Однако представьте, что у вас есть список задач для отправки. В приведенном ниже примере несколько задач передаются в CompletionService. Затем, вместо того, чтобы попытаться выяснить, какая задача выполнена (для получения результатов), он просто просит экземпляр CompletionService вернуть результаты, когда они станут доступны.

public class CompletionServiceTest {

        class CalcResult {
             long result ;

             CalcResult(long l) {
                 result = l;
             }
        }

        class CallableTask implements Callable<CalcResult> {
            String taskName ;
            long  input1 ;
            int input2 ;

            CallableTask(String name , long v1 , int v2 ) {
                taskName = name;
                input1 = v1;
                input2 = v2 ;
            }

            public CalcResult call() throws Exception {
                System.out.println(" Task " + taskName + " Started -----");
                for(int i=0;i<input2 ;i++) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        System.out.println(" Task " + taskName + " Interrupted !! ");
                        e.printStackTrace();
                    }
                    input1 += i;
                }
                System.out.println(" Task " + taskName + " Completed @@@@@@");
                return new CalcResult(input1) ;
            }

        }

        public void test(){
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);

            int submittedTasks = 5;
            for (int i=0;i< submittedTasks;i++) {
                taskCompletionService.submit(new CallableTask (
                        String.valueOf(i), 
                            (i * 10), 
                            ((i * 10) + 10  )
                        ));
               System.out.println("Task " + String.valueOf(i) + "subitted");
            }
            for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
                try {
                    System.out.println("trying to take from Completion service");
                    Future<CalcResult> result = taskCompletionService.take();
                    System.out.println("result for a task availble in queue.Trying to get()");
                    // above call blocks till atleast one task is completed and results availble for it
                    // but we dont have to worry which one

                    // process the result here by doing result.get()
                    CalcResult l = result.get();
                    System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));

                } catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }
            }
        }
    }
10 голосов
/ 06 февраля 2011

Я думаю, что Javadoc лучше всего отвечает на вопрос, когда CompletionService полезен, а ExecutorService - нет.

Служба, которая отделяет производство новых асинхронных задачпотребление результатов выполненных задач.

По сути, этот интерфейс позволяет программе иметь производителей, которые создают и отправляют задачи (и даже проверяют результаты этих представлений), не зная ни о каких других потребителяхрезультаты этих задач.Между тем, потребители, которые знают о CompletionService, могут poll получить или take результаты, не зная, что производители отправляют задачи.

Для записи, и я могу ошибаться, потому что это скореепоздно, но я вполне уверен, что пример кода в этом посте ведет к утечке памяти.Без активного потребителя, получающего результаты из внутренней очереди ExecutorCompletionService, я не уверен, как блоггер ожидал, что эта очередь истечет.

9 голосов
/ 07 февраля 2011

В основном вы используете CompletionService, если хотите выполнить несколько задач параллельно, а затем работать с ними в порядке их завершения.Итак, если я выполню 5 заданий, CompletionService даст мне первое, что закончится.Пример, в котором есть только одна задача, не дает никакого дополнительного значения в течение Executor, кроме возможности отправить Callable.

4 голосов
/ 03 февраля 2012

Прежде всего, если мы не хотим тратить процессорное время, мы не будем использовать

while (!future.isDone()) {
        // Do some work...
}

Мы должны использовать

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);

Плохая вещь в этом коде в том, что он отключится ExecutorService. Если мы хотим продолжить работу с ним (т. Е. У нас есть рекурсивное создание задачи), у нас есть две альтернативы: invokeAll или ExecutorService.

invokeAll будет ждать, пока все задачи не будут выполнены. ExecutorService дает нам возможность получать или опрашивать результаты один за другим.

И, наконец, рекурсивный пример:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

while (Tasks.size() > 0) {
    for (final Task task : Tasks) {
        completionService.submit(new Callable<String>() {   
            @Override
            public String call() throws Exception {
                return DoTask(task);
            }
        });
    } 

    try {                   
        int taskNum = Tasks.size();
        Tasks.clear();
        for (int i = 0; i < taskNum; ++i) {
            Result result = completionService.take().get();
            if (result != null)
                Tasks.add(result.toTask());
        }           
    } catch (InterruptedException e) {
    //  error :(
    } catch (ExecutionException e) {
    //  error :(
    }
}
1 голос
/ 31 июля 2015

Допустим, у вас есть 5 долго выполняющихся задач (вызываемая задача), и вы отправили их в службу выполнения.Теперь представьте, что вы не хотите ждать, пока все 5 заданий конкурируют, вместо этого вы хотите выполнить какую-то обработку для этого задания, если оно выполнится.Теперь это можно сделать, написав логику опроса для будущих объектов или используя этот API.

1 голос
/ 15 января 2013

Посмотрите сами во время выполнения, попробуйте внедрить оба решения (Executorservice и Completionservice), и вы увидите, как они ведут себя по-разному, и вам будет понятнее, когда использовать одно или другое. Вот пример, если вы хотите http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html

0 голосов
/ 25 апреля 2019
package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorCompletest00 {

    public static void main(String[] args) {

        ExecutorService exc= Executors.newFixedThreadPool( 10 );
        ExecutorCompletionService executorCompletionService= new ExecutorCompletionService( exc );

        for (int i=1;i<10;i++){
            Task00 task00= new Task00( i );
            executorCompletionService.submit( task00 );
        }
        for (int i=1;i<20;i++){
            try {
                Future<Integer> future= (Future <Integer>) executorCompletionService.take();
                Integer inttest=future.get();
                System.out.println(" the result of completion service is "+inttest);

               break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

============================================================

package com.barcap.test.test00;

import java.util.*;
import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorServ00 {

    public static void main(String[] args) {
        ExecutorService executorService=Executors.newFixedThreadPool( 9 );
        List<Future> futList= new ArrayList <>(  );
        for (int i=1;i<10;i++) {
           Future result= executorService.submit( new Task00( i ) );
           futList.add( result );
        }

         for (Future<Integer> futureEach :futList ){
             try {
              Integer inm=   futureEach.get();

                 System.out.println("the result of future executorservice is "+inm);
                 break;
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }
    }
}

===========================================================

package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class Task00 implements Callable<Integer> {

    int i;

    public Task00(int i) {
        this.i = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println(" the current thread is "+Thread.currentThread().getName()  +" the result should be "+i);
        int sleepforsec=100000/i;
         Thread.sleep( sleepforsec );
        System.out.println(" the task complted for "+Thread.currentThread().getName()  +" the result should be "+i);



        return i;
    }
}

======================================================================

разница журналов для службы завершения исполнителя: текущий поток - пул-1-поток-1, результат должен быть равен 1, текущий поток - пул-1-поток-2.результат должен быть 2 текущий поток является пул-1-поток-3 результат должен быть 3 текущий поток является пул-1-поток-4 результат должен быть 4 текущий поток является пул-1-поток-6 результат долженесли 6 текущий поток - пул-1-нить-5, то результат должен быть 5 текущий поток - пул-1-нить-7 - результат должен быть 7 текущий поток - пул-1-нить-9 - результат должен быть 9текущий поток - пул-1-нить-8, результат должен быть равен 8; задача выполнена для пула-1-нить-9; результат должен быть равен 9, а результат равен 9; задача выполнена для пула-1-нить-8:быть 8 задача соmplted для pool-1-thread-7 результат должен быть 7 задача выполнена для pool-1-thread-6 результат должен быть 6 задача завершена для pool-1-thread-5 результат должен быть 5 задача выполнена дляpool-1-thread-4 результат должен быть 4, задача, выполненная для pool-1-thread-3, результат должен быть 3

задача, завершенная для pool-1-thread-2, результат должен быть 2

текущий поток - пул-1-нить-1, результат должен быть равен 1, текущий поток - пул-1-нить-3, результат должен быть равен 3, текущий поток - пул-1-нить-2.результат должен быть 2 текущий поток - пул-1-нить-5 результат должен быть 5 текущий поток - пул-1-нить-4 результат должен быть 4 текущий поток - пул-1-нить-6 - результат долженесли 6 текущий поток - пул-1-нить-7, результат должен быть 7 текущий поток - пул-1-нить-8, результат должен быть 8 текущий поток - пул-1-нить-9, результат должен быть 9задача выполнена для pool-1-thread-9 результат должен быть 9 задача complted для pool-1-thread-8 результат должен быть 8 задача завершена для pool-1-thread-7 результат должен быть 7 задача завершена для pool-1-thread-6 результат должен быть 6 задача выполнена дляpool-1-thread-5 результат должен быть 5 задача выполнена для pool-1-thread-4 результат должен быть 4 задача завершена для pool-1-thread-3 результат должен быть 3 задача выполнена для pool-1-thread-2 результат должен быть 2 задача выполнена для пула-1-thread-1 результат должен быть 1 результат будущего равен 1

=============================================================

дляexecutorservice результат будет доступен только после выполнения всех задач.

executor completeservice любой доступный результат делает этот возврат.

0 голосов
/ 25 октября 2018

есть еще одно преимущество использования службы завершения: Производительность

, когда вы звоните future.get(), вы ожидаете вращения:

с java.util.concurrent.CompletableFuture

  private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }

когда у вас есть долго выполняемая задача, это будет катастрофой для производительности.

с функцией completeservice, как только задача будет выполнена, ее результат будет поставлен в очередь, и вы можете опросить очередь с более низкимизбыточное быстродействие.

завершение обслуживания достигните этого, используя задачу обтекания с done ловушкой.

java.util.concurrent.ExecutorCompletionService

    private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
0 голосов
/ 11 июня 2018

Если производитель задачи не заинтересован в результатах, и другой компонент отвечает за обработку результатов асинхронной задачи, выполняемой службой-исполнителем, то вам следует использовать CompletionService. Это поможет вам отделить обработчик результата задачи от производителя задачи. Смотрите пример http://www.zoftino.com/java-concurrency-executors-framework-tutorial

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...