pySpark forEachPartition - где выполняется код - PullRequest
0 голосов
/ 12 апреля 2019

Я использую pySpark в версии 2.3 (не могу обновить до 2.4 в моей текущей системе разработки) и у меня есть следующие вопросы, касающиеся foreachPartition .

Сначала небольшой контекст: насколько я понял, pySpark- UDFs заставляет Python-код выполняться внутри драйвера-процесса, что делает его затратным по производительности. Так как мне нужно применить некоторые Python-функции к моим данным и запретить каждой строке выполнять один вызов функции для драйвера (что делают UDF), у меня возникла идея, по крайней мере, загрузить пригодную для использования группу данных в драйвер и обработать его как Pandas-DataFrame. Затем я прочитал, что указанные foreachPartition процессы применяют функцию ко всем данным в разделе и, следовательно, позволяют параллельную обработку.

Мои вопросы сейчас:

  1. Когда я применяю Python-функцию через foreachPartition, происходит ли выполнение Python-аналога UDFs в процессе драйвера (и поэтому данные раздела передаются по сети моему драйверу)

  2. Обрабатываются ли данные по-прежнему по строкам в пределах foreachPartition (имеется в виду, что каждая строка RDD вызывает новое выполнение Python), или данные раздела обрабатываются сразу (например, весь раздел) передается драйверу и обрабатывается в целом одним выполнением Python, например, как Pandas-DataFrame)?

Заранее спасибо за ваш вклад!

Edit: 1029 ** Мое текущее "в драйвере" - решение выглядит так:

def partition_generator(rdd):
    glom = rdd.glom()
    #Optionally persist glom
    for partition in range(rdd.getNumPartitions()):
        yield glom.map(lambda row: row[partition]).collect()

Ответы [ 2 ]

0 голосов
/ 16 апреля 2019

К счастью, я наткнулся на это замечательное объяснение mapPartitions от Мринала (ответил здесь ).

mapPartitions применяет функцию к каждому разделу СДР.Следовательно, распараллеливание может использоваться, если разделы распределены по разным узлам.На этих узлах создаются соответствующие Python-экземпляры, необходимые для обработки Python-функций.В то время как foreachPartition применяет только функцию (например, записывает ваши данные в .csv-файл), mapPartitions также возвращает новый RDD.Поэтому использование foreachPartition было для меня неправильным выбором.

Чтобы ответить на мой второй вопрос: такие функции, как map или UDFs, создают новый экземпляр Python для каждой строки DataFrame / RDD, в результате чего много накладных расходов.foreachPartition и mapPartitions (обе функции RDD) переносят весь раздел в экземпляр Python, следовательно, требуется значительно меньшее количество экземпляров.

Кроме того, использование генераторов также уменьшает объем необходимой памятидля итерации по этим переданным данным раздела (разделы обрабатываются как объекты итератора, тогда как каждая строка затем обрабатывается путем итерации по этому объекту).

Пример может выглядеть следующим образом:

def generator(partition):
    """
    Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)

    @partition: iterator-object of partition
    """

    for row in partition:
        yield [word.lower() for word in row["text"]]


df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()


#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+

Надеюсьэто помогает кому-то сталкиваться с похожими проблемами:)

0 голосов
/ 12 апреля 2019

Пользовательские функции pySpark выполняются рядом с исполнителями - то есть в отдельном экземпляре python для каждого исполнителя, который работает бок о бок и передает данные назад и вперед между механизмом искры (scala) и интерпретатором python.

то же самое верно для обращений к udfs внутри foreachPartition

Edit - после просмотра примера кода

  1. использование RDD не является эффективным способом использования spark - вам следует перейти кнаборы данных
  2. , что делает ваш код синхронизировать все данные с драйвером, это collect ()
  3. foreachParition будет похож на glom
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...