Очередь с несколькими потребителями потребляет очередь FIFO в целом - PullRequest
0 голосов
/ 09 июня 2019

Поскольку я пытаюсь изучить многопоточную часть программирования JAVA, у меня возникает следующая проблема при работе с One Producer - Multiple Consumer code.

То, чего я пытаюсь добиться, - это то, что несколько потребительских потоков выводят элементы из очереди в порядке их размещения в очереди. другими словами, заставьте потребительские потоки поддерживать FIFO в целом.

final BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

Runnable rb = new Runnable() {
    public void run() {
        try {
            System.out.println(deque.takeLast());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
};

deque.putFirst("a");
deque.putFirst("b");
deque.putFirst("c");
deque.putFirst("d");

ExecutorService pool = Executors.newFixedThreadPool(4);
pool.submit(rb);
pool.submit(rb);
pool.submit(rb);
pool.submit(rb);

НА ЧТО Я ИЩУ: б с д

ЧТО ЭТО НАСТОЯЩИМ РЕЗУЛЬТАТАМ: б с д

ИЛИ в случайном порядке

Какие-нибудь простые решения для решения этой проблемы? Спасибо!

1 Ответ

1 голос
/ 10 июня 2019

В вашем случае проблема в том, что

System.out.println(deque.takeLast());

на самом деле две инструкции, которые вместе не являются атомарными.Представьте себе такой сценарий:

  1. Поток 1 получает строку из очереди.
  2. Поток 2 принимает строку из очереди.
  3. Поток 2 печатает значение.
  4. Поток 1 выводит значение.

Так что все зависит от того, как операционная система будет управлять выполнением потоков.

В вашем случае одним из возможных решений было бы добавить ключевое слово synchronized к run method:

Runnable rb = new Runnable() {
    public synchronized void run() {
         try {
              String s = deque.takeLast();
              System.out.println(s);
         } catch (InterruptedException e) {
              e.printStackTrace();
         }
    }
};

Это будет синхронизироваться с экземпляром анонимного класса, который вы создали здесь.Так как вы передаете тот же runnable ExecutorService - он должен работать.Или вы можете синхронизировать ваш queue объект, поскольку ваш runnable, который имеет доступ к объекту очереди, будет выполняться во многих потоках, так как вы передали его в ExecutorService:

Runnable rb = new Runnable() {
    public void run() {
        synchronized (deque) {
             try {
                 String s = deque.takeLast();
                 System.out.println(s);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
        }
    }
};

Также помните о закрытииваш пул потоков, потому что теперь ваше приложение никогда не выйдет.

...