Как использовать автоматическое управление потоками для реализации производителя / потребителя в Java - PullRequest
0 голосов
/ 30 сентября 2019

Мне нужно реализовать схему производитель / потребитель, в которой потребители по соображениям производительности пытаются обрабатывать множество рабочих элементов в одном пакете (каждый из них истощает рабочую очередь).

В настоящий момент я 'я просто создаю фиксированное количество идентичных рабочих, которые работают в одной и той же очереди в цикле. Поскольку некоторые из них могут умереть, мне нужно позаботиться о их замене.

Я бы хотел использовать fixedThreadPool для управления заменой потоков, но в моем случае не отображается схема Executor, как хотелось быдетализация производителя и потребителей не совпадают - только потребители могут собирать подходящую партию работы.

Каковы мои варианты управления пулом потоков (фиксированного размера), когда мои рабочие элементы не могут бытьвыражается в виде Runnables / Callables?

(В качестве альтернативы, могу ли я каким-то образом соблюсти мое требование по пакетированию произведенных рабочих элементов вместе и при этом иметь возможность использовать службу Executor?)

Ответы [ 2 ]

1 голос
/ 02 октября 2019

Один из подходов заключается в том, чтобы производители / потребители имели значение Runnable s и использовали BlockingQueue для передачи любых данных между ними.

Например, вот упрощенная реализация производителей, которые генерируют String элементов для queue, и потребителей, которые читают элементы в пакетах:

class ProducerConsumerPing {
    private static final class PingProducer implements Runnable {
        private final BlockingQueue<String> queue;

        PingProducer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        public void run() {
            while (true) {
                queue.offer("ping");
            }
        }
    }

    private static final class PingConsumer implements Runnable {

        private final BlockingQueue<String> queue;
        private final int batchSize;

        PingConsumer(BlockingQueue<String> queue, int batchSize) {
            this.queue = queue;
            this.batchSize = batchSize;
        }

        public void run() {
            while (true) {
                List<String> batch = new ArrayList<>();
                queue.drainTo(batch, batchSize);
                System.out.println("Consumed: " + batch);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService producers = Executors.newFixedThreadPool(10);
        ExecutorService consumers = Executors.newFixedThreadPool(10);
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        for (int i = 0; i < 10; i++) {
            producers.submit(new PingProducer(queue));
        }

        for (int i = 0; i < 10; i++) {
            consumers.submit(new PingConsumer(queue, 10));
        }

        producers.shutdown();
        producers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        consumers.shutdown();
        consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
}

Примечания:

  • В примере я использую String в качестве work items, но, конечно, вы можете поставить любой Object в очередь

  • Потребительдозирование достигается с помощью BlockingQueue.drainTo (Collection, int)

0 голосов
/ 02 октября 2019

Я решил это, сохранив фактические рабочие элементы в BlockingQueue, но продюсер отправил задачу уведомления Исполнителю, который направляет рабочий поток для опустошения очереди. Когда задачи уведомлений начнут стоять в очереди, некоторые работники смогут получить несколько рабочих элементов из BlockingQueue, а некоторые не получат ни одного, что достаточно для моих целей пакетирования.

...