Самая простая идея может быть такой:
Иметь фиксированную карту BlockingQueue
с.Используйте механизм хеширования, чтобы выбрать очередь на основе идентификатора задачи.Алгоритм хеширования должен выбирать одну и ту же очередь для одинаковых идентификаторов.Запустите один отдельный поток для каждой очереди.каждый поток выберет одну задачу из собственной выделенной очереди и выполнит ее.
ps соответствующее решение сильно зависит от типа работы, которую вы назначаете потокам
ОБНОВЛЕНИЕ
Хорошо, как насчет этой безумной идеи, пожалуйста, потерпите меня:)
Скажем, у нас есть ConcurrentHashMap
, который содержит ссылки id -> OrderQueue
ID1->Q1, ID2->Q2, ID3->Q3, ...
Значениечто теперь каждый id
связан со своей собственной очередью.OrderQueue
- это пользовательская очередь блокировки с дополнительным логическим флагом - isAssociatedWithWorkingThread
.
Существует также обычный BlockingQueue
, который мы сейчас назовем amortizationQueue
, позже вы увидите, как он будет использоваться.
Далее у нас есть N
рабочих потоков.Каждый рабочий поток имеет свою собственную рабочую очередь, которая представляет собой BlockingQueue
, содержащий идентификаторы, связанные с этим потоком.
Когда приходит новый идентификатор, мы делаем следующее:
create a new OrderQueue(isAssociatedWithWorkingThread=false)
put the task to the queue
put id->OrderQueue to the map
put this OrderQueue to amortizationQueue
При обновлениидля существующего идентификатора мы делаем следующее:
pick OrderQueue from the map
put the task to the queue
if isAssociatedWithWorkingThread == false
put this OrderQueue to amortizationQueue
Каждый рабочий поток выполняет следующее:
take next id from the working queue
take the OrderQueue associated with this id from the map
take all tasks from this queue
execute them
mark isAssociatedWithWorkingThread=false for this OrderQueue
put this OrderQueue to amortizationQueue
Довольно просто.Теперь самое интересное - похищение работы:)
Если в какой-то момент какой-то рабочий поток оказывается с пустой рабочей очередью, то он делает следующее:
go to the pool of all working threads
pick one (say, one with the longest working queue)
steal id from *the tail* of that thread's working queue
put this id to it's own working queue
continue with regular execution
И там тоже+1 дополнительный поток, который обеспечивает работу по амортизации:
while (true)
take next OrderQueue from amortizationQueue
if queue is not empty and isAssociatedWithWorkingThread == false
set isAssociatedWithWorkingThread=true
pick any working thread and add the id to it's working queue
Придется потратить больше времени на размышления, если вам удастся избежать использования флага AtomicBoolean
для isAssociatedWithWorkingThread
или возникнет необходимость в блокировке операции дляотметьте / измените этот флаг.