Динамическая подготовка и выполнение запросов в спарк - PullRequest
0 голосов
/ 27 ноября 2018

В Spark этот json находится в фрейме данных (DF), теперь нам нужно перейти к таблицам (в json на основе cust), нам нужно прочитать первый блок таблиц и подготовить SQL-запрос.Например: SELECT CUST_NAME FROM CUST WHERE CUST_ID =112

мы должны выполнить этот запрос в базе данных и сохранить его, чтобы получить файл json.

{
     "cust": "Retails",
     "tables": [
        {
             "Name":"customer",
             "table_NAME":"cust",
             "param1":"cust_id",  
             "val":"112",
             "op":"cust_name"
        },
        {
             "Name":"sales",
             "table_NAME":"sale",
             "param1":"country",  
             "val":"ind",
             "op":"monthly_sale"
         }]
}

 root |-- cust: string (nullable = true) 
      |-- tables: array (nullable = true) 
      | |-- element: struct (containsNull = true) 
      | | |-- Name: string (nullable = true) 
      | | |-- op: string (nullable = true) 
      | | |-- param1: string (nullable = true) 
      | | |-- table_NAME: string (nullable = true) 
      | | |-- val: string (nullable = true) 

то же самое для второго блока таблиц.Пример: SELECT MONTHLY_SALE FROM SALE WHERE COUNTRY = 'IND'

должен выполнить этот запрос в БД и сохранить этот результат также в вышеуказанном файле json.

Каков наилучший подход для этого?есть идеи?

1 Ответ

0 голосов
/ 27 ноября 2018

Это мой способ достижения этого.Для всего этого решения я использовал спарк-оболочку.Вот некоторые предварительные условия:

  1. Загрузите эту банку с json-serde

  2. Извлеките ZIP-файл в любую папку1011 *

  3. Теперь запустите spark-shell с помощью этой команды

    spark-shell --jars path/to/jars/json-serde-cdh5-shim-1.3.7.3.jar,path/to/jars/json-serde-1.3.7.3.jar,path/to/jars/json-1.3.7.3.jar
    

Ваш документ Json:

{
 "cust": "Retails",
 "tables": [
    {
         "Name":"customer",
         "table_NAME":"cust",
         "param1":"cust_id",  
         "val":"112",
         "op":"cust_name"
    },
    {
         "Name":"sales",
         "table_NAME":"sale",
         "param1":"country",  
         "val":"ind",
         "op":"monthly_sale"
     }]
}

Свернутая версия:

{"cust": "Retails","tables":[{"Name":"customer","table_NAME":"cust","param1":"cust_id","val":"112","op":"cust_name"},{"Name":"sales","table_NAME":"sale","param1":"country","val":"ind","op":"monthly_sale"}]}

Я поместил этот JSON в этот / tmp / sample.json

Теперь собираюсь spark-sql part:

  1. Создание таблицы на основе схемы json

    sql("CREATE TABLE json_table(cust string,tables array<struct<Name: string,table_NAME:string,param1:string,val:string,op:string>>) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'")
    
  2. Теперь загрузите данные json втаблица

    sql("LOAD DATA LOCAL INPATH  '/tmp/sample.json' OVERWRITE INTO TABLE json_table")
    
  3. Теперь я буду использовать концепцию бокового обзора улья Вид сбоку

    val ans=sql("SELECT myCol FROM json_table LATERAL VIEW explode(tables) myTable as myCol").collect
    
  4. Схемавозвращенного результата:

        ans.printSchema
        root
         |-- table: struct (nullable = true)
         |    |-- Name: string (nullable = true)
         |    |-- table_NAME: string (nullable = true)
         |    |-- param1: string (nullable = true)
         |    |-- val: string (nullable = true)
         |    |-- op: string (nullable = true)
    
  5. Результат ans.show

         ans.show
         +--------------------+
         |               table|
         +--------------------+
         |[customer,cust,cu...|
         |[sales,sale,count...|
         +--------------------+
    
  6. Теперь я предполагаю, что может быть два типанапример, cust_id имеет число типа и страна имеет StrТип .Я добавляю метод для определения типа данных на основе их значения.например,

    def isAllDigits(x: String) = x forall Character.isDigit
    

    Примечание: Вы можете использовать свой собственный способ идентификации этого

7.Now создание запроса на основе jsonданные

    ans.foreach(f=>{
val splitted_string=f.toString.split(",")
val op=splitted_string(4).substring(0,splitted_string(4).size-2)
val table_NAME=splitted_string(1)
val param1 = splitted_string(2)
val value = splitted_string(3)
if(isAllDigits(value)){
println("SELECT " +op+" FROM "+ table_NAME+" WHERE "+param1+"="+value)
}else{
println("SELECT " +op+" FROM "+ table_NAME+" WHERE "+param1+"='"+value+"'")
}
})

Вот результат, который я получил:

SELECT cust_name FROM cust WHERE cust_id=112
SELECT monthly_sale FROM sale WHERE country='ind'
...