Синхронизация часто сводит на нет ускорение, которое даёт распараллеливание циклов for. Поэтому параллельным циклам for часто нужны собственные (частные для каждого потока) данные и механизм редукции, который объединяет частные данные всех потоков в единый результат.
Итак, я дополнил реализацию Parallel.For, написанную Weimin Xiao,
механизмом редукции.
/**
 * Minimal parallel-loop helpers built on fixed-size thread pools.
 *
 * <p>Each call creates its own thread pool and shuts it down before returning. Each method blocks
 * until every submitted task has finished (unless the calling thread is interrupted). If any
 * iteration throws, the first failure is rethrown to the caller: unchecked exceptions and errors
 * as-is, anything else wrapped in a {@link RuntimeException}.
 */
public class Parallel {
    /** Body of an index loop: called once per index. */
    @FunctionalInterface
    public static interface IntLoopBody {
        void run(int i);
    }

    /** Body of a for-each loop: called once per element. */
    @FunctionalInterface
    public static interface LoopBody<T> {
        void run(T i);
    }

    /** Creates a fresh accumulator for one chunk of a reducing loop. */
    @FunctionalInterface
    public static interface RedDataCreator<T> {
        T run();
    }

    /** Body of a reducing loop: processes index {@code i} into the chunk-private {@code data}. */
    @FunctionalInterface
    public static interface RedLoopBody<T> {
        void run(int i, T data);
    }

    /** Merges one chunk's accumulator {@code addData} into {@code returnData}. */
    @FunctionalInterface
    public static interface Reducer<T> {
        void run(T returnData, T addData);
    }

    /** A submitted chunk together with the private accumulator it writes to. */
    private static class ReductionData<T> {
        Future<?> future;
        T data;
    }

