Создайте столбец в кадре данных искры из вычисления значения строки со всем столбцом. - PullRequest
1 голос
/ 23 сентября 2019

У меня есть фрейм данных:

|id|value|
| 0|    1|
| 1|    3|
| 2|    9|

Я хочу применить функцию к каждой строке, чтобы создать новый столбец.Эта функция должна иметь значение строки в параметре и весь столбец в качестве второго параметра для создания вектора.

Например: вектор сумм значений из столбца со значением строки:

def fu(myValue, myColumn):
    return [myValue + i for i in myColumn]

Чтобы иметь:

|id|value|sums_in_column|
| 0|    1|    [2, 4, 10]|
| 1|    3|    [3, 6, 12]|
| 2|    9|  [10, 12, 18]|

Я знаю, я могу передать одно или несколько значений из строки, чтобы вычислить новый столбец, используя withColumn и udf, чтобы распараллелить выполнение.Но я не понимаю, как я мог передать колонку в качестве аргумента?Возможно ли это?

Ответы [ 2 ]

1 голос
/ 23 сентября 2019

Настройка:

>>> d = [{'id': 0, 'value': 1},{'id': 1, 'value': 3},{'id': 2, 'value': 9}]
>>> df0 = spark.createDataFrame(d)
>>> df0.show()
+---+-----+
| id|value|
+---+-----+
|  0|    1|
|  1|    3|
|  2|    9|
+---+-----+

Шаг 1. Используйте функцию collect_list() для создания массива всех значений в столбце value и добавления этого массива в качестве столбца к исходному кадру данных

>>> from pyspark.sql.functions import *
>>> arr = df0.agg(collect_list(df.value).alias('arr_column'))
>>> df1 = df0.crossJoin(arr)
>>> df1.show()
+---+-----+-------------+
| id|value|   arr_column|
+---+-----+-------------+
|  0|    1|    [1, 3, 9]|
|  1|    3|    [1, 3, 9]|
|  2|    9|    [1, 3, 9]|
+---+-----+-------------+

Cross-join по сути транслирует массив всем исполнителям, так что следите за размером данных, к которому вы хотите применить его.(От вас также может потребоваться установить spark.sql.crossJoin.enabled=true явно при создании контекста Spark, потому что Spark не любит перекрестные соединения именно по этой причине.)

Шаг 2: зарегистрируйте вашу функцию fu как UDF Spark

>>> from pyspark.sql.types import *
>>> fu_udf = udf(fu, ArrayType(IntegerType()))

Шаг 3: Используйте эту UDF для увеличения элементов массива

>>> df3 = df1.withColumn('sums_in_column',fu_udf(df1.value,df1.arr_column))
>>> df3.show()
+---+-----+-------------+--------------+
| id|value|   arr_column|sums_in_column|
+---+-----+-------------+--------------+
|  0|    1|    [1, 3, 9]|    [2, 4, 10]|
|  1|    3|    [1, 3, 9]|    [4, 6, 12]|
|  2|    9|    [1, 3, 9]|  [10, 12, 18]|
+---+-----+-------------+--------------+
1 голос
/ 23 сентября 2019

Вы не можете передать данные целого столбца в UDF, потому что механизм Spark разделяет вычисления и данные на несколько серверов / исполнителей.

Если вы можете адаптировать свой алгоритм для работы на локальном исполнителеподмножество значений столбца, вы можете использовать RDD.mapPartitions для выполнения одной функции над полным разделом данных.

В качестве альтернативы, если вы знаете, что данные столбца могут поместиться в памяти ваших исполнителейвы можете сначала DataFrame.collect () данные столбца и использовать SparkContext.broadcast () , чтобы скопировать его всем исполнителям и использовать ссылку на широковещательную переменную в вашем UDF.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...