Доступ к HashMap внутри flatMapToPair - PullRequest
0 голосов
/ 10 апреля 2020

Редактировать: уже решено с помощью RDD.collectAsMap()

Я пытаюсь воспроизвести решение проблемы со страниц 28-30 из http://on-demand.gputechconf.com/gtc/2016/presentation/S6424-michela-taufer-apache-spark.pdf

У меня есть HashMap, который я создаю вне функции карты. HashMap содержит следующие данные:

{1:2, 2:3, 3:2, 4:2, 5:3}

Ранее определенная RDD предыдущаяRDD имела тип:

JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>>

содержит данные:

1: [(1,2), (1,5)]
2: [(2,1), (2,3), (2,5)]
3: [(3,2), (3,4)]
4: [(4,3), (4,5)]
5: [(5,1), (5,2), (5,4)]

Я пытаюсь для создания нового RDD с flatMapToPair:

JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, Integer, Integer>() {
    @Override
    public Iterator<Tuple2<Integer, Integer>> call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> integerIterableTuple2) throws Exception {
        Integer count;
        ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>();
        count = hashMap.get(integerIterableTuple2._1);
        for (Tuple2<Integer, Integer> t : integerIterableTuple2._2) {
            Integer tcount = hashMap.get(t._2);
            if (count < tcount || (count.equals(tcount) && integerIterableTuple2._1 < t._2)) {
                list.add(t);
            }
        }
        return list.iterator();
    }
});

Но при этом hashMap.get(t._2) внутри для l oop большую часть времени получает значения NULL. Я проверил, что правильные значения находятся внутри HashMap.

Есть ли способ правильно получить значения HashMap внутри функции Spark?

Ответы [ 2 ]

0 голосов
/ 10 апреля 2020

У меня была похожая проблема с переменными класса. Вы можете попробовать сделать вашу переменную локальной или объявить еще одну, например так:

 Map localMap = hashMap;
 JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(
   ...
      Integer tcount = localMap.get(t._2);
   ...
 );

Я думаю, это связано с механизмом сериализации искры. Вы можете прочитать больше об этом здесь .

0 голосов
/ 10 апреля 2020

должно работать. Spark должен захватить вашу переменную, сериализовать ее и отправить каждому работнику с каждой задачей. Вы можете попытаться передать эту карту

sc.broadcast(hashMap)

и использовать результат вместо hashMap. Это также более эффективно с точки зрения памяти (общее хранилище на исполнителя).

...