ExecutorService - выполнять каждую задачу с определенным сроком - PullRequest
0 голосов
/ 07 ноября 2018

Я создаю ExecutorService для выполнения некоторых задач, выполнение которых в обычных условиях, как ожидается, займет одну минуту, но ни при каких обстоятельствах не должно выполняться более двух минут с момента запуска задачи .

Мой код выглядит следующим образом:

ExecutorService executorService = Executors.newFixedThreadPool(10);
ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();



for (String singleTask: taskList) { 
                futuresList.add(executorService.submit( new Runnable(){      
                       @Override
                       public void run(){
                        try {
                            performTask(p1, singleTask, p3);
                        } catch (IOException | InterruptedException | ExecutionException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                }
              }
         }));       

    }



for(Future<?> future : futures) {
    future.get(120, TimeUnit.SECONDS); 
}

Это будет блокировать до указанного времени ожидания, а затем двигаться дальше. Мои проблемы заключаются в следующем:

1) Если task1 блокируется в течение двух минут и task2 также блокируется в течение двух минут - тогда task2 будет "заблокирован" в общей сложности в течение 4 минут (поскольку future.get(120, TimeUnit.SECONDS); не вызывается для задачи 2, пока task1 заканчивает блокировку) - даже если обе задачи были отправлены и начали выполняться одновременно

2) Если я отправляю более 10 задач, задача 11+ может никогда не блокироваться в течение требуемого периода времени, , если предыдущие задачи не завершены к тому времени, когда future.get(120, TimeUnit.SECONDS); вызывается для 11-й задачи

Моя цель состоит в том, чтобы каждая отдельная задача выполнялась в течение максимум двух минут, независимо от количества задач в списке и независимо от того, сколько задач поставлено перед ним или после него.

Спасибо

Ответы [ 3 ]

0 голосов
/ 08 ноября 2018

ExecutorService#invokeAll может быть ключом здесь.

Вопрос немного сложен для понимания (может быть, даже , потому что вы пытались описать это точно? ;-)). Поэтому я создал пример, пытаясь обернуть вокруг него голову. Даже если это не , что вы намеревались, возможно, вы сможете объяснить, насколько это отличается от вашей цели, чтобы вопрос мог быть прояснен или другие могли написать лучший ответ.

