Java - BlockingQueue останавливает многопоточное приложение - PullRequest
2 голосов
/ 07 апреля 2020

Я делаю приложение, которое состоит из двух потоков: один из них записывает значение в LinkedBlockingQueue, другой читает. Я использую ScheduledExecutorService для выполнения этой операции в некоторый период в секундах. Проблема в том, что мое приложение зависает при использовании метода BlockingQueue, и я не могу понять, почему.

Это общий ресурс:

class Res{
    AtomicInteger atomicInteger = new AtomicInteger(0);
    BlockingQueue<String> q = new LinkedBlockingQueue<>();
}

Это читатель

Semaphore semaphore = new Semaphore(1); /this is for reader does not take two places in thread pool
Runnable reader = ()->{
    try {
        semaphore.acquire();
        System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
        semaphore.release();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Writer:

Runnable writer = ()->{
    res.q.add("hi");
};

Полный код:

class Res{
    AtomicInteger atomicInteger = new AtomicInteger(0);
    BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
public class Main {

    public static void main(String[] args) throws InterruptedException {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        Res res = new Res();
        Semaphore semaphore = new Semaphore(1); //this is for reader does not take two places in thread pool
        Runnable reader = ()->{
            try {
                semaphore.acquire();
                System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Runnable writer = ()->{
            res.q.add("hi");
        };
        Random rnd = new Random();

        for (int i = 0; i < 20; i++) {
            int time = rnd.nextInt(5)+ 2;
            executorService.schedule(writer,time, TimeUnit.SECONDS);
        }
        for (int i = 0; i < 20; i++) {
            int time = rnd.nextInt(5)+ 2;
            executorService.schedule(reader,time, TimeUnit.SECONDS);
        }

        executorService.shutdown();
}

Должно быть напечатано двадцать строк "hi [number]", но в какой-то строке оно застывает. Например, мой текущий отпечаток:

hi 1
hi 2
hi 3
hi 4
hi 5

Я узнал, если я увеличу количество потоков newScheduledThreadPool(20), он начинает работать, но как я могу сделать это с двумя потоками? Спасибо!

1 Ответ

2 голосов
/ 07 апреля 2020

Немного сложно следовать вашему коду, хотя в то же время очевидно, что происходит. Вы можете одновременно запускать два потока из-за Executors.newScheduledThreadPool(2);. Обе эти темы reader темы.

Итак, Thread-1 вошел в блок try и получил разрешение семафора через semaphore.acquire();, но очередь была пуста - как таковая она блокируется на res.q.take(). Следующий поток - Thread-2 - тоже поток чтения, но он не может получить permit, поскольку он уже занят Thread-1 и заблокирован на semaphore.acquire();. Поскольку у вас нет места для других потоков (ваш пул заблокирован, работая с этими двумя потоками), нет авторов, которые бы поместили что-то в вашу очередь и, таким образом, разблокировали бы Thread-1 (чтобы res.q.take() работал).

Добавление большего количества рабочих потоков просто задерживает проблему - вы можете оказаться в том же положении, что и раньше.

...