К счастью, я наткнулся на это замечательное объяснение mapPartitions
от Мринала (ответил здесь ).
mapPartitions
применяет функцию к каждому разделу СДР.Следовательно, распараллеливание может использоваться, если разделы распределены по разным узлам.На этих узлах создаются соответствующие Python-экземпляры, необходимые для обработки Python-функций.В то время как foreachPartition
применяет только функцию (например, записывает ваши данные в .csv-файл), mapPartitions
также возвращает новый RDD.Поэтому использование foreachPartition
было для меня неправильным выбором.
Чтобы ответить на мой второй вопрос: такие функции, как map
или UDFs
, создают новый экземпляр Python для каждой строки DataFrame / RDD, в результате чего много накладных расходов.foreachPartition
и mapPartitions
(обе функции RDD) переносят весь раздел в экземпляр Python, следовательно, требуется значительно меньшее количество экземпляров.
Кроме того, использование генераторов также уменьшает объем необходимой памятидля итерации по этим переданным данным раздела (разделы обрабатываются как объекты итератора, тогда как каждая строка затем обрабатывается путем итерации по этому объекту).
Пример может выглядеть следующим образом:
def generator(partition):
"""
Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)
@partition: iterator-object of partition
"""
for row in partition:
yield [word.lower() for word in row["text"]]
df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()
#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+
Надеюсьэто помогает кому-то сталкиваться с похожими проблемами:)