Как многопоточность моего последовательного кода Java - PullRequest
0 голосов
/ 14 марта 2019

У меня есть Java-программа, которая выдала список, выполняет некоторые независимые процессы для каждого элемента списка (который включает в себя извлечение текстов из некоторых ресурсов HTTP и вставку их в независимый HashMap) и, наконец, вычисляет некоторые числа на этих HashMaps. Основной фрагмент выглядит так:

    for (int i = 0; i < mylist.size(); i++) {
        long startepoch = getTime(mylist.get(i).time);
        MyItem m = mylist.get(i);
        String index=(i+1)+"";

        process1(index, m.name, startepoch, m.duration);
        //adds to hashmap1

        if(m.name.equals("TEST")) {
            process2(index, m.name, startepoch, m.duration);
        //adds to hashmap2

        } else {
            process3(index, m.name, startepoch, m.duration);
        //adds to hashmap3
            process4(index, m.name, startepoch, m.duration);
        //adds to hashmap4
            process5(index, m.name, startepoch, m.duration);
        //adds to hashmap5
            process6(index, m.name, startepoch, m.duration);
        //adds to hashmap6
        }
    }

    // then start calculation on all hashmaps
    calculate_all();

Поскольку в настоящее время этот фрагмент выполняется последовательно, это может занять около 30 минут для списка из 500 элементов. Как я могу многопоточность моего кода, чтобы сделать его быстрее? И потокобезопасным способом?

Я пытался использовать ExecutorService executorService = Executors.newFixedThreadPool(10);, а затем отправлять каждый отдельный процесс в executorService, оборачивая его, как показано ниже, но проблема заключалась в том, что я не мог знать, когда они finsih так назвать calculate_all(). Так что я не продолжил.

            executorService.submit(new Runnable() {
                public void run() {
                    process2(index, m.name, startepoch, m.duration);
                }
            });

Есть идеи получше?

Ответы [ 2 ]

4 голосов
/ 14 марта 2019

но проблема была в том, что я не мог знать, когда они закончат

Когда вы отправляете что-то Исполнителю, вы получаете Future с результатом (если есть).

Затем вы можете вызвать Future::get из основного потока, чтобы дождаться этих результатов (или просто завершения в вашем случае).

List<Future<?>> completions = executor.invokeAll(tasks);

// later, when you need to wait for completion
for(Future<?> c: completions) c.get();

Еще одна вещь, о которой вам нужно позаботиться, это как сохранить результаты. Если вы планируете, чтобы ваши задачи помещали их в какую-то общую структуру данных, убедитесь, что это потокобезопасно. Вероятно, проще перейти с Runnable на Callable, чтобы задачи могли возвращать результат (который впоследствии можно объединить однопоточным способом в главном потоке).

1 голос
/ 14 марта 2019

Обратите внимание, что многопоточность не обязательно увеличивает скорость. Многопоточность в основном используется для уменьшения циклов простоя ЦП путем предотвращения ненужных снов и т. Д.

Я мало чем могу помочь с тем, что вы предоставили, однако, я думаю, вы можете начать делать что-то вроде этого:

  1. Использовать поточно-ориентированные структуры данных. Это обязательно. Если вы пропустите это шаг, ваше программное обеспечение сломается, в конце концов. И у вас будет очень трудно определить причину. (например, если у вас есть ArrayList, используйте потокобезопасный)
  2. Вы можете начать тестирование многопоточности, удалив цикл for и используя поток для каждого выполнения, вместо этого. Если ваш цикл размер больше, чем количество ваших нитей, вы будете иметь поставить их в очередь.
  3. У вас есть окончательный расчет, который требует от всех других потоков Конец. Вы можете использовать CountDownLatch, wait () / notifyAll () или synchronized () в зависимости от вашей реализации.
  4. Выполните ваш окончательный расчет.

EDIT

В ответ на (2):

Ваше текущее исполнение таково:

for (int i = 0; i < mylist.size(); i++) {
    some_processes();
}

