Я пишу задание ETL Spark (2.4) в Scala для чтения ;
файлов CSV, разделенных с шаблоном глобуса на S3.Данные загружаются в DataFrame и содержат столбец (скажем, он называется custom
) со строкой в формате JSON ( несколько уровней вложенности ).Цель состоит в том, чтобы автоматически вывести схему из этого столбца, чтобы ее можно было структурировать для приемника записи в файлах Parquet в S3.
Этот пост ( Как запросить столбец данных JSON с помощью Spark DataFrames?) предполагает, что schema_of_json
из Spark 2.4 может вывести схему из столбца или строки в формате JSON.
Вот что я пробовал:
val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first
df.withColumn(
"nestedCustom",
from_json(col("custom"), jsonSchema, Map[String, String]())
)
Но вышеприведенное не 't работает и вызывает это исключение:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]
Имейте в виду, я отфильтровываю нулевые значения на custom
для этого кадра данных.
РЕДАКТИРОВАТЬ: весь код ниже.
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
/**
* RandomName entry point.
*
* @author Random author
*/
object RandomName {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder
.appName("RandomName")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
.getOrCreate
import spark.implicits._
val randomName: RandomName = new RandomName(spark)
val df: sql.DataFrame = randomName.read().filter($"custom".isNotNull)
val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first
df.withColumn(
"nestedCustom",
from_json(col("custom"), jsonSchema, Map[String, String]())
)
df.show
spark.stop
}
}
class RandomName(private val spark: SparkSession) {
/**
* Reads CSV files from S3 and creates a sql.DataFrame.
*
* @return a sql.DataFrame
*/
def read(): sql.DataFrame = {
val tableSchema = StructType(
Array(
StructField("a", StringType, true),
StructField("b", StringType, true),
StructField("c", DateType, true),
StructField("custom", StringType, true)
))
spark.read
.format("csv")
.option("sep", ";")
.option("header", "true")
.option("inferSchema", "true")
.schema(tableSchema)
.load("s3://random-bucket/*")
}
}
И пример JSON:
{
"lvl1": {
"lvl2a": {
"lvl3a": {
"lvl4a": "random_data",
"lvl4b": "random_data"
}
},
"lvl2b": {
"lvl3a": {
"lvl4a": "ramdom_data"
},
"lvl3b": {
"lvl4a": "random_data",
"lvl4b": "random_data"
}
}
}
}