Неявное обнаружение схемы в столбце Spark DataFrame в формате JSON - PullRequest
0 голосов
/ 14 февраля 2019

Я пишу задание ETL Spark (2.4) в Scala для чтения ; файлов CSV, разделенных с шаблоном глобуса на S3.Данные загружаются в DataFrame и содержат столбец (скажем, он называется custom) со строкой в ​​формате JSON ( несколько уровней вложенности ).Цель состоит в том, чтобы автоматически вывести схему из этого столбца, чтобы ее можно было структурировать для приемника записи в файлах Parquet в S3.

Этот пост ( Как запросить столбец данных JSON с помощью Spark DataFrames?) предполагает, что schema_of_json из Spark 2.4 может вывести схему из столбца или строки в формате JSON.

Вот что я пробовал:

val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

df.withColumn(
    "nestedCustom",
    from_json(col("custom"), jsonSchema, Map[String, String]())
)

Но вышеприведенное не 't работает и вызывает это исключение:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]

Имейте в виду, я отфильтровываю нулевые значения на custom для этого кадра данных.


РЕДАКТИРОВАТЬ: весь код ниже.

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
  * RandomName entry point.
  *
  * @author Random author
  */
object RandomName {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("RandomName")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
      .getOrCreate

    import spark.implicits._

    val randomName: RandomName = new RandomName(spark)

    val df: sql.DataFrame  = randomName.read().filter($"custom".isNotNull)
    val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

    df.withColumn(
      "nestedCustom",
      from_json(col("custom"), jsonSchema, Map[String, String]())
    )

    df.show

    spark.stop
  }
}

class RandomName(private val spark: SparkSession) {

  /**
    * Reads CSV files from S3 and creates a sql.DataFrame.
    *
    * @return a sql.DataFrame
    */
  def read(): sql.DataFrame = {
    val tableSchema = StructType(
      Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true),
        StructField("c", DateType, true),
        StructField("custom", StringType, true)
      ))

    spark.read
      .format("csv")
      .option("sep", ";")
      .option("header", "true")
      .option("inferSchema", "true")
      .schema(tableSchema)
      .load("s3://random-bucket/*")
  }
}

И пример JSON:

{
  "lvl1":  {
    "lvl2a": {
      "lvl3a":   {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    },
    "lvl2b":   {
      "lvl3a":   {
        "lvl4a": "ramdom_data"
      },
      "lvl3b":  {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    }
  }
}

1 Ответ

0 голосов
/ 14 февраля 2019

Это показатель того, что custom не является допустимым вводом для schema_of_json

scala> spark.sql("SELECT schema_of_json(struct(1, 2))")
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(named_struct('col1', 1, 'col2', 2))' due to data type mismatch: argument 1 requires string type, however, 'named_struct('col1', 1, 'col2', 2)' is of struct<col1:int,col2:int> type.; line 1 pos 7;
...

. Вы должны вернуться к своим данным и убедиться, что custom действительно String.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...