Обновите значения в столбце на основе значений столбца другого фрейма данных в PySpark - PullRequest
0 голосов
/ 22 октября 2018

У меня есть два фрейма данных в PySpark: df1

+---+-----------------+
|id1|           items1|
+---+-----------------+
|  0|     [B, C, D, E]|
|  1|        [E, A, C]|
|  2|     [F, A, E, B]|
|  3|        [E, G, A]|
|  4|  [A, C, E, B, D]|
+---+-----------------+ 

и df2:

+---+-----------------+
|id2|           items2|
+---+-----------------+
|001|              [B]|
|002|              [A]|
|003|              [C]|
|004|              [E]|
+---+-----------------+ 

Я хотел бы создать новый столбец в df1, которыйобновит значения в столбце items1, так что он будет содержать только те значения, которые также появляются (в любой строке) items2 в df2.Результат должен выглядеть следующим образом:

+---+-----------------+----------------------+
|id1|           items1|        items1_updated|
+---+-----------------+----------------------+
|  0|     [B, C, D, E]|             [B, C, E]|
|  1|        [E, A, C]|             [E, A, C]|
|  2|     [F, A, E, B]|             [A, E, B]|
|  3|        [E, G, A]|                [E, A]|
|  4|  [A, C, E, B, D]|          [A, C, E, B]|
+---+-----------------+----------------------+

Обычно я использовал бы collect (), чтобы получить список всех значений в столбце items2, а затем использовал бы udf, примененный к каждой строке в items1, чтобы получитьпересечение.Но данные очень большие (более 10 миллионов строк), и я не могу использовать collect (), чтобы получить такой список.Есть ли способ сделать это, сохраняя данные в формате фрейма данных?Или как-то иначе без использования collect ()?

1 Ответ

0 голосов
/ 22 октября 2018

Первое, что вы хотите сделать, это explode значения в df2.items2, чтобы содержимое массивов находилось в отдельных строках:

from pyspark.sql.functions import explode
df2 = df2.select(explode("items2").alias("items2"))
df2.show()
#+------+
#|items2|
#+------+
#|     B|
#|     A|
#|     C|
#|     E|
#+------+

(Предполагается, чточто значения в df2.items2 различны - если нет, вам необходимо добавить df2 = df2.distinct().)

Опция 1 : использовать crossJoin:

СейчасВы можете crossJoin новый df2 вернуться к df1 и сохранить только те строки, где df1.items1 содержит элемент в df2.items2.Мы можем достичь этого, используя pyspark.sql.functions.array_contains и этот трюк , который позволяет нам использовать значение столбца в качестве параметра .

После фильтрации сгруппируйте по id1 и items1 и агрегируйте, используя pyspark.sql.functions.collect_list

from pyspark.sql.functions import expr, collect_list

df1.alias("l").crossJoin(df2.alias("r"))\
    .where(expr("array_contains(l.items1, r.items2)"))\
    .groupBy("l.id1", "l.items1")\
    .agg(collect_list("r.items2").alias("items1_updated"))\
    .show()
#+---+---------------+--------------+
#|id1|         items1|items1_updated|
#+---+---------------+--------------+
#|  1|      [E, A, C]|     [A, C, E]|
#|  0|   [B, C, D, E]|     [B, C, E]|
#|  4|[A, C, E, B, D]|  [B, A, C, E]|
#|  3|      [E, G, A]|        [A, E]|
#|  2|   [F, A, E, B]|     [B, A, E]|
#+---+---------------+--------------+

Опция 2 : Разнесение df1.items1 и левое соединение:

Другой вариант - explode содержимое items1 в df1 и сделать левое соединение.После объединения мы должны сделать аналогичную группировку и агрегирование, как указано выше.Это работает, потому что collect_list будет игнорировать значения null, введенные несоответствующими строками

df1.withColumn("items1", explode("items1")).alias("l")\
    .join(df2.alias("r"), on=expr("l.items1=r.items2"), how="left")\
    .groupBy("l.id1")\
    .agg(
        collect_list("l.items1").alias("items1"),
        collect_list("r.items2").alias("items1_updated")
    ).show()
#+---+---------------+--------------+
#|id1|         items1|items1_updated|
#+---+---------------+--------------+
#|  0|   [E, B, D, C]|     [E, B, C]|
#|  1|      [E, C, A]|     [E, C, A]|
#|  3|      [E, A, G]|        [E, A]|
#|  2|   [F, E, B, A]|     [E, B, A]|
#|  4|[E, B, D, C, A]|  [E, B, C, A]|
#+---+---------------+--------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...