Сравните значение в DF массив массив искры - PullRequest
0 голосов
/ 11 марта 2020

Мне нужно решить следующую проблему с помощью spark / scala

У меня есть эта DF

+--------------+--------------------+
|co_tipo_arquiv|          errorCodes|
+--------------+--------------------+
|            05|[10531, 20524, 10...|

эта схема:

root
 |-- co_tipo_arquiv: string (nullable = true)
 |-- errorCodes: array (nullable = true)
 |    |-- element: string (containsNull = true)

Мне нужно проверьте, есть ли какой-либо из кодов в моем списке ошибок (list_erors) в df в столбце errorCodes

val list_erors = List("10531","10144")

я пробую это, но не работает

dfNire.filter (col ( "errorCodes"). ISIN (list_erors)). показать ()

Ответы [ 2 ]

1 голос
/ 11 марта 2020

Spark 2.4 +

Вы можете использовать функцию array_intersect с массивом ошибок.

val list_errors = Array("10531","10144")

df.withColumn("intersect", array_intersect(col("errors"), lit(list_errors))).show(false)

Тогда результат будет следующим:

+---+---------------------+---------+
|id |errors               |intersect|
+---+---------------------+---------+
|05 |[10531, 20524, 11111]|[10531]  |
+---+---------------------+---------+

где имя столбца является временным для моего теста.

0 голосов
/ 11 марта 2020

Если вы хотите проверить / перечислить, содержит ли массив list_errors, тогда:

df.show()
//+------------------+
//|        errorCodes|
//+------------------+
//|[10531, 20254, 10]|
//|              [10]|
//+------------------+


def is_exists_any(s: Seq[String]): UserDefinedFunction = udf((c: collection.mutable.WrappedArray[String]) => c.toList.intersect(s).nonEmpty)

val list_errors = Seq("10531", "10144")

df.withColumn("is_exists",is_exists_any(list_errors)(col("errorCodes"))).filter(col("is_exists") === true).show()

//+------------------+---------+
//|        errorCodes|is_exists|
//+------------------+---------+
//|[10531, 20254, 10]|     true|
//+------------------+---------+

Другой способ получить строки без использования udf - использовать array_intersect, а затем только перечислить строки, где size of array не 0.

df.withColumn("is_exists", array_intersect(col("errorCodes"), lit(list_errors))).
filter(size(col("is_exists")) !==0).
show()
//+------------------+---------+
//|        errorCodes|is_exists|
//+------------------+---------+
//|[10531, 20254, 10]|  [10531]|
//+------------------+---------+
...