Java для одного производителя - PullRequest
0 голосов
/ 26 декабря 2018

Я новичок в Java Concurrency и пытаюсь достичь / внедрить Single Producer [P1] и Multiple Consumer [C1, C2, C3].

Идея заключается в том, что производитель [P1] оценивает ценность и потребителейC1, C2, C3 все запускают свою задачу, чтобы прочитать значение индивидуально, как указано P1.Как только C1, C2, C3 считывают значения, P1 снова помещает новые данные.Затем C1, C2, C3 считывает данные, и этот цикл продолжается.

Wait Notify отлично работает для одного производителя с одним потребителем, но в этом случае концепция уведомления с несколькими потребителями для одного производителя не выглядит хорошей стратегией,Как мне подойти к этой проблеме.

1 Ответ

0 голосов
/ 28 декабря 2018

Спасибо @Ivan и @ Andreas.

@ Иван - В его комментарии я понял, как ведет себя шаблон Producer Consumer.@Andreas - в своем комментарии предложено использование Phaser.(Вместо этого я использовал Cyclic Barrier, поскольку количество зарегистрированных потоков не меняется динамически)

В обоих комментариях приведен пример кода ниже.Пожалуйста, предлагайте импровизацию, если есть какой-либо или лучший способ справиться с этим.

Основной класс

    public static void main(String[] args)
    {
        SharedSpace sharedSpace = new SharedSpace(new LinkedBlockingQueue<Integer>(1));
        new Thread(new Producer(sharedSpace)).start();


        Consumer consumerRunnable = new Consumer(sharedSpace);
        new Thread(consumerRunnable).start();

        CyclicBarrier barrier = new CyclicBarrier(3,consumerRunnable);

        new Thread(new EndUser(barrier,consumerRunnable)).start();
        new Thread(new EndUser(barrier,consumerRunnable)).start();
        new Thread(new EndUser(barrier,consumerRunnable)).start();
    }

Производитель

private SharedSpace sharedSpace;

public Producer(SharedSpace sharedSpace) {
    super();
    this.sharedSpace = sharedSpace;
}

public SharedSpace getSharedSpace() {
    return sharedSpace;
}

public void setSharedSpace(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
}

@Override
public void run() {

    for(int i=0;i<3;i++)
    {
        int value = (int) (Math.random()*30);
        sharedSpace.addValue(value);
    }


}

Очередь, общая для производителя и потребителя

private BlockingQueue<Integer> queue;

    public SharedSpace(BlockingQueue<Integer> queue) {
        super();
        this.queue = queue;
    }

    public BlockingQueue<Integer> getQueue() {
        return queue;
    }

    public void setQueue(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    public void addValue(int value)
    {
        try {
            queue.put(value);
            System.out.println(System.nanoTime()+" Producer added value "+value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getValue() throws InterruptedException
    {
            return queue.take();


    }

Потребитель

private SharedSpace sharedSpace;

    private Integer value;

    public Consumer(SharedSpace sharedSpace) {
        super();
        this.sharedSpace = sharedSpace;
    }

    public SharedSpace getSharedSpace() {
        return sharedSpace;
    }

    public void setSharedSpace(SharedSpace sharedSpace) {
        this.sharedSpace = sharedSpace;
    }

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    @Override
    public void run() 
    {

        try {
            setValue(sharedSpace.getValue());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

EndUser

CyclicBarrier barrier;

Consumer consumer;

public EndUser(CyclicBarrier barrier) {
    super();
    this.barrier = barrier;
}

public EndUser(CyclicBarrier barrier, Consumer consumer) {
    super();
    this.barrier = barrier;
    this.consumer = consumer;
}


public Consumer getConsumer() {
    return consumer;
}

public void setConsumer(Consumer consumer) {
    this.consumer = consumer;
}


public CyclicBarrier getBarrier() {
    return barrier;
}


public void setBarrier(CyclicBarrier barrier) {
    this.barrier = barrier;
}


@Override
public void run() {
    try
    {
        while(true)
        {
            System.out.println(consumer.getValue());
            barrier.await();
        }
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }

}

Вывод [Потребитель не читает из Producer, пока все EndUser не получили свои данные]

Producer added value 24
Producer added value 10
24
24
24
10
10
Producer added value 0
10
0
0
0
...