Java BlockingQueue с пакетной обработкой? - PullRequest
17 голосов
/ 27 февраля 2012

Меня интересует структура данных, идентичная Java BlockingQueue, за исключением того, что она должна иметь возможность пакетировать объекты в очереди.Другими словами, я бы хотел, чтобы производитель мог помещать объекты в очередь, но иметь блок потребителя на take(), пока очередь не достигнет определенного размера (размера пакета).

Затем один разочередь достигла размера пакета, производитель должен заблокировать put(), пока потребитель не израсходует все элементы в очереди (в этом случае производитель возобновит производство, и блок потребителя, пока партия не будет достигнута снова).

Существует ли подобная структура данных?Или я должен написать это (что я не против), я просто не хочу тратить свое время, если есть что-то там.


ОБНОВЛЕНИЕ

Может быть, немного прояснить ситуацию:

Ситуация всегда будет следующей.Может быть несколько производителей, добавляющих элементы в очередь, но никогда не будет более одного потребителя, берущего элементы из очереди.

Теперь проблема в том, что существует множество таких установок параллельно и последовательно.Другими словами, производители производят товары для нескольких очередей, а сами потребители также могут быть производителями.Это проще воспринимать как ориентированный граф производителей, потребителей-производителей и, наконец, потребителей.

Причина, по которой производители должны блокироваться до тех пор, пока очереди не опустеют (@Peter Lawrey), заключается в том, что каждый из них будетбыть в потокеЕсли вы оставите их для простого производства по мере появления свободного пространства, вы столкнетесь с ситуацией, когда у вас слишком много потоков, пытающихся обрабатывать слишком много вещей одновременно.

Возможно, соединение этого со службой выполнения может решить проблему.проблема?

Ответы [ 4 ]

13 голосов
/ 27 февраля 2012

Я бы предложил вам использовать BlockingQueue.drainTo (Collection, int) . Вы можете использовать его с take (), чтобы обеспечить минимальное количество элементов.

Преимущество использования этого подхода состоит в том, что размер вашей партии динамически увеличивается с рабочей нагрузкой, и производителю не нужно блокировать, когда потребитель занят. то есть самооптимизируется для задержки и пропускной способности.


Для реализации в точности так, как просили (что я считаю плохой идеей), вы можете использовать SynchronousQueue с занятым потребляющим потоком.

т.е. потребляющий поток делает

 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

Производитель будет блокироваться, когда потребитель занят.

2 голосов
/ 05 июля 2012

Вот быстрая (= простая, но не полностью протестированная) реализация, которая, я думаю, может подойти для ваших запросов - вы должны иметь возможность расширить ее для поддержки интерфейса полной очереди, если вам нужно.

для повышения производительности вы можете переключиться на ReentrantLock вместо использования "синхронизированного" ключевого слова ..

public class BatchBlockingQueue<T> {

    private ArrayList<T> queue;
    private Semaphore readerLock;
    private Semaphore writerLock;
    private int batchSize;

    public BatchBlockingQueue(int batchSize) {
        this.queue = new ArrayList<>(batchSize);
        this.readerLock = new Semaphore(0);
        this.writerLock = new Semaphore(batchSize);
        this.batchSize = batchSize;
    }

    public synchronized void put(T e) throws InterruptedException {
        writerLock.acquire();
        queue.add(e);
        if (queue.size() == batchSize) {
            readerLock.release(batchSize);
        }
    }

    public synchronized T poll() throws InterruptedException {
        readerLock.acquire();
        T ret = queue.remove(0);
        if (queue.isEmpty()) {
            writerLock.release(batchSize);
        }
        return ret;
    }

}

Надеюсь, вы найдете его полезным.

1 голос
/ 05 июля 2012

Похоже, как RingBuffer работает в паттерне LMAX Disruptor. Подробнее см. http://code.google.com/p/disruptor/.

Очень грубое объяснение - ваша основная структура данных - это RingBuffer. Производители последовательно помещают данные в кольцевой буфер, и потребители могут извлекать столько данных, сколько производитель поместил в буфер (по сути, пакетирование). Если буфер заполнен, производитель блокируется, пока потребитель не завершит работу и не освободит слоты в буфере.

1 голос
/ 27 февраля 2012

Не то, что я знаю. Если я правильно понимаю, вы хотите, чтобы либо производитель работал (пока потребитель заблокирован), пока он не заполнил очередь, либо потребитель не работал (пока производитель блокирует), пока он не очистил очередь. Если это так, могу ли я предположить, что вам нужна не структура данных, а механизм для блокировки одной стороны, в то время как другая работает в режиме мьютекса. Вы можете заблокировать объект для этого и внутренне иметь логику того, является ли он полным или пустым, чтобы снять блокировку и передать ее другой стороне. Короче говоря, вы должны написать это сами:)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...