Спасибо @Ivan и @ Andreas.
@ Иван - В его комментарии я понял, как ведет себя шаблон Producer Consumer.@Andreas - в своем комментарии предложено использование Phaser.(Вместо этого я использовал Cyclic Barrier, поскольку количество зарегистрированных потоков не меняется динамически)
В обоих комментариях приведен пример кода ниже.Пожалуйста, предлагайте импровизацию, если есть какой-либо или лучший способ справиться с этим.
Основной класс
public static void main(String[] args)
{
SharedSpace sharedSpace = new SharedSpace(new LinkedBlockingQueue<Integer>(1));
new Thread(new Producer(sharedSpace)).start();
Consumer consumerRunnable = new Consumer(sharedSpace);
new Thread(consumerRunnable).start();
CyclicBarrier barrier = new CyclicBarrier(3,consumerRunnable);
new Thread(new EndUser(barrier,consumerRunnable)).start();
new Thread(new EndUser(barrier,consumerRunnable)).start();
new Thread(new EndUser(barrier,consumerRunnable)).start();
}
Производитель
private SharedSpace sharedSpace;
public Producer(SharedSpace sharedSpace) {
super();
this.sharedSpace = sharedSpace;
}
public SharedSpace getSharedSpace() {
return sharedSpace;
}
public void setSharedSpace(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
}
@Override
public void run() {
for(int i=0;i<3;i++)
{
int value = (int) (Math.random()*30);
sharedSpace.addValue(value);
}
}
Очередь, общая для производителя и потребителя
private BlockingQueue<Integer> queue;
public SharedSpace(BlockingQueue<Integer> queue) {
super();
this.queue = queue;
}
public BlockingQueue<Integer> getQueue() {
return queue;
}
public void setQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void addValue(int value)
{
try {
queue.put(value);
System.out.println(System.nanoTime()+" Producer added value "+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int getValue() throws InterruptedException
{
return queue.take();
}
Потребитель
private SharedSpace sharedSpace;
private Integer value;
public Consumer(SharedSpace sharedSpace) {
super();
this.sharedSpace = sharedSpace;
}
public SharedSpace getSharedSpace() {
return sharedSpace;
}
public void setSharedSpace(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
}
public Integer getValue() {
return value;
}
public void setValue(Integer value) {
this.value = value;
}
@Override
public void run()
{
try {
setValue(sharedSpace.getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
EndUser
CyclicBarrier barrier;
Consumer consumer;
public EndUser(CyclicBarrier barrier) {
super();
this.barrier = barrier;
}
public EndUser(CyclicBarrier barrier, Consumer consumer) {
super();
this.barrier = barrier;
this.consumer = consumer;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public CyclicBarrier getBarrier() {
return barrier;
}
public void setBarrier(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try
{
while(true)
{
System.out.println(consumer.getValue());
barrier.await();
}
}
catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
Вывод [Потребитель не читает из Producer, пока все EndUser не получили свои данные]
Producer added value 24
Producer added value 10
24
24
24
10
10
Producer added value 0
10
0
0
0