// then start calculation on all hashmaps
calculate_all();

Теперь, чтобы удалить циклы «for», вы можете начать с увеличения циклов «for». например:

// Assuming mylist.size() is around 500 and you want, say 5, hardcoded multi-thrads
Thread_1:
for (int i = 0; i < 100; i++) {
    some_processes();
}
Thread_2:
for (int i = 100; i < 200; i++) {
    some_processes();
}
Thread_3:
for (int i = 200; i < 300; i++) {
    some_processes();
}
Thread_4:
for (int i = 300; i < 400; i++) {
    some_processes();
}
Thread_5:
for (int i = 400; i < mylist.size(); i++) {
    some_processes();
}
// Now you can use these threads as such:
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(new Thread1(latch));
executor.submit(new Thread2(latch));
executor.submit(new Thread3(latch));
executor.submit(new Thread4(latch));
executor.submit(new Thread5(latch));
try {
    latch.await();  // wait until latch counted down to 0
} catch (InterruptedException e) {
    e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();

У этого метода есть несколько недостатков, как вы можете видеть. Например, что если размер списка станет, скажем, 380? Тогда у вас есть праздная нить. Кроме того, что если вы хотите более 5 потоков?

Таким образом, в этот момент вы можете еще больше увеличить количество циклов "for", делая их циклически меньшими и меньшими. При максимальном значении "счетчик циклов" == "счетчик потоков" эффективно удаляется цикл for. Технически, вам нужно количество потоков mylist.size (). Вы можете сделать это следующим образом:

// Allow a maximum amount of threads, say mylist.size(). I used LinkedBlockingDeque here because you might choose something lower than mylist.size().
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(mylist.size());
CountDownLatch latch = new CountDownLatch(mylist.size());

new Thread(new add_some_processes_w_single_loop_for_loop_to_queue(queue, latch)).start();
new Thread(new take_finished_processes_from_queue(queue)).start();
try {
    latch.await();  // wait until latch counted down to 0
} catch (InterruptedException e) {
    e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();

Обратите внимание, что при таком расположении мы удалили ваш начальный цикл for и вместо этого создали еще один, который просто отправляет новые потоки по мере опустошения очереди. Вы можете проверить примеры BlockingQueue с приложениями производителя и потребителя. Например, смотрите: Примеры BlockingQueue

РЕДАКТИРОВАТЬ 2

Простая реализация Future может выглядеть так:

ExecutorService executorService = Executors.newCachedThreadPool();  
Future future1, future2, future3, future4, future5, future6;  

for (int i = 0; i < mylist.size(); i++) {
    long startepoch = getTime(mylist.get(i).time);
    MyItem m = mylist.get(i);
    String index=(i+1)+"";

    future1 = executorService.submit(new Callable() {...})
    //adds to hashmap1

    future1.get(); // Add this if you need to wait for process1 to finish before moving on to others. Also, add a try{}catch{} block as shown below.

    if(m.name.equals("TEST")) {
        future2 = executorService.submit(new Callable() {...})
    //adds to hashmap2

        future2.get(); // Add this if you need to wait for process2 to finish before moving on to others. Also, add a try{}catch{} block as shown below.

    } else {
        future3 = executorService.submit(new Callable() {...})
    //adds to hashmap3
        future4 = executorService.submit(new Callable() {...})
    //adds to hashmap4
        future5 = executorService.submit(new Callable() {...})
    //adds to hashmap5
        future6 = executorService.submit(new Callable() {...})
    //adds to hashmap6

         // Add extra future.get here as above...
    }
}

// then start calculation on all hashmaps
calculate_all();

Не забудьте добавить блок try-catch, иначе вы не сможете оправиться от исключений и сбоев.

// Example try-catch block surrounding a Future.get().
try {
    Object result = future.get();       
} catch (ExecutionException e) {
    //Do something
} catch (InterruptedException e) {
    //Do something
}

Однако, вы можете иметь более сложный, как показано здесь . Эта ссылка также объясняет ответ Тило.

...