Почему BlockingQueue.take () не освобождает поток? - PullRequest
1 голос
/ 21 июля 2011

В этой простой короткой программе вы заметите, что программа зависает навсегда, потому что take () не освобождает поток. Насколько я понимаю, take () приводит к тому, что поток освобождается, даже если сама задача заблокирована в take ().

Отредактировано:

Это работает (спасибо вам всем за исправление автобокса):

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducersConsumers {
    private static int THREAD_COUNT = 5;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final ExecutorService executorPool = Executors.newFixedThreadPool(THREAD_COUNT);
        final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();

        Collection<Future<Long>> collection = new ArrayList<Future<Long>>();


        // producer:
        for (int i = 0; i < 20; i++) {
            collection.add(executorPool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    for (int i = 100; i >= 0; i--) {
                        queue.put((long) i);
                    }
                    return -1L;
                }
            }));
        }

        // consumer:
        for (int i = 0; i < 20; i++) {
            collection.add(executorPool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    while (true) {
                        Long item = queue.take();
                        if (item.intValue() == 0) {
                            break;
                        }
                    }
                    return 1L;
                }
            }));
        }

        long sum = 0;
        for (Future<Long> item : collection) {
            sum += item.get();
        }

        executorPool.shutdown();
        System.out.println("sum = " + sum);
    }
}

Но если вы поменяете местами вызовы производителя и потребителя, он будет зависать:

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducersConsumers {
    private static int THREAD_COUNT = 5;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final ExecutorService executorPool = Executors.newFixedThreadPool(THREAD_COUNT);
        final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();

        Collection<Future<Long>> collection = new ArrayList<Future<Long>>();


        // consumer:
        for (int i = 0; i < 20; i++) {
            collection.add(executorPool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    while (true) {
                        Long item = queue.take();
                        if (item.intValue() == 0) {
                            break;
                        }
                    }
                    return 1L;
                }
            }));
        }

        // producer:
        for (int i = 0; i < 20; i++) {
            collection.add(executorPool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    for (int i = 100; i >= 0; i--) {
                        queue.put((long) i);
                    }
                    return -1L;
                }
            }));
        }

        long sum = 0;
        for (Future<Long> item : collection) {
            sum += item.get();
        }

        executorPool.shutdown();
        System.out.println("sum = " + sum);
    }
}

Насколько я понимаю, заказчик не должен иметь значения. Другими словами, есть понятие задачи и потока. Поток не зависит от кода программы, тогда как задача связана с определенной программой. Поэтому в моем примере, когда JVM назначает поток для выполнения задач Callable, если сначала создается экземпляр потребителя, тогда задача будет блокироваться при take (). Как только JVM обнаруживает, что задача заблокирована, она освобождает поток (или, как я понимаю, но не освобождает его) и помещает его обратно в пул рабочих потоков при подготовке к обработке выполняемой задачи (которая в данном случае Производители). Следовательно, в конце создания всех вызываемых элементов должно быть 40 задач, но только 5 потоков; 20 из этих задач заблокированы, 5 из них должны быть запущены, а 15 должны быть в ожидании (для запуска).

Ответы [ 4 ]

2 голосов
/ 21 июля 2011

Я думаю, вы неправильно понимаете, как работают потоки и пулы потоков. В пуле потоков обычно есть очередь рабочих элементов, которая содержит элементы для обработки (в вашем случае Callable<> s).

Он также содержит (максимальное) количество потоков (в вашем случае 5), которые могут работать с этими элементами.

Время жизни активного потока определяется кодом, который он выполняет - обычно это метод. Поток становится «живым», когда начинает выполнять метод, и заканчивается, когда возвращается. Если метод блокирует ожидание какого-либо сигнала, это не означает, что поток может уйти и выполнить какой-то другой метод - это не так, как работают потоки. Вместо этого поток будет заблокирован, пока он не сможет продолжить выполнение и разрешить запуск других потоков.

Метод, который выполняется потоком потоков, обычно выглядит так:

void threadloop()
{
    while (!quit)
    {
        Callable<T> item = null;
        synchronized (workQueue)
        {
            if (workQueue.Count == 0)
                workQueue.wait();

            // we could have been woken up for some other reason so check again
            if (workQueue.Count > 0)
                item = workQueue.pop();
        }
        if (item != null)
             item.Call();
    }
}

Это более или менее псевдокод (я не Java-разработчик), но он должен показать концепцию. Теперь item.Call() выполняет метод, предоставленный пользователем пула. Если этот метод блокируется, то что происходит? Хорошо - поток будет заблокирован при выполнении item.Call() до тех пор, пока метод снова не активируется. Он не может просто уйти и выполнить какой-то другой код произвольно.

1 голос
/ 21 июля 2011

С javadoc :

Извлекает и удаляет заголовок этой очереди, ожидая, если в этой очереди нет элементов.

Этобудет ждать: вы работаете в main, поэтому он останется там.

EDIT: коррекция: блокировка все еще происходит (в потоках пула потоков, а не в main).Происходить не удастся: 20 потоков заблокированы на вызовах take, поэтому вызовы put не выполняются, поэтому Future s никогда не завершаются, поэтому программа зависает.

1 голос
/ 21 июля 2011

Я думаю, вы неправильно поняли, что "блокируется" в BlockingQueue.

Вызов queue.take() блокирует поток, который его вызвал, до тех пор, пока что-то не станет доступным в очереди.Это означает, что поток будет ждать там бесконечно, если не будет прерван, до тех пор, пока элемент не будет добавлен в очередь.

Второй пример кода приводит к зависанию проблемы, поскольку вы добавляете 20 задач, ожидающих появления элемента вBlockingQueue, и исполнитель имеет только 5 потоков - таким образом, первые пять задач приводят к блокировке всех пяти потоков.Этот исполнитель заполнен еще 15 потребительскими задачами.

Добавление задач во второй цикл for для добавления элементов в очередь приводит к 20 задачам, которые никогда не могут быть выполнены, поскольку все потоки в исполнителе застряли в ожидании.

Так что, когдавы говорите так:

Согласно моему пониманию, take () приводит к освобождению потока, даже если само задание заблокировано на take ().

У вас естьнедоразумение, потому что здесь нет разницы между тем, что делает «нить», и тем, что делает «задача».Поток не может быть «освобожден», пока задача заблокирована - это поток, который запускает задачу.Когда поток сталкивается с блокирующим вызовом take(), поток блокируется, точка.

1 голос
/ 21 июля 2011

Я не знаю, что именно вы подразумеваете под release thread, но как только вы блокируете на take() вызывающий поток блокируется и не возвращается в пул.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...