Java ExecutorService - масштабирование - PullRequest
1 голос
/ 29 ноября 2011

Я пытаюсь написать программу на Java, используя ExecutorService и ее функцию invokeAll. У меня вопрос: функция invokeAll решает задачи одновременно? Я имею в виду, если у меня будет два процессора, будет ли два рабочих одновременно? Потому что я не могу правильно масштабировать. Требуется то же время, чтобы решить проблему, если я дам newFixedThreadPool(2) или 1.

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
    tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);

Map - это класс, который реализует Callable, а wp - это вектор частичных решений, класс, который содержит некоторую информацию в разное время.

Почему не масштабируется? В чем может быть проблема?

Это код для PartialSolution:

import java.util.HashMap;
import java.util.Vector;

public class PartialSolution 
{
    public String fileName;//the name of a file
    public int b, e;//the index of begin and end of the fragment from the file
    public String info;//the fragment
    public HashMap<String, Word> hm;//here i retain the informations
    public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce

    public PartialSolution(String name, int b, int e, String i, boolean ok)
    {
        this.fileName = name;
        this.b = b;
        this.e = e;
        this.info = i;
        hm = new HashMap<String, Word>();
        if(ok == true)
        {
            hmt = new HashMap<String, Vector<Word>>();
        }
        else
        {
             hmt = null;
        }    
    }
}

Это код для карты:

public class Map implements Callable<PartialSolution>
{
    private PartialSolution ps;
    private Vector<String> keyWords;

    public Map(PartialSolution p, Vector<String> kw)
    {
        this.ps = p;
        this.keyWords = kw;
    }

    @Override
    public PartialSolution call() throws Exception 
    {
        String[] st = this.ps.info.split("\\n");
        for(int j = 0 ; j < st.length ; j++)
        {
            for(int i = 0 ; i < keyWords.size() ; i++)
            {
                if(keyWords.elementAt(i).charAt(0) != '\'')
                {
                    int k = 0;
                    int index = 0;
                    int count = 0;

                    while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
                    {
                        k = index + keyWords.elementAt(i).length();
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                } 
                else
                {
                    String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
                    StringBuffer sb = new StringBuffer(regex);
                    regex = sb.toString();
                    Pattern pt = Pattern.compile(regex);
                    Matcher m = pt.matcher(st[j]);
                    int count = 0;
                    while(m.find())
                    {
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                }
            }
        }
        this.ps.info = null;
        return this.ps;
    }
}

Так что в Map я беру каждую строку из фрагмента и ищу для каждого выражения количество появлений, а также сохраняю номер строки. После обработки всего фрагмента в том же PartialSolution я сохраняю информацию в хэш-карте и возвращаю новый PartialSolution. На следующем шаге я объединяю PartialSolutions с тем же fileName и представляю их в классе Callable Reduce, который аналогичен map, но отличается тем, что он выполняет другие операции, но возвращает также PartialSolution.

Это код для запуска задач карты:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
   tasks.add(new Map(ps, keyWords));
}    
list = executor.invokeAll(tasks);

В задании я создаю задание типа Map и в списке получаю их. Я не знаю, как прочитать дамп потока JVM. Я надеюсь, что это достаточно хорошо, какую информацию я дал вам. Я работаю в NetBeans 7.0.1, если это поможет.

Спасибо, Alex

Ответы [ 3 ]

2 голосов
/ 29 ноября 2011

Что я хочу знать, так это если метод invokeAll, если я создал ExcutorService с 10 потоками, будет решать 10 задач одновременно или будет решать по одной за раз?

Если вы отправите десять задач в ExecutorService с десятью потоками, он выполнит их все одновременно. Могут ли они действовать полностью параллельно и независимо друг от друга, зависит от того, что они делают. Но у каждого из них будет свой поток.

И еще один вопрос, если я скажу list.get (i) .get (), это вернет PartialSolution после того, как он будет решен?

Да, он будет блокироваться до тех пор, пока не будет выполнено вычисление (если это еще не сделано) и вернет свой результат.

Я действительно не понимаю, почему время не улучшается, если я использую 2 темы вместо 1.

Нам нужно увидеть больше кода. Синхронизируются ли они на некоторых общих данных? Сколько времени занимают эти задачи? Если они очень короткие, вы можете не заметить никакой разницы. Если они занимают больше времени, посмотрите дамп потока JVM, чтобы убедиться, что все они запущены.

0 голосов
/ 30 января 2016

Java 8 представила еще один API в Исполнители - newWorkStealingPool для создания пула кражи работы.Вам не нужно создавать RecursiveTask и RecursiveAction, но вы все равно можете использовать ForkJoinPool.

public static ExecutorService newWorkStealingPool()

Создает пул потоков для кражи работ, используя все доступные процессоры в качестве целевого уровня параллелизма..

По умолчанию в качестве параметра параллелизма будет использовано количество ядер ЦП.Если у вас есть основные ЦП, вы можете иметь 8 потоков для обработки очереди рабочих задач.

Work stealing of idle worker threads from busy worker threads improves overall performance. Since task queue is unbounded in nature, this ForkJoinPool is recommended for the tasks executing in short time intervals.

Любой ExecutorServiceПроизводительность или ForkJoinPool или ThreadPoolExecutor была бы хорошей, если бы у вас не было общих данных и общей блокировки (синхронизации) и межпотокового взаимодействия.Если бы все задачи были независимы друг от друга в очереди задач, производительность была бы улучшена.

ThreadPoolExecutor конструктор для настройки и управления рабочим процессом задач:

 ThreadPoolExecutor(int corePoolSize, 
                       int maximumPoolSize, 
                       long keepAliveTime, 
                       TimeUnit unit, 
                       BlockingQueue<Runnable> workQueue, 
                       ThreadFactory threadFactory,
                       RejectedExecutionHandler handler)

Посмотрите на связанные вопросы SE:

Как правильно использовать Java Executor?

Fork / Join Java против ExecutorService - когда и какой использовать?

0 голосов
/ 30 ноября 2011

Если вы создадите пул потоков с двумя потоками, то две задачи будут выполняться одновременно.

Я вижу две вещи, из-за которых два потока могут занимать столько же времени, сколько один поток.

Если только одна из ваших задач на карте занимает большую часть вашего времени, то дополнительные потоки не заставят эту задачу выполняться быстрее. Он не может закончиться быстрее, чем самая медленная работа.

Другая возможность состоит в том, что ваша задача карты часто читает из общего вектора. Это может вызвать достаточно разногласий, чтобы отменить усиление наличия двух потоков.

Вы должны поднять это в jvisualvm, чтобы увидеть, что делает каждый поток.

...