PySpark выполняет обычную Python функцию в каждой строке DataFrame - PullRequest
1 голос
/ 13 февраля 2020

У меня Spark DataFrame DF1 с миллионами строк. Каждая строка имеет до 100 столбцов.

col1 | col2 | col3 | ... | colN
--------------------------------
v11  | v12  | v13  | ... | v1N
v21  | v22  | v23  | ... | v2N
...  | ...  | ...  | ... | ...

Кроме того, у меня есть еще один DataFrame DF2 , где у меня есть сотни строк с именами и столбцами тела. Имя содержит имя функции, тело содержит простой Python код, логическая функция, которая возвращает истину или ложь. Эти функции внутри их логики c могут ссылаться на любой столбец в одной строке из DF1.

func_name | func_body
-----------------------------------------------
func1     |   col2 < col45
func2     |   col11.contains("London") and col32*col15 < col21
funcN     |   .... 

Мне нужно объединить оба этих кадра данных - DF1 с DF2 и применить каждую функцию из Df2 к каждой строке в DF1. Каждая функция должна иметь возможность принимать параметры из DF1, скажем, словарный массив с парами ключ / значение, которые представляют имя / значение всех столбцов соответствующей строки из DF1.

Я знаю, как объединить DF1 и DF2 Кроме того, я понимаю, что выполнение функций Python не будет работать распределенным образом. Это нормально сейчас. Это временное решение. Мне просто нужно распределить все строки из DF1 по рабочим узлам и применить каждую Python функцию к каждой строке DF1 в различных задачах Apache Spark-приложения. Оцените их eval() и передайте словарный массив с парами ключ / значение внутри, как я упоминал выше.

В общем, каждая функция Python - это тег, который я хотел бы назначить строке в DF1 если определенная функция вернула истину. Например, в результате получается DataFrame DF3 :

col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11  | v12  | v13  | ... | v1N  | [func1, func76, funcN]
v21  | v22  | v23  | ... | v2N  | [func32]
...  | ...  | ...  | ... | ...  | [..., ..., ..., ..., ...]

Возможно ли это с PySpark, и если да, не могли бы вы показать пример, как этого можно достичь? Функции UDF с Map из DF.columns в качестве входного параметра - правильный путь к go или это можно сделать более простым способом? Есть ли у Spark какие-либо ограничения на количество функций UDF (число), которые можно зарегистрировать в один момент времени?

1 Ответ

1 голос
/ 13 февраля 2020

Вы можете достичь этого, используя SQL выражений, которые можно оценить, используя expr. Однако вы не сможете присоединиться к двум фреймам данных, поскольку выражения SQL не могут быть оценены как значения столбцов (см. post ), поэтому необходимо собрать функции в список (как у вас есть только сотни строк, это может уместиться в памяти).

Вот рабочий пример, который вы можете адаптировать под свои требования:

data1 = [(1, "val1", 4, 5, "A", 10), (0, "val2", 7, 8, "B", 20),
         (9, "val3", 8, 1, "C", 30), (10, "val4", 2, 9, "D", 30),
         (20, "val5", 6, 5, "E", 50), (3, "val6", 100, 2, "X", 45)]

df1 = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4", "col5", "col6"])

data2 = [("func1", "col1 + col3 = 5 and col2 like '%al1'"),
         ("func2", "col6 = 30 or col1 * col4 > 20"),
         ("func3", "col5 in ('A', 'B', 'C') and col6 - col1 < 30"),
         ("func4", "col2 like 'val%' and col1 > 0")]

df2 = spark.createDataFrame(data2, ["func_name", "func_body"])

# get functions into a list
functions = df2.collect()

# case/when expression to evaluate the functions
satisfied_expr = [when(expr(f.func_body), lit(f.func_name)) for f in functions]

# add new column tags
df1.withColumn("tags", array(*satisfied_expr)) \
    .withColumn("tags", expr("filter(tags, x -> x is not null)")) \
    .show(truncate=False)

После добавления столбца массива tags, filter используется функция для удаления нулевые значения, которые соответствуют неудовлетворенным выражениям. Эта функция доступна только начиная с Spark 2.4+, вам придется использовать и UDF для более старых версий.

Дает:

+----+----+----+----+----+----+---------------------+
|col1|col2|col3|col4|col5|col6|tags                 |
+----+----+----+----+----+----+---------------------+
|1   |val1|4   |5   |A   |10  |[func1, func3, func4]|
|0   |val2|7   |8   |B   |20  |[func3]              |
|9   |val3|8   |1   |C   |30  |[func2, func3, func4]|
|10  |val4|2   |9   |D   |30  |[func2, func4]       |
|20  |val5|6   |5   |E   |50  |[func2, func4]       |
|3   |val6|100 |2   |X   |45  |[func4]              |
+----+----+----+----+----+----+---------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...