Сортировка по ключу с карты в наборе данных - PullRequest
1 голос
/ 01 мая 2019

Я хочу упорядочить по времени некоторые авро-файлы, полученные из HDFS.

Схема моих файлов avro:

Заголовки: Map [String, String], тело: String

Теперь сложная часть заключается в том, что отметка времени является одной из ключ / значение с карты. Итак, у меня есть временная метка на карте:

ключ_1 -> значение_1, ключ_2 -> значение_2, отметка времени -> 1234567, ключ_n -> value_n

Обратите внимание, что тип значений - String.

Я создал класс case для создания набора данных с этой схемой:

case class Root(headers : Map[String,String], body: String)

Создание моего набора данных:

val ds = spark
          .read
          .format("com.databricks.spark.avro")
          .load(pathToHDFS)
          .as[Root]

Я действительно не знаю, как начать с этой проблемы, так как я могу получить только заголовки столбцов и тело. Как я могу получить вложенные значения для окончательной сортировки по отметке времени?

Я бы хотел сделать что-то вроде этого:

ds.select("headers").doSomethingToGetTheMapStructure.doSomeConversionStringToTimeStampForTheColumnTimeStamp("timestamp").orderBy("timestamp")

Небольшая точность: я не хочу терять какие-либо данные из моего начального набора данных, просто операция сортировки.

Я использую Spark 2.3.0.

Ответы [ 3 ]

2 голосов
/ 01 мая 2019

Вы можете использовать Scala sortBy, который принимает функцию.Я бы посоветовал вам явно объявить val ds как Vector (или другую коллекцию), чтобы вы могли видеть применимые функции в IntelliJ (если вы используете IntelliJ), и он обязательно скомпилируется.

См.мой пример ниже, основанный на вашем коде:

  case class Root(headers : Map[String,String], body: String)

  val ds: Vector[Root] = spark
    .read
    .format("com.databricks.spark.avro")
    .load(pathToHDFS)
    .as[Root]

  val sorted = ds.sortBy(r => r.headers.get("timestamp").map(PROCESSING) ).reverse

Редактировать: добавлено обратное (при условии, что вы хотите, чтобы он по убыванию).Внутри функции, которую вы передаете в качестве аргумента, вы также поместите обработку в отметку времени.

1 голос
/ 01 мая 2019

Загруженный Dataset должен выглядеть примерно так, как показано в примере набора данных ниже:

case class Root(headers : Map[String, String], body: String)

val ds = Seq(
  Root(Map("k11"->"v11", "timestamp"->"1554231600", "k12"->"v12"), "body1"),
  Root(Map("k21"->"v21", "timestamp"->"1554134400", "k22"->"v22"), "body2")
).toDS

Вы можете просто найти Map с помощью клавиши timestamp, cast значение до Long и выполнить orderBy следующим образом:

ds.
  withColumn("ts", $"headers"("timestamp").cast("Long")).
  orderBy("ts").
  show(false)
// +-------------------------------------------------+-----+----------+
// |headers                                          |body |ts        |
// +-------------------------------------------------+-----+----------+
// |[k21 -> v21, timestamp -> 1554134400, k22 -> v22]|body2|1554134400|
// |[k11 -> v11, timestamp -> 1554231600, k12 -> v12]|body1|1554231600|
// +-------------------------------------------------+-----+----------+

Обратите внимание, что $"headers"("timestamp") - это то же самое, что и метод столбца apply (т.е. $"headers".apply("timestamp")).

В качестве альтернативы вы также можете использовать getItem для доступа к Map по клавише, например:

$"headers".getItem("timestamp")
0 голосов
/ 01 мая 2019
import org.apache.spark.sql.{Encoders, Encoder, Dataset}
import org.apache.spark.sql.functions.{col, desc}
import java.sql.Timestamp

case class Nested(key_1: String,key_2: String,timestamp: Timestamp,key_n: String)
case class Root(headers:Nested,body:String)

implicit val rootCodec: Encoder[Root] = Encoders.product[Root]

val avroDS:Dataset[Root] = spark.read
                                .format("com.databricks.spark.avro")
                                .load(pathToHDFS)
                                .as[Root]

val sortedDF: DataFrame = avroDS.orderBy(desc(col("timestamp")))

Этот фрагмент кода будет напрямую приводить ваши данные Avro к Dataset[Root].Вам не нужно полагаться на импорт sparksession.implicits, и вы исключите шаг приведения вашего поля timestamp к TimestampType .Внутренне, тип данных Spark Timestamp реализован с использованием java.sql.Timestamp.

...