Как реализовать несколько производителей и потребителей? - Джава - PullRequest
0 голосов
/ 28 июня 2019

Вот задача: Создать общий класс ArrayBlockingQueue с помощью 2 методов: - put (E e) Вставляет указанный элемент в конец этой очереди, ожидая, когда пространство станет доступным, если очередь заполнена.- take () - Извлекает и удаляет заголовок этой очереди, ожидая при необходимости, пока элемент не станет доступным.Использование методов wait и notify / notifyAll.Емкость должна быть параметром для конструктора класса.Создайте основную программу, которая принимает число потребителей в качестве аргумента времени выполнения.Создавайте темы - Потребитель и Производитель.Когда программа запускается, потоки источника должны совпадать с аргументом, передаваемым программе.Поток производителя должен читать из stdin и отправлять данные в очередь, используя метод put.Потребители должны взять данные и распечатать их в формате «Поток-1 - <данные>»

ВАЖНО: Пожалуйста, использование следующих классов недопустимо - Пулы потоков, Исполнители, Возможность вызова, Выполнение, Будущее

Я не знаю, почему что-то не так с разделом кода ...

До сих пор я делал следующее: Создал класс ArrayBlockingQueue, где я реализую методы put (E e) и take ().Class Producer помещает элементы в очередь, а Class Consumer извлекает элементы из очереди.

Я установил емкость очереди на 5 и добавляю 8 элементов в очередь один за другим.Таким образом, я проверяю, освободит ли потребитель место для следующего элемента.Когда я помещаю все 8 элементов в очередь, я беру все остальные элементы из очереди и распечатываю их.

Проблема в том, что я не знаю, как сделать N - количество производителей и N - количество потребителей.

import java.util.LinkedList;

public class ArrayBlockingQueue<E> {
    private boolean printAll;
    private Object lock = new Object(); //  Alternative I can use ArrayList.class.wait() and ArrayList.class.notifyAll();
    private Integer capacity;
    private LinkedList<E> queue = new LinkedList<>();

    //  The capacity should be a parameter to the class constructor
    ArrayBlockingQueue(Integer capacity) {
        this.capacity = capacity;
    }

    void put(E element) throws InterruptedException {
        //  Do not put elements in the queue if it`s full
        while (capacity == queue.size()) {
            synchronized (lock) {
                lock.wait();
            }
        }
        queue.add(element);
        synchronized (lock) {
            lock.notify();
        }
    }

    E take() throws InterruptedException {
        E element;
        //  Do not take elements while queue is not full
        while ((queue.isEmpty() || queue.size() < capacity) && !printAll) {
            synchronized (lock) {
                lock.wait();
            }
        }
        //  make this thread wait / Else remove() will try to take from an empty queue
        if (queue.isEmpty() && printAll){
            synchronized (lock) {
                lock.wait();
            }
        }
        element = queue.remove();
        synchronized (lock) {
            lock.notify();
        }
        return element;
    }

    void waitThread() throws InterruptedException {
        synchronized (lock) {
            lock.wait();
        }
    }

    synchronized int getQueueSize (){
        return queue.size();
    }

    void setPrintAll(boolean printAll) {
        this.printAll = printAll;
    }

    Integer getCapacity() {
        return capacity;
    }

}




import java.util.Scanner;

public class Producer extends Thread {
    private ArrayBlockingQueue<Integer> queue;

    //  Assigning the queue from main so that
    //  Prod and Cons work with the same resources
    Producer(ArrayBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    public synchronized void run() {
        //  The size of the queue is 5 elements [declared in main] here I am trying to put 8
        //  so the producer HAS to wait for the Consumer to take some elements
        //  in order to put a new element
        System.out.println("\nBlocking queue capacity [" + queue.getCapacity() + "] Trying to add 8 elements.");
        try {
            Scanner scanner = new Scanner(System.in);
            for (int i = 0; i < 8; i++) {
                System.out.println(Thread.currentThread().getName() + "] Enter element:");
                Integer inputElement = scanner.nextInt();
                queue.put(inputElement);
                System.out.println("Queue size [" + queue.getQueueSize() + "]");
            }

            queue.setPrintAll(true);
            queue.waitThread();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}


public class Consumer extends Thread {
    private ArrayBlockingQueue<Integer> queue;

    //  Assigning the queue from main so that
    //  Prod and Cons work with the same resources [same queue]
    Consumer(ArrayBlockingQueue<Integer> queue){
        this.queue = queue;
    }
    public synchronized void run(){
        try {
            while (true) {
                System.out.println(Thread.currentThread().getName() + " - data<" + queue.take() + ">");
                System.out.println("Queue size [" + queue.getQueueSize() + "]");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


public class MainApp {
    public static void main(String[] args) throws InterruptedException {
        if (args.length == 0) {
            throw new RuntimeException("No program arguments detected. Operation aborted.");
        }
        //  Create a main program that accepts number of consumers as runtime argument.
        Long numberOfConsumers = Long.valueOf(args[0]);
        //  Create ArrayBlockingQueue with capacity of 5 elements
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        //  Creating new thread Producer and passing the queue
        //  Creating new thread Consumer and passing the queue

        Producer producerOne = new Producer(queue);
        Consumer consumerOne = new Consumer(queue);


        producerOne.start();
        consumerOne.start();

        producerOne.join();
        consumerOne.join();
    }
}

Это мои результаты сейчас.Все отлично работает с 1 производителем и 1 потребителем.Как сделать несколько плюсов и минусов.

Blocking queue capacity [5] Trying to add 8 elements.
Thread-0] Enter element:
10
Queue size [1]
Thread-0] Enter element:
20
Queue size [2]
Thread-0] Enter element:
30
Queue size [3]
Thread-0] Enter element:
40
Queue size [4]
Thread-0] Enter element:
50
Queue size [5]
Thread-0] Enter element:
Thread-1 - data<10>
Queue size [4]
60
Queue size [5]
Thread-0] Enter element:
Thread-1 - data<20>
Queue size [4]
70
Queue size [5]
Thread-0] Enter element:
Thread-1 - data<30>
Queue size [4]
80
Queue size [5]
Thread-1 - data<40>
Queue size [4]
Thread-1 - data<50>
Queue size [3]
Thread-1 - data<60>
Queue size [2]
Thread-1 - data<70>
Queue size [1]
Thread-1 - data<80>
Queue size [0]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...