    /** Number of available processors; used as pool size / maximum chunk count. */
    static final int nCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Runs {@code loopBody} for every element of {@code parameters} on a pool of {@link #nCPU}
     * threads (one task per element) and waits for all of them.
     *
     * @throws RuntimeException the first failure raised by {@code loopBody}
     */
    public static <T> void ForEach(Iterable<T> parameters, final LoopBody<T> loopBody) {
        ExecutorService executor = Executors.newFixedThreadPool(nCPU);
        try {
            List<Future<?>> futures = new LinkedList<Future<?>>();
            for (final T param : parameters) {
                futures.add(executor.submit(() -> loopBody.run(param)));
            }
            awaitAll(futures);
        } finally {
            // Always release the pool threads, even if submission or a task failed.
            executor.shutdown();
        }
    }

    /**
     * Runs {@code loopBody} for every index in {@code [start, stop)}. The range is split into at
     * most {@link #nCPU} contiguous chunks, each processed sequentially by one worker thread.
     * An empty range ({@code start >= stop}) does nothing.
     *
     * @throws RuntimeException the first failure raised by {@code loopBody}
     */
    public static void For(int start, int stop, final IntLoopBody loopBody) {
        if (start >= stop) {
            return; // empty range: nothing to do (and avoids a zero chunk size)
        }
        final long chunkSize = chunkSize(start, stop);
        ExecutorService executor = Executors.newFixedThreadPool(chunkCount(start, stop, chunkSize));
        try {
            List<Future<?>> futures = new LinkedList<Future<?>>();
            for (int i = start; i < stop; ) {
                final int iStart = i;
                final int iStop = chunkEnd(iStart, stop, chunkSize);
                futures.add(executor.submit(() -> {
                    for (int j = iStart; j < iStop; j++) {
                        loopBody.run(j);
                    }
                }));
                i = iStop;
            }
            awaitAll(futures);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Parallel loop over {@code [start, stop)} with a per-chunk reduction.
     *
     * <p>The range is split into at most {@link #nCPU} contiguous chunks. For each chunk,
     * {@code creator} is called on the calling thread to make a private accumulator, which
     * {@code loopBody} then fills from a worker thread without any synchronization. As each chunk
     * finishes, {@code reducer} merges its (non-null) accumulator into {@code result} on the calling
     * thread, in chunk order. An empty range ({@code start >= stop}) leaves {@code result} untouched.
     *
     * @throws RuntimeException the first failure of a chunk; no chunk after the failing one is
     *     reduced, so a partial {@code result} is never silently returned as if complete
     */
    public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator,
            final RedLoopBody<T> loopBody, final Reducer<T> reducer) {
        if (start >= stop) {
            return; // empty range: nothing to reduce (and avoids a zero chunk size)
        }
        final long chunkSize = chunkSize(start, stop);
        ExecutorService executor = Executors.newFixedThreadPool(chunkCount(start, stop, chunkSize));
        try {
            List<ReductionData<T>> redData = new LinkedList<ReductionData<T>>();
            for (int i = start; i < stop; ) {
                final int iStart = i;
                final int iStop = chunkEnd(iStart, stop, chunkSize);
                final ReductionData<T> rd = new ReductionData<T>();
                rd.data = creator.run();
                rd.future = executor.submit(() -> {
                    for (int j = iStart; j < iStop; j++) {
                        loopBody.run(j, rd.data);
                    }
                });
                redData.add(rd);
                i = iStop;
            }
            Throwable failure = null;
            for (ReductionData<T> rd : redData) {
                // Keep waiting for every chunk even after a failure, so none is still running
                // when the exception reaches the caller.
                Throwable t = await(rd.future);
                if (failure == null && t != null) {
                    failure = t; // stop reducing: the result would be incomplete
                } else if (failure == null && rd.data != null) {
                    reducer.run(result, rd.data);
                }
            }
            rethrow(failure);
        } finally {
            executor.shutdown();
        }
    }

    /** Indices per chunk so that the non-empty range {@code [start, stop)} splits into at most nCPU chunks. */
    private static long chunkSize(int start, int stop) {
        long n = (long) stop - start; // long: the width of an int range can exceed Integer.MAX_VALUE
        return (n + nCPU - 1) / nCPU;
    }

    /** Number of chunks of {@code chunkSize} indices needed to cover {@code [start, stop)}. */
    private static int chunkCount(int start, int stop, long chunkSize) {
        long n = (long) stop - start;
        return (int) ((n + chunkSize - 1) / chunkSize);
    }

    /** Exclusive end of the chunk starting at {@code from}, clamped to {@code stop} without int overflow. */
    private static int chunkEnd(int from, int stop, long chunkSize) {
        return (int) Math.min(from + chunkSize, stop);
    }

    /** Waits for every future, then rethrows the first failure (if any). */
    private static void awaitAll(Iterable<? extends Future<?>> futures) {
        Throwable failure = null;
        for (Future<?> f : futures) {
            Throwable t = await(f);
            if (failure == null) {
                failure = t;
            }
        }
        rethrow(failure);
    }

    /**
     * Blocks until {@code future} is done.
     *
     * @return {@code null} on success, otherwise the exception thrown by the task, or the
     *     {@link InterruptedException} if the caller was interrupted (its interrupt flag is restored)
     */
    private static Throwable await(Future<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause() != null ? e.getCause() : e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    /** Rethrows {@code failure} unchanged if unchecked, wrapped otherwise; does nothing if null. */
    private static void rethrow(Throwable failure) {
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        if (failure != null) {
            throw new RuntimeException(failure);
        }
    }
}
Вот простой пример теста: параллельный подсчёт символов в строке с использованием несинхронизированного словаря (TreeMap).
import java.util.*;
/**
 * Demo/test for {@code Parallel.For} with reduction: counts character occurrences in a text
 * sequentially and in parallel (each chunk uses its own unsynchronized TreeMap), then checks
 * that both counts are identical.
 */
public class ParallelTest {
    /** Mutable occurrence counter; starts at 1 because it is created on a character's first occurrence. */
    static class Counter {
        int cnt;

        Counter() {
            cnt = 1;
        }
    }

    /** Records one occurrence of {@code c} in {@code map}. */
    private static void count(Map<Character, Counter> map, char c) {
        Counter cnt = map.get(c);
        if (cnt == null) {
            map.put(c, new Counter());
        } else {
            cnt.cnt++;
        }
    }

    public static void main(String[] args) {
        String text = "More formally, if this map contains a mapping from a key k to a " +
            "value v such that key compares equal to k according to the map's ordering, then " +
            "this method returns v; otherwise it returns null.";
        Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
        Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();

        // first sequentially
        for (int i = 0; i < text.length(); i++) {
            count(charCounter1, text.charAt(i));
        }
        for (Map.Entry<Character, Counter> entry : charCounter1.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue().cnt);
        }

        // now parallel without synchronization
        Parallel.For(0, text.length(), charCounter2,
            // Creator: each chunk counts into its own private map, so no locking is needed
            () -> new TreeMap<Character, Counter>(),
            // Loop Body
            (i, map) -> count(map, text.charAt(i)),
            // Reducer: merge one chunk's counts into the result
            (result, map) -> {
                for (Map.Entry<Character, Counter> entry : map.entrySet()) {
                    Counter cntR = result.get(entry.getKey());
                    if (cntR == null) {
                        result.put(entry.getKey(), entry.getValue());
                    } else {
                        cntR.cnt += entry.getValue().cnt;
                    }
                }
            }
        );

        // Compare results. Explicit checks (not `assert`) so the comparison also runs without -ea
        // instead of printing "Well done!" unconditionally.
        if (charCounter1.size() != charCounter2.size()) {
            throw new AssertionError("wrong size: " + charCounter1.size() + ", " + charCounter2.size());
        }
        // Both maps are TreeMaps, so their entries iterate in the same (sorted) key order.
        Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
        for (Map.Entry<Character, Counter> entry : charCounter1.entrySet()) {
            Map.Entry<Character, Counter> entry2 = it2.next();
            // equals, not ==: keys are boxed Characters, and == compares references
            if (!entry.getKey().equals(entry2.getKey()) || entry.getValue().cnt != entry2.getValue().cnt) {
                throw new AssertionError("wrong content");
            }
        }
        System.out.println("Well done!");
    }
}