Пул потоков, который связывает задачи для данного идентификатора с тем же потоком - PullRequest
16 голосов
/ 17 ноября 2011

Существуют ли реализации пула потоков (в Java), обеспечивающие выполнение всех задач с одним и тем же логическим идентификатором в одном потоке?

Логика, которую я использую, заключается в том, что если в определенном потоке уже выполняется задание для данного логического идентификатора, то новые задачи с таким же идентификатором планируются в том же потоке. Если нет потоков, выполняющих задачу для того же идентификатора, тогда можно использовать любой поток.

Это позволило бы выполнять задачи для несвязанных идентификаторов параллельно, но задачи для одного и того же идентификатора выполнялись последовательно и в представленном порядке.

Если нет, есть ли какие-либо предложения о том, как я могу расширить ThreadPoolExecutor, чтобы получить такое поведение (если это вообще возможно)?

UPDATE

Потратив больше времени на размышления об этом, я фактически не требую, чтобы задачи с одним и тем же логическим идентификатором выполнялись в одном потоке, просто чтобы они не выполнялись в одно и то же время.

Примером этого может служить система, которая обрабатывает заказы для клиентов, где можно обрабатывать несколько заказов одновременно, но не для одного и того же клиента (и все заказы для одного и того же клиента должны были быть обработаны в заказе ).

Подход, который я сейчас использую, состоит в том, чтобы использовать стандартный ThreadPoolExecutor с настроенным BlockingQueue, а также обернуть Runnable пользовательской оберткой. Логика оболочки Runnable:

  1. Атомная попытка добавить идентификатор в параллельный «запущенный» набор (ConcurrentHashMap), чтобы проверить, выполняется ли в данный момент задача с тем же идентификатором.
    • если добавить не удалось, переместите задание обратно в начало очереди и немедленно вернитесь
    • , если получится, продолжайте
  2. Запустить задачу
  3. Удалить связанный идентификатор задачи из набора «Выполнение»

Методы очереди poll() затем возвращают только те задачи, у которых есть идентификатор, которого в данный момент нет в наборе «выполняется».

Проблема в том, что я уверен, что будет много угловых случаев, о которых я даже не думал, поэтому потребуется много испытаний.

Ответы [ 6 ]

3 голосов
/ 08 июля 2015

Создайте массив служб-исполнителей, запускающих по одному потоку каждый, и назначьте им записи в очереди с помощью хэш-кода идентификатора вашего элемента. Массив может быть любого размера, в зависимости от того, сколько потоков вы хотите использовать.

Это ограничит использование нами из службы executor, но все же позволит использовать ее возможность для отключения единственного потока, когда он больше не нужен (с allowCoreThreadTimeOut(true)) и перезапускает его по мере необходимости. Кроме того, все вещи в очереди будут работать без перезаписи.

1 голос
/ 17 ноября 2011

Самая простая идея может быть такой:

Иметь фиксированную карту BlockingQueue с.Используйте механизм хеширования, чтобы выбрать очередь на основе идентификатора задачи.Алгоритм хеширования должен выбирать одну и ту же очередь для одинаковых идентификаторов.Запустите один отдельный поток для каждой очереди.каждый поток выберет одну задачу из собственной выделенной очереди и выполнит ее.

ps соответствующее решение сильно зависит от типа работы, которую вы назначаете потокам

ОБНОВЛЕНИЕ

Хорошо, как насчет этой безумной идеи, пожалуйста, потерпите меня:)

Скажем, у нас есть ConcurrentHashMap, который содержит ссылки id -> OrderQueue

ID1->Q1, ID2->Q2, ID3->Q3, ...

Значениечто теперь каждый id связан со своей собственной очередью.OrderQueue - это пользовательская очередь блокировки с дополнительным логическим флагом - isAssociatedWithWorkingThread.

Существует также обычный BlockingQueue, который мы сейчас назовем amortizationQueue, позже вы увидите, как он будет использоваться.

Далее у нас есть N рабочих потоков.Каждый рабочий поток имеет свою собственную рабочую очередь, которая представляет собой BlockingQueue, содержащий идентификаторы, связанные с этим потоком.

Когда приходит новый идентификатор, мы делаем следующее:

create a new OrderQueue(isAssociatedWithWorkingThread=false)
put the task to the queue
put id->OrderQueue to the map
put this OrderQueue to amortizationQueue

При обновлениидля существующего идентификатора мы делаем следующее:

pick OrderQueue from the map
put the task to the queue
if isAssociatedWithWorkingThread == false
    put this OrderQueue to amortizationQueue

Каждый рабочий поток выполняет следующее:

take next id from the working queue
take the OrderQueue associated with this id from the map
take all tasks from this queue
execute them
mark isAssociatedWithWorkingThread=false for this OrderQueue
put this OrderQueue to amortizationQueue

Довольно просто.Теперь самое интересное - похищение работы:)

Если в какой-то момент какой-то рабочий поток оказывается с пустой рабочей очередью, то он делает следующее:

go to the pool of all working threads
pick one (say, one with the longest working queue)
steal id from *the tail* of that thread's working queue
put this id to it's own working queue
continue with regular execution

И там тоже+1 дополнительный поток, который обеспечивает работу по амортизации:

