Как остановить поток производителя, если потребительский поток сталкивается с исключением - PullRequest
2 голосов
/ 27 сентября 2019

У меня такая ситуация с потребителем производителя, когда я использую очередь-массив.Как остановить поток производителя, если потребительский поток сталкивается с исключением.Мне нужно, чтобы продюсер перестал ждать, пока очередь не опустеет.я вызвал исключение принудительного выполнения.Но программа не выходит.Производители продолжают ждать, пока очередь не станет пустой.Может кто-нибудь, пожалуйста, помогите

public class ServiceClass implements Runnable{

    private final static BlockingQueue<Integer> processQueue = new ArrayBlockingQueue<>(10);
    private static final int CONSUMER_COUNT = 1;
    private boolean isConsumerInterrupted = false;

    private boolean isConsumer = false;
    private static boolean producerIsDone = false;

    public ServiceClass(boolean consumer,boolean isConsumerInterrupted) {
        this.isConsumer = consumer;
        this.isConsumerInterrupted = isConsumerInterrupted;
    }

    public static void main(String[] args) {

        long startTime = System.nanoTime();

        ExecutorService producerPool = Executors.newFixedThreadPool(1);
        producerPool.submit(new ServiceClass(false,false)); // run method is
                                                         // called   
        // create a pool of consumer threads to parse the lines read
        ExecutorService consumerPool = Executors.newFixedThreadPool(CONSUMER_COUNT);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            consumerPool.submit(new ServiceClass(true,false)); // run method is
                                                            // called
        }
        producerPool.shutdown();
        consumerPool.shutdown();

        while (!producerPool.isTerminated() && !consumerPool.isTerminated()) {
        }

        long endTime = System.nanoTime();
        long elapsedTimeInMillis = TimeUnit.MILLISECONDS.convert((endTime - startTime), TimeUnit.NANOSECONDS);
        System.out.println("Total elapsed time: " + elapsedTimeInMillis + " ms");

    }



    @Override
    public void run() {
        if (isConsumer) {
            consume();
        } else {
            readFile(); //produce data by reading a file
        }
    }

    private void readFile() {
        //Path file = Paths.get("c:/temp/my-large-file.csv");
        try
        {

            for(int i =0;i<10000;i++) {
                if(isConsumerInterrupted) {
                    break;
                }
                processQueue.put(i);
                System.out.println("produced:" + i+"------"+processQueue.size());

            }
            //Java 8: Stream class


        } catch (Exception e){
            e.printStackTrace();
        }

        producerIsDone = true; // signal consumer
        System.out.println(Thread.currentThread().getName() + " producer is done");
    }

    private void consume() {
        try {
            while (!producerIsDone || (producerIsDone && !processQueue.isEmpty())) {

                System.out.println("consumed:" + processQueue.take()+"------"+processQueue.size());
                throw new RuntimeException();

                //System.out.println(Thread.currentThread().getName() + ":: consumer count:" + linesReadQueue.size());                
            }
            System.out.println(Thread.currentThread().getName() + " consumer is done");
        } catch (Exception e) {
            isConsumerInterrupted=true;
        }


    }




}

Ответы [ 2 ]

1 голос
/ 27 сентября 2019

Вы используете флаг isConsumerInterrupted для завершения потока источника.Это не верно.Потребитель не потребляет элементы из очереди, и Producer продолжает производить, пока очередь не заполнится, а затем начинает блокировку, пока очередь не станет не полной.Затем, когда потребитель генерирует исключение RuntimeException, он устанавливает флаг, и поток источника не получает возможности проверить флаг, поскольку ни один из потребителей не потребляет элементы из очереди, чтобы источник мог выйти из состояния ожидания.Один из вариантов - использовать будущее и отменять его, когда потребитель создает исключение, противоречащее установке флага.Поскольку processQueue.put реагирует на прерывание, он успешно завершит поток производителя.InterruptedException выбрасывается - если прервано во время ожидания.Вот как это выглядит.

private static Future<?> producerFuture = null;
public static void main(String[] args) {

    // Remainder omitted.
    producerFuture = producerPool.submit(new ServiceClass(false, false)); 
    // ...
}

private void consume() {
    try {
        // ...
    } catch (Exception e) {
        producerFuture.cancel(true);
    }
}
0 голосов
/ 27 сентября 2019

Идея модели производитель / потребитель состоит в том, чтобы отделить производителей от потребителей.Таким образом, ваше желание остановить производителя, когда потребитель мёртв, не согласуется с основной мотивацией модели производитель / потребитель.

Но если вам действительно нужно убить производителя, если очередь заполнена, то вместо queue.put(), который блокируетнеограниченное время вы можете использовать queue.offer(E e, long timeout, TimeUnit unit), который будет ожидать тайм-аут, если очередь заполнена, и возвращать false, если ему не удалось добавить элемент в очередь.Таким образом, в коде вашего производителя вы можете обработать этот возвращаемый результат.

...