Переименовать ключ во вложенной схеме Spark DataFrame (Scala) - PullRequest
0 голосов
/ 18 января 2019

У меня есть сценарий использования, который должен прочитать вложенную схему JSON и записать ее обратно как Parquet (). Моя схема изменяется в зависимости от дня, когда я читаю данные, поэтому я не знаю точная схема заранее ), так как в некоторых из моих ключей гнезда у меня есть какой-то символ, например пробел, когда я хочу сохранить его как паркет, я получаю исключение с жалобой на специальный символ ,;{}()\\n\\t=

Это пример схемы, это не настоящие ключи схемы, динамические и изменяющиеся изо дня в день

  val nestedSchema = StructType(Seq(
    StructField("event_time", StringType),
    StructField("event_id", StringType),
    StructField("app", StructType(Seq(
      StructField("environment", StringType),
      StructField("name", StringType),
      StructField("type", StructType(Seq(
        StructField("word tier", StringType), ### This cause problem when you save it as Parquet
        StructField("level", StringType)
    ))
 ))))))

val nestedDF = spark.createDataFrame(sc.emptyRDD[Row], nestedSchema)

myDF.printSchema

Выход

root
 |-- event_time: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- app: struct (nullable = true)
 |    |-- environment: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: struct (nullable = true)
 |    |    |-- word tier: string (nullable = true)
 |    |    |-- level: string (nullable = true)

Попытка сохранить как паркет

myDF.write
          .mode("overwrite")
          .option("compression", "snappy")
          .parquet("PATH/TO/DESTINATION")

Я нашел какое-то решение, подобное этому

myDF.toDF(myDF
          .schema
          .fieldNames
          .map(name => "[ ,;{}()\\n\\t=]+".r.replaceAllIn(name, "_")): _*)
.write
              .mode("overwrite")
              .option("compression", "snappy")
              .parquet("PATH/TO/DESTINATION")

Но он работает только с родительскими ключами, а не с вложенными. Есть ли рекурсивное решение для этого?

Мой вопрос не является дубликатом этого вопроса Поскольку моя схема является динамической, и я не знаю, каковы мои ключи. Он изменяется в зависимости от данных, которые я читаю, поэтому мое решение должно быть общим, мне нужно как-то рекурсивно создать ту же структуру схемы, но с ключом с правильным именем.

1 Ответ

0 голосов
/ 19 января 2019

По сути, вы должны создать выражение Column, которое приведёт ваш ввод к типу с очищенными именами полей. Для этого вы можете использовать функцию org.apache.spark.sql.functions.struct, которая позволяет комбинировать другие Column для построения столбца структурного типа. Примерно так должно работать:

  import org.apache.spark.sql.{functions => f}

  def sanitizeName(s: String): String = s.replace(" ", "_")

  def sanitizeFieldNames(st: StructType, context: String => Column): Column = f.struct(
    st.fields.map { sf =>
      val sanitizedName = sanitizeName(sf.name)
      val sanitizedField = sf.dataType match {
        case struct: StructType =>
          val subcontext = context(sf.name)
          sanitizeFieldNames(struct, subcontext(_))
        case _ => context(sf.name)
      }
      sanitizedField.as(sanitizedName)
    }: _*
  )

Вы используете это так:

val df: DataFrame = ...

val appFieldType = df.schema("app").asInstanceOf[StructType]  // or otherwise obtain the field type
df.withColumn(
  "app",
  sanitizeFieldNames(appFieldType, df("app")(_))
)

Для вашего типа эта рекурсивная функция будет возвращать столбец типа

f.struct(
  df("app")("environment").as("environment"),
  df("app")("name").as("name"),
  f.struct(
    df("app")("type")("word tier").as("word_tier"),
    df("app")("type")("level").as("level")
  ).as("type")
)

, который затем присваивается полю «app», заменяя то, что там присутствует.

Однако у этого решения есть ограничение. Он не поддерживает вложенные массивы или карты: если у вас есть схема со структурами внутри массивов или карт, этот метод не будет преобразовывать какие-либо структуры внутри массивов и карт. При этом в Spark 2.4 они добавили функции, которые выполняют операции с коллекциями, поэтому возможно, что в Spark 2.4 эта функция может быть обобщена для поддержки вложенных массивов и карт.

Наконец, можно сделать то, что вы хотите с mapPartitions. Сначала вы пишете рекурсивный метод, который очищает только StructType вашего поля:

def sanitizeType(dt: DataType): DataType = dt match {
  case st: StructType => ...  // rename fields and invoke recursively
  case at: ArrayType => ...  // invoke recursively
  case mt: MapType => ...  // invoke recursively
  case _ => dt  // simple types do not have anything to sanitize
}

Во-вторых, вы применяете очищенную схему к вашему фрейму данных. Есть два основных способа сделать это: безопасный mapPartitions и один, использующий внутренний Spark API.

С mapPartitions это просто:

df.mapPartitions(identity)(RowEncoder(sanitizeType(df.schema)))

Здесь мы применяем операцию mapPartitions и явно указываем выходной кодер. Помните, что схемы в Spark не свойственны данным: они всегда связаны с конкретным фреймом данных. Все данные внутри фрейма данных представлены в виде строк без меток на отдельных полях, только позиции. Пока ваша схема имеет одинаковые типы на тех же позициях (но с потенциально разными именами), она должна работать так, как вы ожидаете.

mapPartitions приводит к нескольким дополнительным узлам в логическом плане. Чтобы избежать этого, можно создать экземпляр Dataset[Row] напрямую с помощью специального кодера:

new Dataset[Row](df.sparkSession, df.queryExecution.logical, RowEncoder(sanitizeType(df.schema)))

Это позволит избежать ненужных mapPartitions (что, как правило, приводит к шагам deserialize-map-serialize в плане выполнения запроса), но это может быть небезопасно; Я лично не проверял это сейчас, но это могло бы работать на вас.

...