Как обработать список объектов при параллельной обработке в Java - PullRequest
0 голосов
/ 05 мая 2018

У меня есть список объектов в Java, как тысячи объектов в списке, и я перебираю список для каждого объекта и дальнейшей обработки. Та же обработка происходит для каждого объекта. Этот последовательный подход требует много времени для обработки, поэтому я хочу достичь с параллельной обработкой в ​​Java. Я проверил среду исполнения в Java, но застрял в ней.

Я подумал об одном подходе для реализации моего требования.

Я хочу реализовать некоторое фиксированное количество минимальных объектов, которые будут обрабатываться каждым потоком, чтобы каждый поток выполнял свою работу и обрабатывал объекты быстро. Как я могу достичь этого? Или, если какой-либо другой подход для реализации моего требования, пожалуйста, поделитесь.

Например:

Список объектов = новый список ();

For (Объект объект: объекты) { // Делаем какую-то общую операцию для всех Объекты

}

Ответы [ 3 ]

0 голосов
/ 05 мая 2018

Вы можете использовать ThreadPoolExecutor, он позаботится о балансе нагрузки. Задачи будут распределяться по разным потокам.

Вот пример:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {

    public static void main(String[] args) {

        // Fixed thread number
        ExecutorService service = Executors.newFixedThreadPool(10);

        // Or un fixed thread number
        // The number of threads will increase with tasks
        // ExecutorService service = Executors.newCachedThreadPool(10);

        List<Object> objects = new ArrayList<>();
        for (Object o : objects) {
            service.execute(new MyTask(o));
        }

        // shutdown
        // this will get blocked until all task finish
        service.shutdown();
        try {
            service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static class MyTask implements Runnable {
        Object target;

        public MyTask(Object target) {
            this.target = target;
        }

        @Override
        public void run() {
            // business logic at here
        }
    }
}
0 голосов
/ 15 апреля 2019

Разделение списка на несколько подсписков и использование многопоточности для обработки каждый подсписок параллелен.

public class ParallelProcessListElements {
    public void processList (int numberofthreads,List<Object>tempList, 
            Object obj, Method method){

                final int sizeofList=tempList.size();
                final int sizeofsublist = sizeofList/numberofthreads;
                List<Thread> threadlist = new ArrayList<Thread>();

                for(int i=0;i<numberofthreads;i++) {
                    int firstindex = i*sizeofsublist;
                    int lastindex = i*sizeofsublist+sizeofsublist;
                    if(i==numberofthreads-1)
                        lastindex=sizeofList;

                    List<Object> subList=tempList.subList(firstindex,lastindex );

                    Thread th = new Thread(()->{
                                try{method.invoke(obj, subList);}catch(Exception e) {e.printStackTrace();}
                            });

                    threadlist.add(th);
                }

                threadlist.forEach(th->{th.start();try{Thread.sleep(10);}catch(Exception e) {}});
    }

}

public class Demo {
    public static void main(String[] args) {

        List<Object> tempList= new ArrayList<Object>();
        /**
         * Adding values to list... For Demo purpose..
         */
        for(int i=0;i<500;i++)
            tempList.add(i);

        ParallelProcessListElements process = new ParallelProcessListElements();
        final int numberofthreads = 5;
        Object obj = new Demo();
        Method method=null;

        try{ method=Demo.class.getMethod("printList", List.class);}catch(Exception e) {}
        /**
         * Method Call...
         */
        process.processList(numberofthreads,tempList,obj,method);
    }

    public void printList(List<Integer>list) {
        /**
         * Business logic to process the list...
         */
        list.forEach(item->{
            try{Thread.sleep(1000);}catch(Exception e) {}
            System.out.println(item);
            });
    }
}

0 голосов
/ 05 мая 2018

Существует много вариантов параллельной обработки списка:

Использовать параллельный поток :

objects.stream().parallel().forEach(object -> {
    //Your work on each object goes here, using object
})

Используйте службу исполнителя для отправки задач, если вы хотите использовать пул с большим количеством потоков, чем пул с форк-объединением:

ExecutorService es = Executors.newFixedThreadPool(10);
for(Object o: objects) {
    es.submit(() -> {
        //code here using Object o...
    }
}

Этот предыдущий пример по сути аналогичен традиционной службе исполнителя, выполняющей задачи в отдельных потоках.

В качестве альтернативы этому вы также можете отправить, используя завершаемое будущее:

//You can also just run a for-each and manually add each
//feature to a list
List<CompletableFuture<Void>> futures = 
    objects.stream().map(object -> CompletableFuture.runAsync(() -> {
    //Your work on each object goes here, using object
})

Затем вы можете использовать объект futures для проверки состояния каждого выполнения, если это необходимо.

...