Предполагая, что ваше условие применяется к элементам массива с таким же индексом, возможно фильтровать массивы с лямбда-функциями в SQL начиная с Spark 2.4.0, но это все еще не доступно через API других языков ивам нужно использовать expr()
. Вы просто заархивируете три массива, а затем отфильтруете получившийся массив структур:
scala> df.show()
+---+------------+--------------------+------------+
| ID| COL1| COL2| COL3|
+---+------------+--------------------+------------+
| 1|[A, B, C, A]|[101, 102, 103, 104]|[P, Q, R, S]|
+---+------------+--------------------+------------+
scala> df.select($"ID", expr(s"""
| filter(
| arrays_zip(COL1, COL2, COL3),
| e -> e.COL1 == "A" AND CAST(e.COL2 AS integer) % 2 == 0
| ).COL3 AS result
| """)).show()
+---+------+
| ID|result|
+---+------+
| 1| [S]|
+---+------+
Поскольку для предоставления выражения SQL в виде столбца используется expr()
, он также работает с PySpark:
>>> from pyspark.sql.functions import expr
>>> df.select(df.ID, expr("""
... filter(
... arrays_zip(COL1, COL2, COL3),
... e -> e.COL1 == "A" AND CAST(e.COL2 AS integer) % 2 == 0
... ).COL3 AS result
... """)).show()
+---+------+
| ID|result|
+---+------+
| 1| [S]|
+---+------+