Как сделать ограничение потока в Java - PullRequest
13 голосов
/ 28 декабря 2011

Допустим, у меня есть 1000 файлов для чтения, и из-за некоторых ограничений я хочу читать максимум 5 файлов параллельно. И, как только один из них закончится, я хочу, чтобы новый стартовал.

У меня есть основная функция, у которой есть список файлов, и я пытаюсь изменить счетчик всякий раз, когда завершается один поток. но это не работает!

Есть предложения?

Ниже приведен основной цикл функций

for (final File filename : folder.listFiles()) {

Object lock1 = new Object();
new myThread(filename, lock1).start();
counter++;
while (counter > 5);
}

Ответы [ 4 ]

22 голосов
/ 28 декабря 2011

Нерестовые нити, подобные этому, не подходят. Используйте ExecutorService и укажите пул равным 5. Поместите все файлы во что-то вроде BlockingQueue или другой потокобезопасной коллекции, и все исполняющие могут просто poll() это по своему желанию.

public class ThreadReader {

    public static void main(String[] args) {
        File f = null;//folder
        final BlockingQueue<File> queue = new ArrayBlockingQueue<File>(1000);
        for(File kid : f.listFiles()){
            queue.add(kid);
        }

        ExecutorService pool = Executors.newFixedThreadPool(5);

        for(int i = 1; i <= 5; i++){
            Runnable r = new Runnable(){
                public void run() {
                    File workFile = null;
                    while((workFile = queue.poll()) != null){
                        //work on the file.
                    }
                }
            };
            pool.execute(r);
        }
    }
}
3 голосов
/ 28 декабря 2011

Подход в ответе Кайлара правильный.Используйте классы executor, предоставляемые библиотеками классов Java, вместо того, чтобы реализовывать пул потоков с нуля (плохо).


Но я подумал, что было бы полезно обсудить код в вашем вопросе и почему он не 'т работа.(Я заполнил некоторые части, которые вы пропустили как можно лучше ...)

public class MyThread extends Thread {

    private static int counter;

    public MyThread(String fileName, Object lock) {
        // Save parameters in instance variables
    }

    public void run() {
        // Do stuff with instance variables
        counter--;
    }

    public static void main(String[] args) {
        // ...
        for (final File filename : folder.listFiles()) {
            Object lock1 = new Object();
            new MyThread(filename, lock1).start();
            counter++;
            while (counter > 5);
        }
        // ...
    }
}

ОК, так что же с этим не так?Почему это не работает?

Ну, первая проблема в том, что в main вы читаете и пишете counter без какой-либо синхронизации.Я предполагаю, что он также обновляется рабочими потоками - иначе код не имеет смысла.Это означает, что есть большая вероятность, что основные потоки не увидят результат обновлений, сделанных дочерними потоками.Другими словами, while (counter > 5); может быть бесконечным циклом.(На самом деле, это довольно вероятно. JIT-компилятору разрешено генерировать код, в котором counter > 5 просто проверяет значение counter, оставленное в регистре после предыдущего оператора counter++;.

Вторая проблема заключается в том, что ваш цикл while (counter > 5); невероятно бесполезен. Вы говорите JVM опрашивать переменную ... и она будет делать это потенциально МИЛЛИАРДЫ раз в секунду ... запустив один процессор (ядро).Вы не должны этого делать. Если вы собираетесь реализовывать подобные вещи с использованием низкоуровневых примитивов, вы должны использовать Java Object.wait() и Object.notify() методы, например, основной поток ожидает, и каждый рабочий поток уведомляет.

2 голосов
/ 28 декабря 2011

Вы можете использовать ExecutorService в качестве пула потоков И очереди.

ExecutorService pool = Executors.newFixedThreadPool(5);
File f = new File(args[0]);

for (final File kid : f.listFiles()) {
    pool.execute(new Runnable() {
        @Override
        public void run() {
            process(kid);
        }
    });
}
pool.shutdown();
// wait for them to finish for up to one minute.
pool.awaitTermination(1, TimeUnit.MINUTES);
0 голосов
/ 28 декабря 2011

Какой бы метод вы ни использовали для создания нового потока, увеличьте глобальный счетчик, добавьте условный оператор вокруг создания потока, который, если предел достигнут, не создает новый поток, возможно, помещает файлы в очередь (список?), а затем вы можете добавить еще один условный оператор после создания потока, если в очереди есть элементы, для их первой обработки.

...