Это мой способ достижения этого.Для всего этого решения я использовал спарк-оболочку.Вот некоторые предварительные условия:
Загрузите эту банку с json-serde
Извлеките ZIP-файл в любую папку1011 *
Теперь запустите spark-shell с помощью этой команды
spark-shell --jars path/to/jars/json-serde-cdh5-shim-1.3.7.3.jar,path/to/jars/json-serde-1.3.7.3.jar,path/to/jars/json-1.3.7.3.jar
Ваш документ Json:
{
"cust": "Retails",
"tables": [
{
"Name":"customer",
"table_NAME":"cust",
"param1":"cust_id",
"val":"112",
"op":"cust_name"
},
{
"Name":"sales",
"table_NAME":"sale",
"param1":"country",
"val":"ind",
"op":"monthly_sale"
}]
}
Свернутая версия:
{"cust": "Retails","tables":[{"Name":"customer","table_NAME":"cust","param1":"cust_id","val":"112","op":"cust_name"},{"Name":"sales","table_NAME":"sale","param1":"country","val":"ind","op":"monthly_sale"}]}
Я поместил этот JSON в этот / tmp / sample.json
Теперь собираюсь spark-sql part:
Создание таблицы на основе схемы json
sql("CREATE TABLE json_table(cust string,tables array<struct<Name: string,table_NAME:string,param1:string,val:string,op:string>>) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'")
Теперь загрузите данные json втаблица
sql("LOAD DATA LOCAL INPATH '/tmp/sample.json' OVERWRITE INTO TABLE json_table")
Теперь я буду использовать концепцию бокового обзора улья Вид сбоку
val ans=sql("SELECT myCol FROM json_table LATERAL VIEW explode(tables) myTable as myCol").collect
Схемавозвращенного результата:
ans.printSchema
root
|-- table: struct (nullable = true)
| |-- Name: string (nullable = true)
| |-- table_NAME: string (nullable = true)
| |-- param1: string (nullable = true)
| |-- val: string (nullable = true)
| |-- op: string (nullable = true)
Результат ans.show
ans.show
+--------------------+
| table|
+--------------------+
|[customer,cust,cu...|
|[sales,sale,count...|
+--------------------+
Теперь я предполагаю, что может быть два типанапример, cust_id имеет число типа и страна имеет StrТип .Я добавляю метод для определения типа данных на основе их значения.например,
def isAllDigits(x: String) = x forall Character.isDigit
Примечание: Вы можете использовать свой собственный способ идентификации этого
7.Now создание запроса на основе jsonданные
ans.foreach(f=>{
val splitted_string=f.toString.split(",")
val op=splitted_string(4).substring(0,splitted_string(4).size-2)
val table_NAME=splitted_string(1)
val param1 = splitted_string(2)
val value = splitted_string(3)
if(isAllDigits(value)){
println("SELECT " +op+" FROM "+ table_NAME+" WHERE "+param1+"="+value)
}else{
println("SELECT " +op+" FROM "+ table_NAME+" WHERE "+param1+"='"+value+"'")
}
})
Вот результат, который я получил:
SELECT cust_name FROM cust WHERE cust_id=112
SELECT monthly_sale FROM sale WHERE country='ind'