Java параллельный поток для заполнения массива - PullRequest
0 голосов
/ 24 декабря 2018

У меня есть очень большая хеш-карта, которая полна простых чисел.

var mapA = new HashMap<Integer, Long>();

Мне нужно выполнить тяжелые вычисления, поэтому я использую параллельные потоки:

var res = new ArrayList<Integer();

mapA.entrySet()
        .parallelStream()
        .forEach( x -> {

            var values = mapA.entrySet()
                                    .parallelStream()
                                    .filter( /*conditions*/ )
                                    .map(y -> y.getKey())
                                    .toArray();                 

            Arrays.stream(values)
                      .parallel()
                      .sorted()
                      .forEach(val -> {

                           synchronized (this) {
                                res.add(x.getKey());
                                res.add((Integer) val);
                           }

                      });


        });

Как видите, существует res, который является массивом, который находится за пределами области потока.Мне нужно, чтобы цикл был параллельным, иначе расчет может занять минуты и минуты.Это необходимо?

.forEach(val -> {

    synchronized (this) {
        res.add(x.getKey());
        res.add((Integer) val);
    }

});

Я добавил synchronized, потому что, поскольку поток работает параллельно, я не хочу иметь условия гонки в случае, если 2 или более потоков добавляют данные в res в то же время.

Я попытался удалить синхронизированный (это), и программа по-прежнему работает нормально.Но как я могу быть уверен, что он всегда будет работать нормально?

Спасибо


Я собираюсь добавить сюда весь код, если необходимо:

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DiffieHellman {

    private static final int LIMIT = 65536;

    private final long p;
    private final long g;

    public DiffieHellman(long p, long g) {
        this.p = p;
        this.g = g;
    }

    public List<Integer> tryBruteForce(long publicA, long publicB) {
        List<Integer> res = new ArrayList<Integer>();

        var mapA = new HashMap<Integer, Long>(
                IntStream
                        .rangeClosed(0, LIMIT)
                        .parallel()
                        .boxed()
                        .collect(
                                Collectors.toMap(x -> x, x -> DiffieHellmanUtils.modPow(publicB, x, p))
                        )
        );

        var mapB = new HashMap<Integer, Long>(
                IntStream
                        .rangeClosed(0, LIMIT)
                        .parallel()
                        .boxed()
                        .collect(
                                Collectors.toMap(x -> x, x -> DiffieHellmanUtils.modPow(publicB, x, p))
                        )
        );

        mapA.entrySet()
                    .parallelStream()
                    .forEach( x -> {

                        var values = mapB.entrySet()
                                        .parallelStream()
                                        .filter( y -> y.getValue().equals(x.getValue()))
                                        .map(Map.Entry::getKey)
                                        .toArray(Integer[]::new);

                        Arrays.stream(values)
                                .parallel()
                                .sorted()
                                .forEach(val -> {
                                        res.add(x.getKey());
                                        res.add((Integer) val);
                                });


                    });

        return res;
    }

}

1 Ответ

0 голосов
/ 24 декабря 2018

Естественно, вы можете просто использовать синхронизированную коллекцию, как указывалось в других ответах, но это может быть недостаточно эффективным из-за разногласий и все еще довольно громоздко для записи.

Вместо этого вы можете подойти к проблеме внемного иначе, используя Stream API идиоматически.

Во-первых, вложенные операции могут выполняться в одном поточном конвейере:

mapB.entrySet()
            .parallelStream()
            .filter(y -> y.getValue().equals(x.getValue()))
            .map(y -> y.getKey())
            .sorted()
            .forEach(val -> {

                synchronized (this) {
                    res.add(x.getKey());
                    res.add((Integer) val);
                }
            });

Во-вторых, чтобы избежать проблем с параллелизмом, самый простой способбыло бы отбросить императивный подход и использовать декларативность Stream API.

Чтобы сделать это, не нужно вручную for-each и затем add элементы к результату, но позволить Stream управлять этим.

Здесь вы хотите создать новую последовательность, заменив каждый элемент mapA entrySet () пользовательской последовательностью:

List<Integer> res = mapA.entrySet()
      .parallelStream()
      .flatMap(x -> mapB.entrySet().stream()
         .filter(y -> y.getValue().equals(x.getValue()))
         .map(Map.Entry::getKey)
         .sorted()
         .flatMap(v -> Stream.of(x.getKey(), v)))
      .collect(Collectors.toList());

Вложенный parallelStream можно опустить, поскольку flatMap все равно звонит sequential() в потоке.

...