Кажется, вы используете мьютекс, а не семафор?
При использовании мьютекса у вас есть только двоичная синхронизация - блокировка и разблокировка одного ресурса.Семафоры имеют значение, которое вы можете сигнализировать или приобретать.
Вы пытаетесь заблокировать / разблокировать весь буфер, но это неправильный путь, потому что, как вы видите, блокирует либо производитель, либо потребитель, икогда считыватель заблокировал его, производитель не может заполнить буфер (потому что он должен сначала заблокировать).
Вместо этого следует создать семпафор, а затем, когда производитель записывает один пакет или блок данных, он может сигнализироватьсемафор.Затем потребители могут пытаться приобрести семафор, поэтому они будут ждать, пока производитель не сообщит, что пакет записан.При сигнале записанного пакета один из потребителей будет разбужен, и он будет знать, что он может прочитать один пакет.Он может прочитать пакет, а затем вернуться к попытке получения на семафор.Если за это время производитель написал другой пакет, он снова подал сигнал, и тогда один из потребителей продолжит читать другой пакет.Etc ...
Например:
(Производитель) - Написать один пакет - Semaphore.release (1)
(Потребитель xN) - Semaphore.acquire (1)- Прочитать один пакет
Если у вас несколько потребителей, то потребители (не производитель) должны блокировать буфер при чтении пакета (но не при получении семафора) для предотвращения гонки.В приведенном ниже примере производитель также блокирует список, поскольку все находятся на одной и той же JVM.
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
public class Semaphores {
static Object LOCK = new Object();
static LinkedList list = new LinkedList();
static Semaphore sem = new Semaphore(0);
static Semaphore mutex = new Semaphore(1);
static class Consumer extends Thread {
String name;
public Consumer(String name) {
this.name = name;
}
public void run() {
try {
while (true) {
sem.acquire(1);
mutex.acquire();
System.out.println("Consumer \""+name+"\" read: "+list.removeFirst());
mutex.release();
}
} catch (Exception x) {
x.printStackTrace();
}
}
}
static class Producer extends Thread {
public void run() {
try {
int N = 0;
while (true) {
mutex.acquire();
list.add(new Integer(N++));
mutex.release();
sem.release(1);
Thread.sleep(500);
}
} catch (Exception x) {
x.printStackTrace();
}
}
}
public static void main(String [] args) {
new Producer().start();
new Consumer("Alice").start();
new Consumer("Bob").start();
}
}