Производитель Потребитель, использующий блокировку Reentrant, не работает - PullRequest
0 голосов
/ 05 июля 2018

Я пытаюсь реализовать потребителя-производителя, используя ReentrantLock, как указано ниже:

class Producer implements Runnable {

    private List<String> data;
    private ReentrantLock lock;

    Producer(List<String> data,ReentrantLock lock)
    {
        this.data = data;
        this.lock = lock;
    }

    @Override
    public void run() {
        int counter = 0;
        synchronized (lock)
        {
            while (true)
            {

                if ( data.size() < 5)
                {
                    counter++;
                    data.add("writing:: "+counter);
                }
                else
                {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }
}

class Consumer implements Runnable{

    private List<String> data;
    private ReentrantLock lock;

    Consumer(List<String> data,ReentrantLock lock)
    {
        this.data = data;
        this.lock = lock;
    }

    @Override
    public void run() {
        int counter = 0;
        synchronized (lock)
        {
            while (true)
            {

                if ( data.size() > 0)
                {
                    System.out.println("reading:: "+data.get(data.size()-1));
                    data.remove(data.size()-1);
                }
                else
                {
                    System.out.println("Notifying..");
                    lock.notify();
                }
            }
        }

    }
}

public class ProducerConsumer  {

    public static void main(String[] args) {
        List<String> str = new LinkedList<>();
        ReentrantLock lock= new ReentrantLock();
        Thread t1 = new Thread(new Producer(str,lock));
        Thread t2 = new Thread(new Consumer(str,lock));
        t1.start();
        t2.start();
    }
}

Таким образом, он записывает в список только один раз, а затем Потребитель ждет бесконечно. Почему это происходит? Почему не продюсер получает блокировку?

Ответы [ 4 ]

0 голосов
/ 05 июля 2018

Я хочу указать на две сделанные вами ошибки:

  1. Неправильное использование ReentrantLock.

Каждый объект может быть использован для внутренней блокировки, поэтому нет необходимости искать определенный класс Lock. Так как каждый synchronized блок ограничен одним методом, и вы не требуете какого-либо расширенного средства , ReentrantLock здесь избыточно.

  1. Неправильная позиция блоков synchronized.

Как только вы войдете в синхронизированный блок, никто не сможет войти туда, пока вы не покинете его. Очевидно, вы никогда не выйдете из него из-за while(true).

Я бы посоветовал вам удалить ReentrantLock s и выполнить синхронизацию на data.

@Override
public void run() {
    int counter = 0;
    while (true) {
        synchronized (data) {
            if (data.size() < 5) {
                data.add("writing:: " + ++counter);
            } else {
                try {
                    data.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // we are out of the synchornized block here
        // to let others use data as a monitor somewhere else
    }
}
0 голосов
/ 05 июля 2018

Поскольку вы хотите просто Producer для непрерывного получения значений и Consumer для непрерывного использования этих значений и, в конечном итоге, ожидания получения значений, так как их нет, пропустите при синхронизации и блокировке и используйте LinkedBlockingQueue

В продукции вы просто:

queue.put(value)

А у потребителя вы делаете

value=queue.take();

вуаля, потребитель будет принимать значения, если они есть, и будет ждать, пока очередь значений пуста.

Что касается вашего кода:

  1. Вы вообще не используете ReentrantLock. Вы можете заменить его на new Object, и результат будет таким же. wait и notify - это методы Object.
  2. Причина, по которой вам удается производить только одну ценность, заключается в вашем потребителе. По сути, у вас есть что-то вроде этого:

    synchronized(lock){ // aquire lock's monitor
    
    while(true){
    
      lock.notify();
     }
    
    } // release lock's monitor - never doing that, thread never leaves this block
    

Проблема здесь в том, что вы никогда не покидаете синхронизированный блок . Итак, вы звоните notify, но не отпускаете монитор lock, выходя из блока synchronized. Производитель «уведомлен», но для продолжения выполнения он должен получить монитор lock, но может, так как потребитель никогда не выпускает его. Здесь почти классический тупик.

Вы можете себе представить, что в комнате есть пара человек, и только тот, у кого есть палка, может говорить. Итак, первый получает палку с syncronized(stick). Сделайте то, что он должен сделать, и решил, что он должен wait для кого-то другого, поэтому он звонит wait и передает флешку обратно. Теперь второй человек может говорить, выполнять свою работу и решает, что тот, кто дал ему палку, может продолжить. Он звонит notify - теперь он должен пройти палку, оставив блок synchronized(stick). Если он этого не сделает, первый человек не сможет продолжить - это то, что происходит в вашем случае.

0 голосов
/ 05 июля 2018

@ Антониосс прав. Вы не используете ReentrantLock правильно, и вместо этого вы можете просто заменить его на объект. Если вы хотите использовать ReentrantLock вместо этого (который более актуален), я бы предложил что-то вроде:

package Multithreading;


import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class Producer implements Runnable{

  private List<String> data;
  private ReentrantLock lock;

  Producer(List<String> data,ReentrantLock lock)
  {
    this.data = data;
    this.lock = lock;
  }

  @Override
  public void run() {
    int counter = 0;
    while (true)
    {


        try {
          lock.lock();
          if ( data.size() < 5)
          {
            counter++;
            data.add("writing:: " + counter);
          }
        }finally {
          lock.unlock();
        }

      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

}

class Consumer implements Runnable{

  private List<String> data;
  private ReentrantLock lock;

  Consumer(List<String> data,ReentrantLock lock)
  {
    this.data = data;
    this.lock = lock;
  }

  @Override
  public void run() {
    int counter = 0;
    while (true)
    { 
      try {
        lock.lock();
          if ( data.size() > 0) 
          {
            System.out.println("reading:: "+data.get(data.size()-1));
            data.remove(data.size()-1);
          }
       }finally {
         lock.unlock();
       }
       try 
       {
          TimeUnit.SECONDS.sleep(1);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
      }
    }
  }
}

public class ProducerConsumer  {

  public static void main(String[] args) {
    List<String> str = new LinkedList<>();
    ReentrantLock lock= new ReentrantLock();
    Thread t1 = new Thread(new Producer(str,lock));
    Thread t2 = new Thread(new Consumer(str,lock));
    t1.start();
    t2.start();
  }
}

Я удалил вызовы уведомлений, но если вам действительно нужно пускать один поток за раз, просто используйте lock.notify ()

0 голосов
/ 05 июля 2018

Вы приобретаете и отпускаете ReentrantLock с lock() и unlock(), а не с synchronized.

Как указывает Антониосс, это тупик, когда один поток ожидает блокировки, которую другой никогда не снимет (в то время как оба пытаются координировать эту блокировку). Вот одно из решений:

import static java.util.Objects.requireNonNull;

import java.util.LinkedList;
import java.util.List;

class Producer implements Runnable {
    private final List<String> data;

    Producer(List<String> data) {
        this.data = requireNonNull(data);
    }

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            synchronized (data) {
                if (data.size() < 5) {
                    counter++;
                    data.add("writing:: " + counter);
                } else {
                    try {
                        data.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }
    }
}

class Consumer implements Runnable {
    private final List<String> data;

    Consumer(List<String> data) {
        this.data = requireNonNull(data);
    }

    @Override
    public void run() {
        while (true) {
            synchronized (data) {
                if (data.size() > 0) {
                    System.out.println("reading:: " + data.get(data.size() - 1));
                    data.remove(data.size() - 1);
                }
                data.notify();
            }
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        List<String> data = new LinkedList<>();
        Thread t1 = new Thread(new Producer(data));
        Thread t2 = new Thread(new Consumer(data));
        t1.start();
        t2.start();
    }
}

Мы покончили с объектом блокировки и синхронизируемся с самим списком. Производитель синхронизируется внутри цикла while, что приводит к тому, что как только в списке будет хотя бы 5 элементов, производитель будет ждать, передавая монитор в список.

Потребитель также синхронизируется внутри цикла, что очень важно, поскольку в вашем коде он никогда не отказывался от блокировки монитора после того, как получил его. На самом деле, если бы вы сначала начали потребителя (или стали очень невезучими), ничего бы не было произведено. Монитор освобождается при выходе из синхронизированного блока или метода или когда поток удерживает монитор wait() s , но notify() и notifyAll() не отпускайте монитор.

Потребитель читает последний элемент, если таковой имеется, а затем немедленно уведомляет производителя и снимает блокировку. Две заметки:

Во-первых, неясно, какой порядок вы ожидаете получить. Ожидаете ли вы, что производитель произведет 5, потребитель будет потреблять 5 и т. Д., Или вы хотите, чтобы 5 просто был пределом, поэтому слишком большое отставание не может образоваться (это хорошо, это называется противодавлением), но потребитель потреблять предметы с нетерпением, когда они доступны? Эта реализация делает последнее.

Во-вторых, потребитель пытается получить доступ к монитору в списке, как только он его выпустил. Это форма занятого ожидания , тогда потребитель и производитель мчатся, чтобы получить блокировку, и может случиться так, что потребитель часто выигрывает эту гонку, которая становится бессмысленной, когда список пуст. Возможно, было бы целесообразно сделать вызов onSpinWait вне синхронизированного блока, но внутри цикла while в потребителе, в Java 9 или более поздней версии. В более ранних версиях yield может быть подходящим. Но в моих тестах код работает без них.

Антониоосс сделал еще одно предложение - использовать LinkedBlockingQueue, но код в том виде, в котором он стоит, всегда занимает последний элемент, и использование очереди изменило бы это поведение. Вместо этого мы можем использовать deque (двустороннюю очередь), помещая элементы в конец, а также снимая их с конца. Вот как это выглядит:

import static java.util.Objects.requireNonNull;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

class Producer implements Runnable {
    private final BlockingDeque<String> data;

    Producer(BlockingDeque<String> data) {
        this.data = requireNonNull(data);
    }

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            counter++;
            try {
                data.put("writing:: " + counter);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}

class Consumer implements Runnable {
    private final BlockingDeque<String> data;

    Consumer(BlockingDeque<String> data) {
        this.data = requireNonNull(data);
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("reading:: " + data.takeLast());
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        BlockingDeque<String> data = new LinkedBlockingDeque<>(5);
        Thread t1 = new Thread(new Producer(data));
        Thread t2 = new Thread(new Consumer(data));
        t1.start();
        t2.start();
    }
}

Поскольку LinkedBlockingDeque является параллельной структурой данных, нам не нужны никакие синхронизированные блоки или ожидание или уведомление здесь. Мы можем просто попытаться put и takeLast из deque, и он заблокирует, если deque заполнен или пуст, соответственно. Deque создается с мощностью 5, поэтому он применяет противодавление к производителю, если производитель когда-либо продвигается так далеко, как оригинал.

Ничто не мешает производителю производить элементы так быстро, как их может потреблять потребитель, а это означает, что первым элементам, возможно, придется ждать сколь-нибудь долгое время для их потребления. Мне не ясно, было ли это целью вашего кода. Есть способы, которыми вы могли бы достичь этого, либо введя wait() и notify() снова, используя Semaphore s, либо другими способами, но я оставлю это так, поскольку неясно, вы даже этого хотели.

Последнее замечание по InterruptedException. Это произойдет, если кто-то вызовет interrupt() в потоке, но единственное, что удерживает ссылки на потоки производителя и потребителя, это метод main(), и он никогда не прерывает их. Таким образом, исключение не должно происходить здесь, но в случае, если это так или иначе происходит, у меня просто есть выход производителя или потребителя. В более сложном сценарии прерывание потока может использоваться в качестве способа сигнализации о нем, если он спит, или в методе блокировки (или даже вне его, если он явно проверяет его), но мы не используем это здесь.

...