Сравните и удалите элементы из столбцов данных несовместимых массивов в Spark / PySpark - PullRequest
1 голос
/ 30 января 2020

Я новичок в Spark и не могу найти решение своей проблемы, любые предложения или помощь очень важны.

У меня есть Pyspark. sql .dataframe с двумя столбцами массива, которые есть строки в нем. Оба массива столбцов имеют непоследовательную длину, а в некоторых строках также есть пустые записи. Мне нужно сравнить эти два столбца и удалить для каждой строки в столбце B элемент массива, если он найден в этой строке в массиве в столбце OVERRIDE.

+---------------+---------------+
|    OVERRIDE   |         B     |
+---------------+---------------+
|          ['a']|      ['a','b']|
|           null|          ['b']|
|           null|      ['a','c']|
|      ['d','g']|      ['d','g']|
|           null|           null|
|          ['f']|          ['f']|
+---------------+---------------+

должно выглядеть следующим образом в конце:

+---------------+---------------+
|    OVERRIDE   |         B     |
+---------------+---------------+
|          ['a']|          ['b']|
|           null|          ['b']|
|           null|      ['a','c']|
|      ['d','g']|           null|
|           null|           null|
|          ['f']|           null|
+---------------+---------------+

Я пытался с

from pyspark.sql.functions import array_remove, array_intersect

df = df.withColumn('B', array_remove(df.B, df.OVERRIDE))

, а также

df = df.withColumn('B', array_remove(df.B, array_intersect(df.OVERRIDE, df.B)))

, но узнал, что array_remove () не может перебирать столбец, вместо этого просто может взять один элемент (например, 'a'), чтобы удалить его, а затем во всех строках столбца B.

Должен ли я создать функцию udf, и если да, то как мне это сделать?

Ответы [ 2 ]

1 голос
/ 30 января 2020

Если вы работаете с Spark> = 2.4.0, вы можете использовать встроенную функцию array_except (a, b) . Функция вернет все элементы, которые присутствуют в a, но отсутствуют в b. Хотя функция работает только тогда, когда оба параметра не имеют нулевых значений, поэтому перед ее использованием нам нужно заменить null пустым массивом .

Вот версия python:

from pyspark.sql.functions import array_except, when, array, col

df = spark.createDataFrame([
  [["a"], ["a", "b"]],
 [None, ["b"]],
 [None, ["a", "c"]],
 [["d", "g"], ["d", "g"]],
 [["f"], ["f"]]
], ["OVERRIDE", "B"])

df.withColumn("OVERRIDE", when(col("OVERRIDE").isNull(), array()).otherwise(col("OVERRIDE"))) \
  .withColumn("diff", array_except(col("B"), col("OVERRIDE"))) \
  .show()

// +--------+------+------+
// |OVERRIDE|     B|  diff|
// +--------+------+------+
// |     [a]|[a, b]|   [b]|
// |      []|   [b]|   [b]|
// |      []|[a, c]|[a, c]|
// |  [d, g]|[d, g]|    []|
// |     [f]|   [f]|    []|
// +--------+------+------+

И Scala один:

import org.apache.spark.sql.functions.{array_except, when, array}

val df = Seq(
 (Seq("a"), Seq("a", "b")),
 (null, Seq("b")),
 (null, Seq("a", "c")),
 (Seq("d", "g"), Seq("d", "g")),
 (Seq("f"), Seq("f"))
).toDF("OVERRIDE", "B")

df.withColumn("OVERRIDE", when($"OVERRIDE".isNull, array()).otherwise($"OVERRIDE"))
  .withColumn("diff", array_except($"B", $"OVERRIDE"))
  .show

// +--------+------+------+
// |OVERRIDE|     B|  diff|
// +--------+------+------+
// |     [a]|[a, b]|   [b]|
// |      []|   [b]|   [b]|
// |      []|[a, c]|[a, c]|
// |  [d, g]|[d, g]|    []|
// |     [f]|   [f]|    []|
// +--------+------+------+
1 голос
/ 30 января 2020

Вы можете сделать это, используя udf

@udf(returnType=ArrayType(StringType()))
def removeFromRight(override,b):
if(override==None or b==None):
    return b

filtered_list=[x for x in b if x not in override]
if(len(filtered_list)==0):
    filtered_list=None
return filtered_list

test1=test.withColumn("new_overridden_col",removeFromRight(col("override"),col("b")))    
test1.show()

//output of test1
+--------+------+------------------+
|override|     b|new_overridden_col|
+--------+------+------------------+
|     [a]|[a, b]|               [b]|
|    null|   [b]|               [b]|
|    null|[a, c]|            [a, c]|
|  [d, g]|  null|              null|
|    null|  null|              null|
|     [f]|  null|              null|
+--------+------+------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...