Итак, у меня есть BlockingQueue, который я заполняю данными (из многих тем). Я хочу собрать эти данные в группы, скажем, 1000, а затем передать их куда-то еще. Итак, я написал класс потока, который опрашивает конец очереди, и когда у него достаточно элементов, он отправляет агрегированные данные.
Я ожидал найти что-то в java.util.concurrent, чтобы помочь с этим. Единственный способ сделать это с помощью java.util.concurrent - сделать так, чтобы каждая вставка в очередь добавляла выполняемую задачу, которая затем добавлялась бы в набор агрегации, но для меня это действительно неэффективно.
С потоками, опрашивающими стратегию очереди, скажем, у меня есть 5 потоков, каждый поток может агрегировать в локальной памяти (порядок на самом деле не важен), а затем отключаться. Очередь и пункт назначения - единственные точки соприкосновения для борьбы - 1 поток может одновременно опросить очередь блокировки. Пункт назначения, вероятно, никогда не будет в споре.
При использовании подхода, основанного на задачах, с использованием Executor все потоки будут совместно использовать точку агрегации, так что они будут постоянно бороться, не говоря уже о том, что синхронизированные / одновременные варианты коллекций работают медленнее.
Кажется очевидным, что несколько потоков всегда опрашивают BlockingQueue. Недостатком является то, что теперь мне нужно написать все их запуски, остановки, мне нужно разобраться со случаем, когда поток умирает, и т. Д. Это все похоже на шаблон, который я ожидаю найти в java.util.concurrent или, возможно, apache библиотека.
Я действительно так далеко от резервации? Класс, чтобы всегда иметь запущенные потоки x и перезапустить их, если они потерпят неудачу? Есть ли другой очевидный (эффективный) подход, которого я просто не вижу?