Вы приобретаете и отпускаете ReentrantLock
с lock()
и unlock()
, а не с synchronized
.
Как указывает Антониосс, это тупик, когда один поток ожидает блокировки, которую другой никогда не снимет (в то время как оба пытаются координировать эту блокировку). Вот одно из решений:
import static java.util.Objects.requireNonNull;
import java.util.LinkedList;
import java.util.List;
class Producer implements Runnable {
private final List<String> data;
Producer(List<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
int counter = 0;
while (true) {
synchronized (data) {
if (data.size() < 5) {
counter++;
data.add("writing:: " + counter);
} else {
try {
data.wait();
} catch (InterruptedException e) {
return;
}
}
}
}
}
}
class Consumer implements Runnable {
private final List<String> data;
Consumer(List<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
while (true) {
synchronized (data) {
if (data.size() > 0) {
System.out.println("reading:: " + data.get(data.size() - 1));
data.remove(data.size() - 1);
}
data.notify();
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
List<String> data = new LinkedList<>();
Thread t1 = new Thread(new Producer(data));
Thread t2 = new Thread(new Consumer(data));
t1.start();
t2.start();
}
}
Мы покончили с объектом блокировки и синхронизируемся с самим списком. Производитель синхронизируется внутри цикла while, что приводит к тому, что как только в списке будет хотя бы 5 элементов, производитель будет ждать, передавая монитор в список.
Потребитель также синхронизируется внутри цикла, что очень важно, поскольку в вашем коде он никогда не отказывался от блокировки монитора после того, как получил его. На самом деле, если бы вы сначала начали потребителя (или стали очень невезучими), ничего бы не было произведено. Монитор освобождается при выходе из синхронизированного блока или метода или когда поток удерживает монитор wait()
s , но notify()
и notifyAll()
не отпускайте монитор.
Потребитель читает последний элемент, если таковой имеется, а затем немедленно уведомляет производителя и снимает блокировку. Две заметки:
Во-первых, неясно, какой порядок вы ожидаете получить. Ожидаете ли вы, что производитель произведет 5, потребитель будет потреблять 5 и т. Д., Или вы хотите, чтобы 5 просто был пределом, поэтому слишком большое отставание не может образоваться (это хорошо, это называется противодавлением), но потребитель потреблять предметы с нетерпением, когда они доступны? Эта реализация делает последнее.
Во-вторых, потребитель пытается получить доступ к монитору в списке, как только он его выпустил. Это форма занятого ожидания , тогда потребитель и производитель мчатся, чтобы получить блокировку, и может случиться так, что потребитель часто выигрывает эту гонку, которая становится бессмысленной, когда список пуст. Возможно, было бы целесообразно сделать вызов onSpinWait
вне синхронизированного блока, но внутри цикла while в потребителе, в Java 9 или более поздней версии. В более ранних версиях yield
может быть подходящим. Но в моих тестах код работает без них.
Антониоосс сделал еще одно предложение - использовать LinkedBlockingQueue, но код в том виде, в котором он стоит, всегда занимает последний элемент, и использование очереди изменило бы это поведение. Вместо этого мы можем использовать deque (двустороннюю очередь), помещая элементы в конец, а также снимая их с конца. Вот как это выглядит:
import static java.util.Objects.requireNonNull;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
class Producer implements Runnable {
private final BlockingDeque<String> data;
Producer(BlockingDeque<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
int counter = 0;
while (true) {
counter++;
try {
data.put("writing:: " + counter);
} catch (InterruptedException e) {
break;
}
}
}
}
class Consumer implements Runnable {
private final BlockingDeque<String> data;
Consumer(BlockingDeque<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
while (true) {
try {
System.out.println("reading:: " + data.takeLast());
} catch (InterruptedException e) {
break;
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
BlockingDeque<String> data = new LinkedBlockingDeque<>(5);
Thread t1 = new Thread(new Producer(data));
Thread t2 = new Thread(new Consumer(data));
t1.start();
t2.start();
}
}
Поскольку LinkedBlockingDeque
является параллельной структурой данных, нам не нужны никакие синхронизированные блоки или ожидание или уведомление здесь. Мы можем просто попытаться put
и takeLast
из deque, и он заблокирует, если deque заполнен или пуст, соответственно. Deque создается с мощностью 5, поэтому он применяет противодавление к производителю, если производитель когда-либо продвигается так далеко, как оригинал.
Ничто не мешает производителю производить элементы так быстро, как их может потреблять потребитель, а это означает, что первым элементам, возможно, придется ждать сколь-нибудь долгое время для их потребления. Мне не ясно, было ли это целью вашего кода. Есть способы, которыми вы могли бы достичь этого, либо введя wait()
и notify()
снова, используя Semaphore
s, либо другими способами, но я оставлю это так, поскольку неясно, вы даже этого хотели.
Последнее замечание по InterruptedException
. Это произойдет, если кто-то вызовет interrupt()
в потоке, но единственное, что удерживает ссылки на потоки производителя и потребителя, это метод main()
, и он никогда не прерывает их. Таким образом, исключение не должно происходить здесь, но в случае, если это так или иначе происходит, у меня просто есть выход производителя или потребителя. В более сложном сценарии прерывание потока может использоваться в качестве способа сигнализации о нем, если он спит, или в методе блокировки (или даже вне его, если он явно проверяет его), но мы не используем это здесь.