Потребитель не работает в моем простом коде производителя / потребителя / очереди в Java - PullRequest
0 голосов
/ 14 декабря 2018

Я пытаюсь реализовать простую систему производителя / потребителя в Java 11. По сути, я беру два потока для каждого плюс глобальную очередь, просто следующим образом:

  • Очередь с глобальным приоритетом.
  • Первый поток, производитель, запускает HTTP-сервер, прослушивает входящие HTTP-сообщения и, получив сообщение, pushes отправляет его в очередь (с шагом queue.size)
  • Второй поток, потребитель, непрерывно peeks очередь.Если есть задание (job ! = null), отправляет HTTP-запрос куда-то и при успешном получении опрашивает его из очереди (queue.size() с уменьшением).

Скелет выглядит так:

Основной класс:

public class Manager
{
    private Consumer consumer;
    private Producer producer;
    Queue queue;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

Класс производителя:

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
            //HTTP server starts, listens, and adds to the queue upon receiving a Job
            server.start();
            Manager.queue.add(new Job());
    }
}

Класс потребителя:

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
    // Thread.sleep(1);

        while(true)
        {
            //get an object off the queue
            Job job= Manager.queue.peek();
            //do some stuff with the object
        }
    }
}

Producer и queue Работы - всехорошо.Но проблема с Consumer.Код потребителя выше (с циклом while(true)) не просматривает элемент.Но когда я добавляю цикл Thread.sleep(x) перед while(true), даже если x=1 ms, он работает и успешно захватывает элемент.

В чем проблема?Теоретически, цикл while(true) не должен быть проблемой!Почему не видно и peek предмет?!

Ответы [ 2 ]

0 голосов
/ 15 декабря 2018

PriorityQueue является не поточно-ориентированным, тогда как PriorityBlockingQueue является .Пока вы не используете какие-либо методы, определенные в интерфейсе BlockingQueue, эти две реализации являются взаимозаменяемыми.Простое изменение PriorityQueue на PriorityBlockingQueue должно решить вашу проблему.

0 голосов
/ 14 декабря 2018

Причина проблемы: несинхронизированное чтение и запись из очереди и в очередь.

Здесь происходит то, что оба потока, работающие на разных ядрах ЦП, работают со своей собственной копией очереди,таким образом, производитель может добавлять материал, и эти изменения, вероятно, даже распространяются в ОЗУ, но потребитель никогда ничего не проверяет в ОЗУ, поскольку у него есть собственная кэшированная копия этой очереди, ведьма остается пустой.

Thread.sleep() все работает, потому что при пробуждении поток должен получить все данные из ОЗУ, где он, вероятно, изменился.

Правильный способ сделать это - получить доступ к очереди только при синхронизации по ней какследует:

у производителя:

synchronized(Manager.queue) {
     Manager.queue.add(new Job());
}

и у потребителя:

boolean continue = true;
while (continue) {
    synchronized(Manager.queue) {
        Job job=Manager.queue.pop();
    }
}

И, наконец, последний штрих: while (true) - все невероятно неэффективно, выможет сделать что-нибудь, используя Object.wait() и Object.notify()

В производителе:

synchronized(Manager.queue) {
     Manager.queue.add(new Job());
     Manager.queue.notify();
}

и в Потребителе:

boolean continue = true;
while (continue) {
    synchronized(Manager.queue) {
        while (Manager.queue.peek() == null) {
            Manager.queue.wait();
        }
        Job job=Manager.queue.pop();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...