Проблема производитель-потребитель с изюминкой - PullRequest
2 голосов
/ 30 апреля 2011
  • Производитель конечен, как и потребитель.
    • Проблема в том, когда остановиться, не как запустить.
  • Связь может происходить через любой тип BlockingQueue.
    • Нельзя полагаться на отравление очереди (PriorityBlockingQueue)
    • Нельзя полагаться на блокировку очереди (SynchronousQueue)
    • Не может полагаться исключительно на предложение / опрос (SynchronousQueue)
    • Возможно, существуют еще более экзотические очереди.

Создает очередь seq на другом (предположительно ленивом) seq s. В очереди seq создаст конкретный seq в фоновом режиме и может получить до n пунктов опережает потребителя. n-or-q может быть целым буфером n размер или экземпляр java.util.concurrent BlockingQueue. Заметка что чтение из последовательности может заблокировать, если читатель опережает производитель.

http://clojure.github.com/clojure/clojure.core-api.html#clojure.core/seque

Мои попытки до сих пор + некоторые тесты: https://gist.github.com/934781

Решения в Java или Clojure приветствуются.

Ответы [ 2 ]

0 голосов
/ 01 мая 2011

Боюсь, не совсем ответ, но несколько замечаний и больше вопросов.Мой первый ответ будет: используйте clojure.core/seque.Производителю необходимо как-то сообщить конец последовательности, чтобы потребитель знал, когда остановиться, и я предполагаю, что количество произведенных элементов заранее неизвестно.Почему вы не можете использовать маркер EOS (если это то, что вы подразумеваете под отравлением очереди)?

Если я правильно понимаю вашу альтернативную реализацию seque, она сломается, когда элементы будут удалены из очереди за пределами вашей функции, поскольку channel и q в этом случае будут не в шаге: канал будет содержать больше элементов #(.take q), чем элементов в q, что приведет к его блокировке.Могут быть способы обеспечения того, чтобы channel и q всегда были в шаге, но это, вероятно, потребовало бы реализации вашего собственного класса Queue, и это добавляет столько сложности, что я сомневаюсь, что оно того стоит.Кроме того, ваша реализация не делает различий между нормальным EOS и ненормальным завершением очереди из-за прерывания потока - в зависимости от того, что вы используете, вы можете узнать, что есть что.Лично мне не нравится использовать исключения таким способом - использовать исключения для исключительных ситуаций, а не для нормального управления потоком.

0 голосов
/ 30 апреля 2011
class Reader {

    private final ExecutorService ex = Executors.newSingleThreadExecutor();
    private final List<Object> completed = new ArrayList<Object>();
    private final BlockingQueue<Object> doneQueue = new LinkedBlockingQueue<Object>();
    private int pending = 0;

    public synchronized Object take() {
        removeDone();
        queue();
        Object rVal;
        if(completed.isEmpty()) {
            try {
                rVal = doneQueue.take();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            pending--;
        } else {
            rVal = completed.remove(0);
        }
        queue();
        return rVal;
    }

    private void removeDone() {
        Object current = doneQueue.poll();
        while(current != null) {
            completed.add(current);
            pending--;
            current = doneQueue.poll();
        }
    }

    private void queue() {
        while(pending < 10) {
            pending++;
            ex.submit(new Runnable() {

                @Override
                public void run() {
                    doneQueue.add(compute());
                }

                private Object compute() {
                    //do actual computation here
                    return new Object();
                }
            });
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...