Другой метод, где вы получаете схему динамически, используя пример записи JSON.Строка мусора анализируется с помощью функции регулярного выражения regexp_extract ()
Проверьте это:
scala> val df = Seq(( """<1>2019-03-20T20:59:59Z daily_report.txt[102852]: { "ts": "1553115599", "data": {"field1": "value11", "field2": "value12"} }"""),
| ("""<2>2019-03-20T20:59:59Z daily_report.txt[102852]: { "ts": "1553115599", "data": {"field1": "value21", "field2": "value22"} }"""),
| ("""<3>2019-03-20T20:59:59Z daily_report.txt[102852]: { "ts": "1553115599", "data": {"field1": "value31", "field2": "value32"} }""")).toDF("data_garb")
df: org.apache.spark.sql.DataFrame = [data_garb: string]
scala> val json_str = """{ "ts": "1553115599", "data": {"field1": "value11", "field2": "value12"} }"""
json_str: String = { "ts": "1553115599", "data": {"field1": "value11", "field2": "value12"} }
scala> val dfj = spark.read.json(Seq(json_str).toDS)
dfj: org.apache.spark.sql.DataFrame = [data: struct<field1: string, field2: string>, ts: string]
scala> dfj.schema
res44: org.apache.spark.sql.types.StructType = StructType(StructField(data,StructType(StructField(field1,StringType,true), StructField(field2,StringType,true)),true), StructField(ts,StringType,true))
scala> val df2=df.withColumn("newc",regexp_extract('data_garb,""".*?(\{.*)""",1)).withColumn("newc",from_json('newc,dfj.schema)).drop("data_garb")
df2: org.apache.spark.sql.DataFrame = [newc: struct<data: struct<field1: string, field2: string>, ts: string>]
scala> df2.show(false)
+--------------------------------+
|newc |
+--------------------------------+
|[[value11, value12], 1553115599]|
|[[value21, value22], 1553115599]|
|[[value31, value32], 1553115599]|
+--------------------------------+
Подстановочный знак позволяет выбирать отдельные поля
scala> df2.select($"newc.*").show(false)
+------------------+----------+
|data |ts |
+------------------+----------+
|[value11, value12]|1553115599|
|[value21, value22]|1553115599|
|[value31, value32]|1553115599|
+------------------+----------+
scala>
Или выможет запрашивать вложенные поля, явно упоминая их
scala> df2.select($"newc.ts",$"newc.data.field1",$"newc.data.field2").show(false)
+----------+-------+-------+
|ts |field1 |field2 |
+----------+-------+-------+
|1553115599|value11|value12|
|1553115599|value21|value22|
|1553115599|value31|value32|
+----------+-------+-------+
scala>