Загружаю столбец, в котором хранится строка JSON из таблицы mysql в виде архива Spark. Я хотел бы иметь возможность использовать Spark Sql на нем - PullRequest
0 голосов
/ 29 апреля 2020

Вот как мой код

    val query = """
    (select id, data as b_data from gtest) t
    """

    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.0.22:3306/db")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("useSSL", "false")
      .option("user", "dba")
      .option("password", "pwd")
      .option("dbtable",query)
      .load()

    df.createOrReplaceTempView("tbl")

Поле b_data в таблице tbl mysql имеет тип varchar и имеет JSON, который выглядит как показано ниже (только пример), он может быть вложенным и я хотел бы иметь возможность использовать эту JSON без необходимости использования фиксированной схемы, поскольку определение схемы вручную нецелесообразно, поскольку JSON может быть большим и вложенным.

{"id" : 100, "details" : {"fn" : "sample", "ln" : "data"}}

Что Я хотел бы иметь возможность сделать следующее

%sql
select id, b_data.id, b_data.details.fn from tbl

Некоторая информация

df.printSchema

root
 |-- id: integer (nullable = true)
 |-- b_data: string (nullable = true)
spark.version

res89: String = 2.4.5

Исключение, которое я получаю при выполнении запроса sql, выглядит следующим образом

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
....
....
....

Caused by: org.apache.spark.sql.AnalysisException: Can't extract value from b_data#4270: need struct type but got string; line 1 pos 10
    at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73)
......


Я предполагаю, что тип столбца DataFrame должен быть изменен с String на DataStruct ?? Я потерян в этой точке.

1 Ответ

0 голосов
/ 29 апреля 2020

Возможно, тип данных b_data является строкой (JSON строка), поэтому вы не можете запросить его. Вы можете прочитать строку json перед созданием временного представления.

Вы можете предоставить пользовательскую схему во время чтения или получить схему с помощью функции schema_of_json

val schema = schema_of_json(lit(df.select($"b_data").as[String].first))
val resultDF = df.withColumn("b_data_new", from_json($"b_data", schema))

resultDF.createOrReplaceTempView("tbl")

Теперь вы можете запросить как

select b_data_new.id, b_data_new.details.fn from tbl
...