Написание собственного потребительского потока - что я не вижу? - PullRequest
2 голосов
/ 14 марта 2012

Итак, у меня есть BlockingQueue, который я заполняю данными (из многих тем). Я хочу собрать эти данные в группы, скажем, 1000, а затем передать их куда-то еще. Итак, я написал класс потока, который опрашивает конец очереди, и когда у него достаточно элементов, он отправляет агрегированные данные.

Я ожидал найти что-то в java.util.concurrent, чтобы помочь с этим. Единственный способ сделать это с помощью java.util.concurrent - сделать так, чтобы каждая вставка в очередь добавляла выполняемую задачу, которая затем добавлялась бы в набор агрегации, но для меня это действительно неэффективно.

С потоками, опрашивающими стратегию очереди, скажем, у меня есть 5 потоков, каждый поток может агрегировать в локальной памяти (порядок на самом деле не важен), а затем отключаться. Очередь и пункт назначения - единственные точки соприкосновения для борьбы - 1 поток может одновременно опросить очередь блокировки. Пункт назначения, вероятно, никогда не будет в споре.

При использовании подхода, основанного на задачах, с использованием Executor все потоки будут совместно использовать точку агрегации, так что они будут постоянно бороться, не говоря уже о том, что синхронизированные / одновременные варианты коллекций работают медленнее.

Кажется очевидным, что несколько потоков всегда опрашивают BlockingQueue. Недостатком является то, что теперь мне нужно написать все их запуски, остановки, мне нужно разобраться со случаем, когда поток умирает, и т. Д. Это все похоже на шаблон, который я ожидаю найти в java.util.concurrent или, возможно, apache библиотека.

Я действительно так далеко от резервации? Класс, чтобы всегда иметь запущенные потоки x и перезапустить их, если они потерпят неудачу? Есть ли другой очевидный (эффективный) подход, которого я просто не вижу?

Ответы [ 2 ]

3 голосов
/ 14 марта 2012

Попробуйте это.

public class Consumer<DATA> {

    private List<DATA> dataList = new ArrayList<DATA>();

    private ExecutorService threadPool = Executors.newFixedThreadPool(5);

    public synchronized void consume(DATA data) {

        dataList.add(data);

        if(dataList.size() >= 1000) {

            threadPool.submit(new ConsumerWorker(data));
        }
    }

}

Мы по существу накапливаем данные в контексте потока производителя, пока они не достигнут желаемого предела.Затем мы отправляем пакет данных в пул потоков, который будет ставить в очередь или выполнять ConsumerWorker (s) в зависимости от их доступности.Вы также можете настроить поведение пула потоков.Например, использование newCachedThreadPool () удалит неактивные потоки.

2 голосов
/ 14 марта 2012

Если бы я реализовал это, у меня просто был бы один поток, который вызывает take() (не опрос) в очереди блокировки, пока он не получит полный пакет, а затем передаст этот пакет вашему коду обработки. если логика пакетной обработки потенциально длинна, это, вероятно, будет отдельный пул рабочих потоков. Ваш пост длинный и в нем рассказывается о том, как отправлять runnables с каждым элементом (?), о различных предполагаемых спорных вопросах и о других вещах, за которыми я не следил полностью. не уверен, почему это должно быть более сложным, чем то, что я только что описал. (это будет использовать BlockingQueue и Executors из java.util.concurrent и не потребует прямого управления потоками).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...