Введение
Перед предложением окончательного решения необходимо ответить на пару вопросов (например, упорядочение элементов в массиве colleagues
после замены некоторых), но я не хочу перетаскивать этослишком долгоДавайте рассмотрим очень распространенный подход для решения подобных проблем.
Решение
Поскольку colleagues
столбец является столбцом массива (и Spark очень эффективен при запросах по строкам), вам следуетПервый explode
(или posexplode
) это.Со строками на элемент массива вы можете внести необходимые изменения и в конце collect_list
вернуть столбец массива.
explode (e: Column): Column Создает новыйстрока для каждого элемента в данном массиве или столбце карты.
posexplode (e: Column): Column Создает новую строку для каждого элемента с положением в данном столбце массива или карты.
Давайте использовать следующий names
набор данных:
val names = Seq((Array("guy1", "guy2", "guy3"), "Thisguy")).toDF("colleagues", "name")
scala> names.show
+------------------+-------+
| colleagues| name|
+------------------+-------+
|[guy1, guy2, guy3]|Thisguy|
+------------------+-------+
scala> names.printSchema
root
|-- colleagues: array (nullable = true)
| |-- element: string (containsNull = true)
|-- name: string (nullable = true)
Давайте explode
, внесите изменения и в конце collect_list
.
val elements = names.withColumn("elements", explode($"colleagues"))
scala> elements.show
+------------------+-------+--------+
| colleagues| name|elements|
+------------------+-------+--------+
|[guy1, guy2, guy3]|Thisguy| guy1|
|[guy1, guy2, guy3]|Thisguy| guy2|
|[guy1, guy2, guy3]|Thisguy| guy3|
+------------------+-------+--------+
Это то, что Spark SQL может справиться с легкостью.Давайте использовать regexp_replace
(Что? Regexp ?! И теперь у вас есть две проблемы:)).
val replaced = elements.withColumn("replaced", regexp_replace($"elements", "guy2", "guy10"))
scala> replaced.show
+------------------+-------+--------+--------+
| colleagues| name|elements|replaced|
+------------------+-------+--------+--------+
|[guy1, guy2, guy3]|Thisguy| guy1| guy1|
|[guy1, guy2, guy3]|Thisguy| guy2| guy10|
|[guy1, guy2, guy3]|Thisguy| guy3| guy3|
+------------------+-------+--------+--------+
В конце давайте сгруппируемся по столбцу исходного массива и используем collect_list
функцию группировки.
val solution = replaced
.groupBy($"colleagues" as "before")
.agg(
collect_list("replaced") as "after",
first("name") as "name")
scala> solution.show
+------------------+-------------------+-------+
| before| after| name|
+------------------+-------------------+-------+
|[guy1, guy2, guy3]|[guy1, guy10, guy3]|Thisguy|
+------------------+-------------------+-------+
Альтернативные решения
Пользовательская функция (UDF)
Кроме того, вы также можете написать пользовательскую пользовательскую функцию, но это не принесет пользыстолько оптимизаций, сколько и в приведенном выше решении, поэтому я бы не рекомендовал его (и будет отображаться только по запросу).
Пользовательский логический оператор
Наилучший подход - написать собственный логический оператор ( LogicalPlan ), который будет делать все это и участвовать в оптимизации, но избегать обменов (введено groupBy
).Тем не менее, это будет довольно продвинутая разработка Spark, которую я еще не сделал.