Пользовательская сортировка на основе содержимого внешнего массива с помощью Scala / Java API - PullRequest
2 голосов
/ 08 мая 2019

У меня есть следующие данные: -

+-------------+
|    card type|
+-------------+
|ColonialVoice|
| SuperiorCard|
|        Vista|
|  Distinguish|
+-------------+

У меня есть массив произвольного порядка, я хочу упорядочить набор данных, как указано в массиве.

[ "Distinguish", "Vista", "ColonialVoice", "SuperiorCard"]

Ожидаемый результат: -

+-------------+
|    card type|
+-------------+
|  Distinguish|
|        Vista|
|ColonialVoice|
| SuperiorCard|
+-------------+

Как добиться вышеупомянутой пользовательской сортировки с помощью API Java Spark. Может ли кто-нибудь помочь выше, используя любой API.

Ответы [ 2 ]

1 голос
/ 08 мая 2019

Возможное решение, если у вас есть веская причина для этого:

  1. создать DataFrame из этого массива
  2. добавить к этому столбец monotonically_increasing_id DataFrame
  3. присоедините это DataFrame к вашему исходному DataFrame в столбце card type
  4. порядок по столбцу monotonically_increasing_id
  5. опустить столбец monotonically_increasing_id

В Scala это будет:

import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = ...
val df = ...
val order = Array("Distinguish", "Vista", "ColonialVoice", "SuperiorCard")
import spark.implicits._

val orderDF = order.toSeq.toDF
val orderDFWithId = orderDF.withColumn("id", monotonically_increasing_id)
val joined = df.join(orderDFWithId, Seq("card type"), "left_outer")
val sortedDF = joined.orderBy("id").drop("id")
0 голосов
/ 08 мая 2019

Здесь другой подход, который извлечет нужный индекс массива из card type и затем назначит его в новый столбец.Мы можем добиться этого, используя функции Spark array и array_position, представленные в Spark 2.4:

import org.apache.spark.sql.functions.{array_position, array, udf, lit}
val cardTypes = Seq("Distinguish", "Vista", "ColonialVoice", "SuperiorCard")

val df = Seq(
("ColonialVoice"),
("SuperiorCard"),
("Vista"),
("Distinguish"))
.toDF("card_type")

df.withColumn("card_indx", 
              array_position(array(cardTypes.map(t => lit(t)):_*), $"card_type"))
              .orderBy("card_indx")
              .drop("card_indx")
              .show

Сначала мы создаем массив из содержимого cardType Seq с помощью array(cardTypes.map(t => lit(t)):_*), затем извлекаем и присваиваем индекстекущий card_type в новый столбец card_indx.Наконец, мы упорядочиваем по card_indx.

Вывод:

+-------------+
|    card_type|
+-------------+
|  Distinguish|
|        Vista|
|ColonialVoice|
| SuperiorCard|
+-------------+

Для Spark <2.4.0 массив_положение недоступно, и вы можете использовать udf: </p>

val getTypesIndx = udf((types: Seq[String], cardt: String) => cardTypes.indexOf(cardt))

df.withColumn("card_indx", getTypesIndx(array(cardTypes.map(t => lit(t)):_*), $"card_type"))
              .orderBy("card_indx")
              .drop("card_indx")
              .show
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...