В этом примере задачи создаются как Callable объекты, которые помещаются в список. Такой список можно передать на ExecutorService#invokeAll. (В вашем случае вы, вероятно, могли бы создать эти экземпляры из ваших Runnable задач с помощью Executors#callable). Всего создано 5 заданий. По умолчанию для выполнения каждой задачи требуется 2000 мс. Задача "C" нечетная и занимает 8000 мс. Максимальное время выполнения должно быть 5000 мс.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceLimitTaskTime
{
    private static Map<String, Long> taskSubmitMs = 
        new ConcurrentHashMap<String, Long>();
    private static Map<String, Long> taskStartMs = 
        new ConcurrentHashMap<String, Long>();
    private static Map<String, Long> taskFinishedMs = 
        new ConcurrentHashMap<String, Long>();

    public static void main(String[] args) throws Exception
    {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<String> tasks = Arrays.asList("A", "B", "C", "D", "E");

        List<Callable<String>> callables = new ArrayList<Callable<String>>();
        for (String task : tasks)
        {
            taskSubmitMs.put(task, System.currentTimeMillis());

            callables.add(new Callable<String>()
            {
                @Override
                public String call()
                {
                    taskStartMs.put(task, System.currentTimeMillis());

                    long durationMs = 2000;
                    if (task.equals("C"))
                    {
                        durationMs = 8000;
                    }

                    performTask(task, durationMs);
                    if (!Thread.currentThread().isInterrupted())
                    {
                        taskFinishedMs.put(task, System.currentTimeMillis());
                    }
                    return task;
                }
            });
        }

        List<Future<String>> futures = 
            executorService.invokeAll(callables, 5000, TimeUnit.MILLISECONDS);

        for (Future<String> future : futures)
        {
            try
            {
                future.get();
            }
            catch (CancellationException e) 
            {
                System.out.println("One task was cancelled");
            }
        }

        for (String task : tasks)
        {
            Long submitMs = taskSubmitMs.get(task);
            Long startMs = taskStartMs.get(task);
            Long finishedMs = taskFinishedMs.get(task);

            if (finishedMs != null)
            {
                long waitMs = startMs - submitMs;
                long runMs = finishedMs - startMs;
                long totalMs = finishedMs - submitMs;
                System.out.printf(
                    "Task %-3s waited %5d ms and ran %5d ms, total %5d ms\n", 
                    task, waitMs, runMs, totalMs);
            }
            else
            {
                System.out.printf(
                    "Task %-3s was cancelled\n", task);

            }
        }

    }

    private static void performTask(String task, long durationMs)
    {
        System.out.println("Executing " + task);
        try
        {
            Thread.sleep(durationMs);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
        System.out.println("Executing " + task + " DONE");
    }

}

Резюме, напечатанное в конце, показывает этот результат:

Task A   waited    16 ms and ran  2002 ms, total  2018 ms
Task B   waited     3 ms and ran  2002 ms, total  2005 ms
Task C   was cancelled
Task D   waited  2005 ms and ran  2000 ms, total  4005 ms
Task E   waited  2005 ms and ran  2000 ms, total  4005 ms

Это показывает, что

  • Запуски, которые запускались немедленно, выполнялись в течение 2000 мс
  • Задачи, которые должны были ждать других, также выполнялись в течение 2000 мс (но всего 4000 мс)
  • Задание, которое заняло слишком много времени, было отменено через 5000 мс
0 голосов
/ 09 ноября 2018

ОК, я не уверен, что мой вопрос был полностью понят, но я попытаюсь ответить на свой вопрос с помощью решения, которое я придумал (это может помочь прояснить вопрос другим). Я думаю, что @Peter Lawrey избежал этого ответа, но ответ был слишком коротким, чтобы знать наверняка.

        int timeLimitOfIndividualTaskInSeconds = 120;
        int fixedThreadPoolCount = 10;

        ExecutorService executorService = Executors.newFixedThreadPool(fixedThreadPoolCount);
        ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();

        for (String singleTask: taskList) {

            futuresList.add(executorService.submit( new Runnable(){      
                @Override
                public void run(){
                    try {
                        executeTask(singleTask);
                    } catch (IOException | InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }));        

        }

        long beforeTimeInMilli = System.currentTimeMillis();
        long beforeTimeInSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeTimeInMilli);
        int counter = 0;

        long timeoutInSeconds = timeLimitOfIndividualTaskInSeconds;

        for(Future<?> future : futuresList) {
            if (counter % fixedThreadPoolCount == 0) {

                // resets time limit to initial limit since next batch of tasks are beginning to execute
                timeoutInSeconds = timeLimitOfIndividualTaskInSeconds;
            }

            try {
                future.get(timeoutInSeconds, TimeUnit.SECONDS);
            } catch (Exception e){
                e.printStackTrace();
                future.cancel(true); //stops the underlying task
            }

            counter++;

            long afterTimeInMilli = System.currentTimeMillis();
            long afterTimeInSeconds = TimeUnit.MILLISECONDS.toSeconds(afterTimeInMilli);

            long taskDurationInSeconds = afterTimeInSeconds - beforeTimeInSeconds;
            timeoutInSeconds = timeoutInSeconds - taskDurationInSeconds;

        }   

Это гарантирует две вещи:

1) Все задачи, которые были отправлены и , запущенные в одно и то же время (т. Е. "Та же партия"), будут выполняться в течение max из 120 секунд (но если какое-либо задание будет выполнено до 120 секунд, оно не продолжит блокировку)

2) Предыдущие задачи в том же пакете не будут приводить к выполнению последующих задач в этом пакете в течение более 120 секунд (поскольку мы вычитаем время выполнения предыдущих задач из значения времени ожидания в последующих задачах)

Я нахожу это простое и элегантное решение - но, конечно, я рад услышать от любого, кто может улучшить или прокомментировать это решение.

0 голосов
/ 07 ноября 2018

Вы можете потратить время с System.currentTimeMillis (); затем вы добавляете максимальное время, например, 120_000 миллисекунд. Когда вы ждете, вы вычитаете текущее время. То есть Вы только ждете, пока не будет достигнуто максимальное время.

...