forEachPartition
исполняется на исполнителя.Следовательно, вы не можете получить доступ к SparkContext / SparkSession внутри forEachPartition.
Вы можете использовать mapPartitions()
в качестве альтернативы map()
& foreach()
.mapPartitions()
вызывается один раз для каждого раздела в отличие от map()
& foreach()
, который вызывается для каждого элемента в СДР.Основное преимущество заключается в том, что мы можем выполнять инициализацию для каждого раздела вместо каждого элемента.
Мы получаем Iterator
в качестве аргумента для mapPartition
, через который мы можем выполнять итерацию по всем элементам.в разделе
Например: (Пример на Java, но это должно дать вам представление.)
JavaRDD<Integer> rdd = sc.parallelize(
Arrays.asList(1, 2, 3, 4, 5));
FlatMapFunction<Iterator<Integer>, AvgCount> setup = new FlatMapFunction<Iterator<Integer>, AvgCount>() {
@Override
public Iterable<AvgCount> call(Iterator<Integer> input) {
AvgCount a = new AvgCount(0, 0);
while (input.hasNext()) {
a.total_ += input.next();
a.num_ += 1;
}
ArrayList<AvgCount> ret = new ArrayList<AvgCount>();
ret.add(a);
return ret;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
@Override
public AvgCount call(AvgCount a, AvgCount b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
}
};
AvgCount result = rdd.mapPartitions(setup).reduce(combine);