Я сомневаюсь, List
- хорошая структура данных для вашего варианта использования.Поскольку вы передаете данные между потоками, BlockingQueue
выглядит намного более естественным.
В качестве примера приведена реализация со следующими допущениями:
существует 1 box
продюсер, который:
1) генерирует box
со случайным groupId
в диапазоне от 1 до 3
2) помещает box
в один lane
есть 3 box
потребителей, и каждый из них:
1) получает box
от lane
2) потреблять коробку, если groupId
потребителя совпадает с рамкой groupId
дополнительные рамки не используются (только ядро Java)
import java.util.*;
import java.util.concurrent.*;
class Answer {
public static void main(String[] args) {
LinkedBlockingQueue<Box> lane = new LinkedBlockingQueue<>();
Producer p = new Producer(lane);
Consumer c1 = new Consumer(1, lane);
Consumer c2 = new Consumer(2, lane);
Consumer c3 = new Consumer(3, lane);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
new Thread(c3).start();
}
}
class Producer implements Runnable {
private final LinkedBlockingQueue<Box> lane;
Producer(LinkedBlockingQueue<Box> lane) {
this.lane = lane;
}
public void run() {
try {
while (true) {
lane.put(new Box(produceGroupId()));
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt(); // set interrupt flag
}
}
int produceGroupId() {
// generate random int between 1 and 3
return ThreadLocalRandom.current().nextInt(1, 4);
}
}
class Consumer implements Runnable {
private final int groupId;
private final BlockingQueue<Box> lane;
Consumer(int groupId, BlockingQueue<Box> lane) {
this.groupId = groupId;
this.lane = lane;
}
public void run() {
while (true) {
Box box = lane.peek();
if (box != null && box.getGroupId() == this.groupId) {
consume(lane.poll());
}
}
}
void consume(Box box) {
System.out.println("Consumer " + groupId + " received " + box + " for proxessing.");
}
}
class Box {
private final int groupId;
public Box(int groupId) {
this.groupId = groupId;
}
public int getGroupId() {
return this.groupId;
}
@Override
public String toString() {
return "<Box " + groupId + ">";
}
}
Если цель состоит в том, чтобы иметь 3 отдельных полосы , то реализация будет немного другой:
import java.util.*;
import java.util.concurrent.*;
class Answer {
public static void main(String[] args) {
BlockingQueue<Box> lane1 = new LinkedBlockingQueue<Box>();
BlockingQueue<Box> lane2 = new LinkedBlockingQueue<Box>();
BlockingQueue<Box> lane3 = new LinkedBlockingQueue<Box>();
Map<Integer, BlockingQueue<Box>> lanes = new ConcurrentHashMap<Integer, BlockingQueue<Box>>();
lanes.put(1, lane1);
lanes.put(2, lane2);
lanes.put(3, lane3);
Producer p = new Producer(lanes);
Consumer c1 = new Consumer(1, lane1);
Consumer c2 = new Consumer(2, lane2);
Consumer c3 = new Consumer(3, lane3);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
new Thread(c3).start();
}
}
class Producer implements Runnable {
private final Map<Integer, BlockingQueue<Box>> lanes;
Producer(Map<Integer, BlockingQueue<Box>> lanes) {
this.lanes = lanes;
}
public void run() {
try {
while (true) {
int groupId = produceGroupId();
BlockingQueue<Box> lane = lanes.get(groupId);
lane.put(new Box(groupId));
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
int produceGroupId() {
// generate random int between 1 and 3
return ThreadLocalRandom.current().nextInt(1, 4);
}
}
class Consumer implements Runnable {
private final int consumerId;
private final BlockingQueue<Box> lane;
Consumer(int consumerId, BlockingQueue<Box> lane) {
this.consumerId = consumerId;
this.lane = lane;
}
public void run() {
try {
while (true) {
consume(lane.take());
}
} catch (InterruptedException ex) {}
}
void consume(Box box) {
System.out.println("Consumer " + consumerId + " received " + box + " for proxessing.");
}
}
class Box {
private final int groupId;
public Box(int groupId) {
this.groupId = groupId;
}
public int getGroupId() {
return this.groupId;
}
@Override
public String toString() {
return "<Box " + groupId + ">";
}
}