Обеспечение порядка выполнения задач в пуле потоков - PullRequest
48 голосов
/ 25 августа 2011

Я читал о шаблоне пула потоков, и я не могу найти обычное решение для следующей проблемы.

Иногда я хочу, чтобы задачи выполнялись последовательно. Например, я читаю куски текста из файла и по какой-то причине мне нужно обрабатывать куски в таком порядке. Поэтому я хочу исключить параллелизм для некоторых задач .

Рассмотрим этот сценарий, когда задачи с * необходимо обрабатывать в том порядке, в котором они были вставлены. Другие задачи можно обрабатывать в любом порядке.

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

В контексте пула потоков, без этого ограничения, одиночная очередь ожидающих задач работает нормально, но здесь явно нет.

Я думал о том, чтобы некоторые из потоков работали в определенной для потока очереди, а другие - в "глобальной" очереди. Затем, чтобы последовательно выполнить некоторые задачи, мне просто нужно поместить их в очередь, в которую смотрит один поток. Это делает звучит немного неуклюже.

Итак, настоящий вопрос в этой длинной истории: как бы вы решили это? Как бы вы обеспечили заказ этих задач ?

EDIT

В качестве более общей проблемы предположим, что приведенный выше сценарий становится

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

Я имею в виду, что задачи внутри группы должны выполняться последовательно, но сами группы могут смешиваться. Например, вы можете иметь 3-2-5-4-7.

Еще одна вещь, на которую следует обратить внимание, - это то, что у меня нет доступа ко всем задачам в группе заранее (и я не могу дождаться, пока все они придут, прежде чем начать группу).

Спасибо за ваше время.

Ответы [ 17 ]

2 голосов
/ 31 августа 2011

Это достижимо, ну, насколько я понимаю, ваш сценарий.В основном вам нужно сделать что-то умное, чтобы координировать свои задачи в главном потоке.Java API: ExecutorCompletionService и Callable

Сначала реализуйте свою вызываемую задачу:

public interface MyAsyncTask extends Callable<MyAsyncTask> {
  // tells if I am a normal or dependent task
  private boolean isDependent;

  public MyAsyncTask call() {
    // do your job here.
    return this;
  }
}

Затем в основном потоке используйте CompletionServiceкоординировать выполнение зависимой задачи (т.е. механизм ожидания):

ExecutorCompletionService<MyAsyncTask> completionExecutor = new 
  ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
Future<MyAsyncTask> dependentFutureTask = null;
for (MyAsyncTask task : tasks) {
  if (task.isNormal()) {
    // if it is a normal task, submit it immediately.
    completionExecutor.submit(task);
  } else {
    if (dependentFutureTask == null) {
      // submit the first dependent task, get a reference 
      // of this dependent task for later use.
      dependentFutureTask = completionExecutor.submit(task);
    } else {
      // wait for last one completed, before submit a new one.
      dependentFutureTask.get();
      dependentFutureTask = completionExecutor.submit(task);
    }
  }
}

При этом вы используете одного исполнителя (размер пула потоков 5) для выполнения как обычных, так и зависимых задач, обычные задачи выполняются сразу же, как толькокак представлено, зависимые задачи выполняются одна за другой (ожидание выполняется в главном потоке с помощью вызова get () для Future перед отправкой новой зависимой задачи), поэтому в любой момент времени у вас всегда есть несколько обычных задач и одназависимая задача (если существует), выполняющаяся в одном пуле потоков.

Это только начало, используя ExecutorCompletionService, FutureTask и Semaphore, вы можете реализовать более сложный сценарий координации потоков.

1 голос
/ 01 сентября 2011

Поскольку вам нужно только дождаться завершения одной задачи, прежде чем запускать зависимую задачу, это легко сделать, если вы запланируете зависимую задачу в первой задаче. Итак, во втором примере: в конце задачи 2 запланируйте задачу 7 а также в конце задачи 3 запланируйте задачу 4 и т. д. для 4-> 6 и 6-> 8.

В начале, просто запланируйте задачи 1,2,5,9 ... и остальные должны следовать.

Еще более общая проблема заключается в том, что вам приходится ждать несколько задач, прежде чем можно будет запустить зависимую задачу. Эффективное управление - нетривиальное упражнение.

1 голос
/ 01 сентября 2011

У вас есть два разных вида задач. Смешивание их в одной очереди кажется довольно странным. Вместо одной очереди есть две. Для простоты вы можете использовать ThreadPoolExecutor для обоих. Для последовательных задач просто установите фиксированный размер 1, для задач, которые могут выполняться одновременно, - больше. Я не понимаю, почему это было бы неуклюже. Держите это простым и глупым. У вас две разные задачи, поэтому относитесь к ним соответственно.

1 голос
/ 25 августа 2011

Как бы вы обеспечили заказ этих задач?

push task1
push task2
push task346
push task5

В ответ на редактирование:

push task1
push task27   **
push task3468   *
push task5
push task9
0 голосов
/ 20 октября 2017

Было много ответов, и, очевидно, один из них был принят. Но почему бы не использовать продолжения?

Если у вас есть известное «серийное» условие, то, когда вы ставите в очередь первую задачу с этим условием, удерживайте задачу; и для дальнейших задач вызывайте Task.ContinueWith ().

public class PoolsTasks
{
    private readonly object syncLock = new object();
    private Task serialTask = Task.CompletedTask;


    private bool isSerialTask(Action task) {
        // However you determine what is serial ...
        return true;
    }

    public void RunMyTask(Action myTask) {
        if (isSerialTask(myTask)) {
            lock (syncLock)
                serialTask = serialTask.ContinueWith(_ => myTask());
        } else
            Task.Run(myTask);
    }
}
0 голосов
/ 18 сентября 2018

Пул потоков с упорядоченными и неупорядоченными методами выполнения:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedExecutor {
    private ExecutorService multiThreadExecutor;
    // for single Thread Executor
    private ThreadLocal<ExecutorService> threadLocal = new ThreadLocal<>();

    public OrderedExecutor(int nThreads) {
        this.multiThreadExecutor = Executors.newFixedThreadPool(nThreads);
    }

    public void executeUnordered(Runnable task) {
        multiThreadExecutor.submit(task);
    }

    public void executeOrdered(Runnable task) {
        multiThreadExecutor.submit(() -> {
            ExecutorService singleThreadExecutor = threadLocal.get();
            if (singleThreadExecutor == null) {
                singleThreadExecutor = Executors.newSingleThreadExecutor();
                threadLocal.set(singleThreadExecutor);
            }
            singleThreadExecutor.submit(task);
        });
    }

    public void clearThreadLocal() {
        threadLocal.remove();
    }

}

После заполнения всех очередей threadLocal должен быть очищен. Единственным недостатком является то, что singleThreadExecutor будет создаваться каждый раз, когда метод

executeOrdered (запускаемая задача)

вызывается в отдельном потоке

0 голосов
/ 25 августа 2016

Специально для этой цели существует Java-инфраструктура, которая называется dexecutor (отказ от ответственности: я владелец)

DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();

    executor.addDependency("task1", "task2");
    executor.addDependency("task4", "task6");
    executor.addDependency("task6", "task8");

    executor.addIndependent("task3");
    executor.addIndependent("task5");
    executor.addIndependent("task7");

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);

task1, task3, task5, task7 выполняется параллельно (в зависимости отразмер пула потоков), как только задача 1 завершается, задача 2 запускается, как задача 2 завершает выполнение задачи 4, как только задача 4 завершает выполнение задачи 6 и, наконец, когда задача 6 завершает выполнение задачи 8.

...