Как решить производителя-потребителя с помощью семафоров? - PullRequest
1 голос
/ 27 ноября 2011

Мне нужно закодировать проблему, аналогичную производителю-потребителю, который должен использовать семафоры. Я попробовал пару решений, и ни одно из них не сработало. Сначала я попробовал решение в Википедии, но оно не сработало. Мой текущий код выглядит примерно так:

Метод прогона потребителя:

    public void run() {
    int i=0;
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    String s = new String();
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        try {
            Thread.sleep(1000);///10000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.encheBuffer.down();
        this.mutex.down();
        // RC
        i=0;
        while (i<buffer.length) {
            if (buffer[i] == null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            QuantidadeBuffer.quantidade--;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            int identificador;
            identificador=buffer[i].getIdentificador()[0];
            s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i;
            //System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            buffer[i]= null;
        }
        // RC
        this.mutex.up();
        //this.esvaziaBuffer.up();
        System.out.println(s);
  //            lock.up();
    }
}

Метод прогона производителя:

    public void run() {
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    int i=0;
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        // Produz Item
        try {
            Thread.sleep(500);//50000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.esvaziaBuffer.down();
        this.mutex.down();
        // RC
        i=0;
        while (i<buffer.length) {
            if (buffer[i]!=null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            int identificador[]=new int[Pedido.getTamanho_identificador()];
            identificador[0]=i;
            buffer[i]=new Pedido();
            Produtor.buffer[i].setIdentificador(identificador);
            Produtor.buffer[i].setTexto("pacote de dados");
            QuantidadeBuffer.quantidade++;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            i++;
        }
        // RC
        this.mutex.up();
        //this.encheBuffer.up();
    }
    //this.encheBuffer.up();
}

В приведенном выше коде потребительский поток прочитал позицию, а затем другой поток прочитал ту же позицию без заполнения производителем позиции, что-то вроде этого:

Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1

Ответы [ 4 ]

9 голосов
/ 28 ноября 2011

Кажется, вы используете мьютекс, а не семафор?

При использовании мьютекса у вас есть только двоичная синхронизация - блокировка и разблокировка одного ресурса.Семафоры имеют значение, которое вы можете сигнализировать или приобретать.

Вы пытаетесь заблокировать / разблокировать весь буфер, но это неправильный путь, потому что, как вы видите, блокирует либо производитель, либо потребитель, икогда считыватель заблокировал его, производитель не может заполнить буфер (потому что он должен сначала заблокировать).

Вместо этого следует создать семпафор, а затем, когда производитель записывает один пакет или блок данных, он может сигнализироватьсемафор.Затем потребители могут пытаться приобрести семафор, поэтому они будут ждать, пока производитель не сообщит, что пакет записан.При сигнале записанного пакета один из потребителей будет разбужен, и он будет знать, что он может прочитать один пакет.Он может прочитать пакет, а затем вернуться к попытке получения на семафор.Если за это время производитель написал другой пакет, он снова подал сигнал, и тогда один из потребителей продолжит читать другой пакет.Etc ...

Например:

(Производитель) - Написать один пакет - Semaphore.release (1)

(Потребитель xN) - Semaphore.acquire (1)- Прочитать один пакет

Если у вас несколько потребителей, то потребители (не производитель) должны блокировать буфер при чтении пакета (но не при получении семафора) для предотвращения гонки.В приведенном ниже примере производитель также блокирует список, поскольку все находятся на одной и той же JVM.

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Semaphores {

    static Object LOCK = new Object();

    static LinkedList list = new LinkedList();
    static Semaphore sem = new Semaphore(0);
    static Semaphore mutex = new Semaphore(1);

    static class Consumer extends Thread {
        String name;
        public Consumer(String name) {
            this.name = name;
        }
        public void run() {
            try {

                while (true) {
                    sem.acquire(1);
                    mutex.acquire();
                    System.out.println("Consumer \""+name+"\" read: "+list.removeFirst());
                    mutex.release();
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    static class Producer extends Thread {
        public void run() {
            try {

                int N = 0;

                while (true) {
                    mutex.acquire();
                    list.add(new Integer(N++));
                    mutex.release();
                    sem.release(1);
                    Thread.sleep(500);
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    public static void main(String [] args) {
        new Producer().start();
        new Consumer("Alice").start();
        new Consumer("Bob").start();
    }
}
0 голосов
/ 11 апреля 2019

Одним из наиболее распространенных шаблонов использования многопоточных приложений является создание асинхронной сети связи. Несколько реальных приложений требуют этого. Есть 2 способа достижения этого: -

  1. Производитель и потребитель тесно связаны. Это не асинхронно, и каждый производитель ждет потребителя и наоборот. Пропускная способность приложения также становится минимальной из двух объектов. Как правило, это никогда не хороший дизайн.
  2. Лучший (и более сложный) способ сделать это - ввести общий буфер между производителем и потребителем. Таким образом, более быстрый производитель или более быстрый потребитель не ограничиваются из-за более медленного аналога. Он также позволяет нескольким производителям и нескольким потребителям подключаться через общий буфер.

enter image description here

Существует несколько способов создания шаблона «производитель-потребитель».

  1. Использование wait / notify / nofityAll, которое было описано в предыдущем модуле "Основы блокировки"
  2. Использование API, предоставляемого Java - java.util.concurrent.BlockingQueue. Мы расскажем об этом подробнее в следующем модуле.
  3. Использование семафоров: это очень удобный способ создания шаблона производитель-потребитель.

    public class ProducerConsumerSemaphore {
    
    private static final int BUFFER_SIZE = 10;
    private static final int MAX_VALUE = 10000;
    private final Stack<Integer> buffer = new Stack<Integer>();
    private final Semaphore writePermits = new Semaphore(BUFFER_SIZE);
    private final Semaphore readPermits = new Semaphore(0);
    private final Random random = new Random();
    
    class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                writePermits.acquireUninterruptibly();
                buffer.push(random.nextInt(MAX_VALUE));
                readPermits.release();
            }
        }
    }
    
    class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                readPermits.acquireUninterruptibly();
                System.out.println(buffer.pop());
                writePermits.release();
            }
        }
    }
    
    public static void main(String[] args) {
    
        ProducerConsumerSemaphore obj = new ProducerConsumerSemaphore();
        Producer p1 = obj.new Producer();
        Producer p2 = obj.new Producer();
        Producer p3 = obj.new Producer();
        Consumer c1 = obj.new Consumer();
        Consumer c2 = obj.new Consumer();
        Consumer c3 = obj.new Consumer();
        Thread t1 = new Thread(p1);
        Thread t2 = new Thread(p2);
        Thread t3 = new Thread(p3);
        Thread t4 = new Thread(c1);
        Thread t5 = new Thread(c2);
        Thread t6 = new Thread(c3);
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        t6.start();
    }
    

Мы используем 2 семафора - 1 для потребителей и 1 для производителей.

Количество разрешений, разрешенных для производителя, установлено на максимальный размер буфера.

Каждый производитель использует 1 разрешение на запись и выпускает 1 разрешение на чтение при создании 1 сообщения.

Каждый потребитель потребляет 1 разрешение на чтение и выпускает 1 разрешение на запись для каждого сообщения.

Представьте себе разрешение на копилку на фактическое сообщение. Разрешение на запись передается от производителя к потребителю (и обратно к производителю). Разрешение на чтение передается от Потребителя к Производителю (и обратно к Потребителю). Общее количество сообщений в буфере в любой данный момент времени будет точно равно количеству выданных разрешений на чтение. Если скорость создания сообщений больше, чем скорость потребления сообщений, то в определенный момент количество доступных разрешений на запись будет исчерпано, и все потоки производителя будут заблокированы, пока потребитель не прочитает из буфера и не освободит разрешение на запись. Та же логика существует и наоборот.

enter image description here

Выше приведена более наглядная артикуляция потока сообщений и разрешений в системе. Используя семафоры, мы только абстрагируем кровавые детали и заботу, необходимую для написания фрагмента кода, используя wait / notify / notifyAll. Приведенный выше код можно сравнить с wait et. Аль подход:

Когда поток заблокирован из-за отсутствия разрешений, это эквивалентно вызову wait () для этого семафора.

Когда поток освобождает разрешение, это эквивалентно вызову notifyAll () для этого конкретного семафора.

0 голосов
/ 17 апреля 2017
import java.util.concurrent.Semaphore;


public class ConsumerProducer{

    public static void main(String[] args) {

           Semaphore semaphoreProducer=new Semaphore(1);
           Semaphore semaphoreConsumer=new Semaphore(0);
           System.out.println("semaphoreProducer permit=1 | semaphoreConsumer permit=0");

           new Producer(semaphoreProducer,semaphoreConsumer).start();
           new Consumer(semaphoreConsumer,semaphoreProducer).start();

    }


/**
 * Producer Class.
 */
static class Producer extends Thread{

    Semaphore semaphoreProducer;
    Semaphore semaphoreConsumer;


    public Producer(Semaphore semaphoreProducer,Semaphore semaphoreConsumer) {
           this.semaphoreProducer=semaphoreProducer;
           this.semaphoreConsumer=semaphoreConsumer;
    }

    public void run() {
           for(;;){
                  try {
                      semaphoreProducer.acquire();
                      System.out.println("Produced : "+Thread.currentThread().getName());
                      semaphoreConsumer.release();

                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
           }          
    }
}

/**
 * Consumer Class.
 */
static class Consumer extends Thread{

    Semaphore semaphoreConsumer;
    Semaphore semaphoreProducer;

    public Consumer(Semaphore semaphoreConsumer,Semaphore semaphoreProducer) {
           this.semaphoreConsumer=semaphoreConsumer;
           this.semaphoreProducer=semaphoreProducer;
    }

    public void run() {

           for(;;){
                  try {
                      semaphoreConsumer.acquire();
                      System.out.println("Consumed : "+Thread.currentThread().getName());
                      semaphoreProducer.release();
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
           }
    }

}
}
0 голосов
/ 25 февраля 2017
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
/**
 *
 * @author sakshi
 */
public class SemaphoreDemo {

    static Semaphore producer = new Semaphore(1);
    static Semaphore consumer = new Semaphore(0);
    static List<Integer> list = new ArrayList<Integer>();

    static class Producer extends Thread {

        List<Integer> list;

        public Producer(List<Integer> list) {
            this.list = list;
        }

        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    producer.acquire();

                } catch (InterruptedException ex) {
                    Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
                }
                System.out.println("produce=" + i);

                list.add(i);
                consumer.release();

            }
        }
    }

    static class Consumer extends Thread {

        List<Integer> list;

        public Consumer(List<Integer> list) {
            this.list = list;
        }

        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    consumer.acquire();
                } catch (InterruptedException ex) {
                    Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
                }

                System.out.println("consume=" + list.get(i));

                producer.release();
            }
        }
    }

    public static void main(String[] args) {
        Producer produce = new Producer(list);

        Consumer consume = new Consumer(list);

        produce.start();
        consume.start();
    }
}


output:

produce=0
consume=0
produce=1
consume=1
produce=2
consume=2
produce=3
consume=3
produce=4
consume=4
produce=5
consume=5
produce=6
consume=6
produce=7
consume=7
produce=8
consume=8
produce=9
consume=9
...