Вот задача: Создать общий класс ArrayBlockingQueue с помощью 2 методов: - put (E e) Вставляет указанный элемент в конец этой очереди, ожидая, когда пространство станет доступным, если очередь заполнена.- take () - Извлекает и удаляет заголовок этой очереди, ожидая при необходимости, пока элемент не станет доступным.Использование методов wait и notify / notifyAll.Емкость должна быть параметром для конструктора класса.Создайте основную программу, которая принимает число потребителей в качестве аргумента времени выполнения.Создавайте темы - Потребитель и Производитель.Когда программа запускается, потоки источника должны совпадать с аргументом, передаваемым программе.Поток производителя должен читать из stdin и отправлять данные в очередь, используя метод put.Потребители должны взять данные и распечатать их в формате «Поток-1 - <данные>»
ВАЖНО: Пожалуйста, использование следующих классов недопустимо - Пулы потоков, Исполнители, Возможность вызова, Выполнение, Будущее
Я не знаю, почему что-то не так с разделом кода ...
До сих пор я делал следующее: Создал класс ArrayBlockingQueue, где я реализую методы put (E e) и take ().Class Producer помещает элементы в очередь, а Class Consumer извлекает элементы из очереди.
Я установил емкость очереди на 5 и добавляю 8 элементов в очередь один за другим.Таким образом, я проверяю, освободит ли потребитель место для следующего элемента.Когда я помещаю все 8 элементов в очередь, я беру все остальные элементы из очереди и распечатываю их.
Проблема в том, что я не знаю, как сделать N - количество производителей и N - количество потребителей.
import java.util.LinkedList;
public class ArrayBlockingQueue<E> {
private boolean printAll;
private Object lock = new Object(); // Alternative I can use ArrayList.class.wait() and ArrayList.class.notifyAll();
private Integer capacity;
private LinkedList<E> queue = new LinkedList<>();
// The capacity should be a parameter to the class constructor
ArrayBlockingQueue(Integer capacity) {
this.capacity = capacity;
}
void put(E element) throws InterruptedException {
// Do not put elements in the queue if it`s full
while (capacity == queue.size()) {
synchronized (lock) {
lock.wait();
}
}
queue.add(element);
synchronized (lock) {
lock.notify();
}
}
E take() throws InterruptedException {
E element;
// Do not take elements while queue is not full
while ((queue.isEmpty() || queue.size() < capacity) && !printAll) {
synchronized (lock) {
lock.wait();
}
}
// make this thread wait / Else remove() will try to take from an empty queue
if (queue.isEmpty() && printAll){
synchronized (lock) {
lock.wait();
}
}
element = queue.remove();
synchronized (lock) {
lock.notify();
}
return element;
}
void waitThread() throws InterruptedException {
synchronized (lock) {
lock.wait();
}
}
synchronized int getQueueSize (){
return queue.size();
}
void setPrintAll(boolean printAll) {
this.printAll = printAll;
}
Integer getCapacity() {
return capacity;
}
}
import java.util.Scanner;
public class Producer extends Thread {
private ArrayBlockingQueue<Integer> queue;
// Assigning the queue from main so that
// Prod and Cons work with the same resources
Producer(ArrayBlockingQueue<Integer> queue) {
this.queue = queue;
}
public synchronized void run() {
// The size of the queue is 5 elements [declared in main] here I am trying to put 8
// so the producer HAS to wait for the Consumer to take some elements
// in order to put a new element
System.out.println("\nBlocking queue capacity [" + queue.getCapacity() + "] Trying to add 8 elements.");
try {
Scanner scanner = new Scanner(System.in);
for (int i = 0; i < 8; i++) {
System.out.println(Thread.currentThread().getName() + "] Enter element:");
Integer inputElement = scanner.nextInt();
queue.put(inputElement);
System.out.println("Queue size [" + queue.getQueueSize() + "]");
}
queue.setPrintAll(true);
queue.waitThread();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Consumer extends Thread {
private ArrayBlockingQueue<Integer> queue;
// Assigning the queue from main so that
// Prod and Cons work with the same resources [same queue]
Consumer(ArrayBlockingQueue<Integer> queue){
this.queue = queue;
}
public synchronized void run(){
try {
while (true) {
System.out.println(Thread.currentThread().getName() + " - data<" + queue.take() + ">");
System.out.println("Queue size [" + queue.getQueueSize() + "]");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class MainApp {
public static void main(String[] args) throws InterruptedException {
if (args.length == 0) {
throw new RuntimeException("No program arguments detected. Operation aborted.");
}
// Create a main program that accepts number of consumers as runtime argument.
Long numberOfConsumers = Long.valueOf(args[0]);
// Create ArrayBlockingQueue with capacity of 5 elements
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// Creating new thread Producer and passing the queue
// Creating new thread Consumer and passing the queue
Producer producerOne = new Producer(queue);
Consumer consumerOne = new Consumer(queue);
producerOne.start();
consumerOne.start();
producerOne.join();
consumerOne.join();
}
}
Это мои результаты сейчас.Все отлично работает с 1 производителем и 1 потребителем.Как сделать несколько плюсов и минусов.
Blocking queue capacity [5] Trying to add 8 elements.
Thread-0] Enter element:
10
Queue size [1]
Thread-0] Enter element:
20
Queue size [2]
Thread-0] Enter element:
30
Queue size [3]
Thread-0] Enter element:
40
Queue size [4]
Thread-0] Enter element:
50
Queue size [5]
Thread-0] Enter element:
Thread-1 - data<10>
Queue size [4]
60
Queue size [5]
Thread-0] Enter element:
Thread-1 - data<20>
Queue size [4]
70
Queue size [5]
Thread-0] Enter element:
Thread-1 - data<30>
Queue size [4]
80
Queue size [5]
Thread-1 - data<40>
Queue size [4]
Thread-1 - data<50>
Queue size [3]
Thread-1 - data<60>
Queue size [2]
Thread-1 - data<70>
Queue size [1]
Thread-1 - data<80>
Queue size [0]