Spark: отличайтесь в каждом разделе - PullRequest
0 голосов
/ 11 мая 2018

Я хочу разделить данные, используя ID, и с каждым разделом я хочу

- применить набор операций

-принимай

Выполнение отдельных внутри каждого раздела позволит избежать перетасовки.

val rowRDD = sc.textFile("flatten_test_data")
    .filter(_.nonEmpty)
    .map { l =>
        val arr = l.split("\u0001")
        val id = arr(0)
         val value = arr(1)
         (id,value)
    }.partitionBy(new HashPartitioner(4))
    .persist()

Теперь сделай что-нибудь вроде ...

rowRDD.foreachPartition {records => applyOpers(records)}

applyOpers (набор данных) должен сделать что-то вроде -

dataset.withColumn(udf1).withColumn(udf2).distinct

1 Ответ

0 голосов
/ 12 мая 2018

forEachPartition исполняется на исполнителя.Следовательно, вы не можете получить доступ к SparkContext / SparkSession внутри forEachPartition.

Вы можете использовать mapPartitions() в качестве альтернативы map() & foreach().mapPartitions() вызывается один раз для каждого раздела в отличие от map() & foreach(), который вызывается для каждого элемента в СДР.Основное преимущество заключается в том, что мы можем выполнять инициализацию для каждого раздела вместо каждого элемента.

Мы получаем Iterator в качестве аргумента для mapPartition, через который мы можем выполнять итерацию по всем элементам.в разделе

Например: (Пример на Java, но это должно дать вам представление.)

JavaRDD<Integer> rdd = sc.parallelize(
      Arrays.asList(1, 2, 3, 4, 5));
    FlatMapFunction<Iterator<Integer>, AvgCount> setup = new FlatMapFunction<Iterator<Integer>, AvgCount>() {
      @Override
      public Iterable<AvgCount> call(Iterator<Integer> input) {
        AvgCount a = new AvgCount(0, 0);
        while (input.hasNext()) {
          a.total_ += input.next();
          a.num_ += 1;
        }
        ArrayList<AvgCount> ret = new ArrayList<AvgCount>();
        ret.add(a);
        return ret;
      }
    };
    Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
        @Override
        public AvgCount call(AvgCount a, AvgCount b) {
        a.total_ += b.total_;
        a.num_ += b.num_;
        return a;
        }
    };

AvgCount result = rdd.mapPartitions(setup).reduce(combine);
...