while (true)
    take next OrderQueue from amortizationQueue
    if queue is not empty and isAssociatedWithWorkingThread == false
         set isAssociatedWithWorkingThread=true
         pick any working thread and add the id to it's working queue

Придется потратить больше времени на размышления, если вам удастся избежать использования флага AtomicBoolean для isAssociatedWithWorkingThread или возникнет необходимость в блокировке операции дляотметьте / измените этот флаг.

0 голосов
/ 30 августа 2018

Мне нужно реализовать подобное решение, и предложение о создании массива служб-исполнителей к h22 кажется мне лучшим подходом с одним предупреждением, что я буду принимать модуль % идентификатора (либо необработанный идентификатор, предполагающийэто long / int или хэш-код) относительно некоторого желаемого максимального размера и использования этого результата в качестве нового идентификатора, чтобы у меня был баланс между тем, что в итоге получилось слишком много объектов службы исполнителя, при этом получая при этом достаточный объем параллелизмав обработке.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceRouter {

    private List<ExecutorService> services;
    private int size;

    public ExecutorServiceRouter(int size) {
        services = new ArrayList<ExecutorService>(size);
        this.size = size;
        for (int i = 0; i < size; i++) {
            services.add(Executors.newSingleThreadExecutor());
        }
    }

    public void route(long id, Runnable r) {
        services.get((int) (id % size)).execute(r);
    }

    public void shutdown() {
        for (ExecutorService service : services) {
            service.shutdown();
        }
    }

}
0 голосов
/ 08 июня 2016

Наш подход аналогичен тому, что есть в обновлении исходного вопроса.У нас есть класс-обертка, который является runnable, который содержит очередь (LinkedTransferQueue), которую мы называем RunnableQueue.Runnable очередь имеет базовый API:

public class RunnableQueue implements Runnable
{
  public RunnableQueue(String name, Executor executor);
  public void run();

  public void execute(Runnable runnable);
}

Когда пользователь отправляет первый Runnable через вызов execute, RunnableQueue ставит себя в очередь на исполнителя.Последующие вызовы для выполнения помещаются в очередь внутри RunnableQueue.Когда исполняемая очередь исполняется ThreadPool (через его метод run), он начинает «истощать» внутреннюю очередь, последовательно выполняя запускаемые функции одну за другой.Если во время выполнения вызывается execute для RunnableQueue, новые runnables просто добавляются во внутреннюю очередь.Как только очередь очищается, метод run запускаемой очереди завершается, и он «покидает» пул исполнителей.Ополаскивание, повтор.

У нас есть другие оптимизации, которые делают такие вещи, как позволяют только запускать некоторое количество исполняемых файлов (например, четыре) до того, как RunnableQueue повторно отправляет себя в пул исполнителей.

Единственная действительно сложная задачанемного внутри, и это не так сложно) это синхронизировать, когда он публикуется исполнителю или нет, чтобы он не перепечатывал, или пропускал, когда он должен публиковать.

В целом мы считаем, что это работаетдовольно хорошо.«ID» (семантический контекст) для нас - это работоспособная очередь.Потребность, которая у нас есть (т. Е. Плагин), имеет ссылку на RunnableQueue, а не пул исполнителей, поэтому он вынужден работать исключительно через RunnableQueue.Это не только гарантирует, что все обращения выполняются последовательно (ограничение потока), но и позволяет RunnableQueue «модерировать» загрузку работы плагина.Кроме того, он не требует централизованной структуры управления или других спорных вопросов.

0 голосов
/ 08 июля 2015

Мне недавно пришлось столкнуться с подобной ситуацией.

В итоге у меня получился дизайн, похожий на ваш.Единственным отличием было то, что «текущим» была карта, а не набор: карта от ID до очереди Runnables.Когда обертка вокруг запускаемого задания видит, что его идентификатор присутствует на карте, она добавляет выполняемое задание в очередь идентификатора и немедленно возвращается.В противном случае идентификатор добавляется на карту с пустой очередью, и задача выполняется.

Когда задача выполнена, оболочка снова проверяет очередь идентификатора.Если очередь не пуста, запускаемый выбирается.В противном случае он будет удален с карты, и мы закончили.

Я оставлю отключение и отмену в качестве упражнения для читателя:)

0 голосов
/ 17 ноября 2011

Расширение ThreadPoolExecutor было бы довольно сложно. Я бы предложил вам перейти на систему производитель-потребитель. Вот что я предлагаю.

  1. Вы можете создавать типовые потребительские системы производителей. Проверьте код, указанный в этот вопрос .
  2. Теперь у каждой из этих систем будет очередь и поток Single Consumer , который будет последовательно обрабатывать задачи в очереди
  3. Теперь создайте пул таких отдельных систем .
  4. Когда вы отправляете задачу для связанного идентификатора, посмотрите, существует ли уже система, помеченная для этого связанного идентификатора, которая в настоящее время обрабатывает задачи, если да, то отправьте задачи,
  5. Если он не обрабатывает какие-либо задачи, пометьте эту систему новым связанным идентификатором и отправьте задачу.
  6. Таким образом, одна система будет обслуживать только один логический идентификатор.

Здесь я предполагаю, что связанный идентификатор представляет собой логическую группу отдельных идентификаторов, и системы потребителей-производителей будут созданы для связанных идентификаторов, а НЕ отдельных идентификаторов.

...