spark sql - как написать динамический запрос в spark sql - PullRequest
0 голосов
/ 08 октября 2018

У меня есть один стол улья.Я хочу создать динамические запросы SQL искры. Во время отправки искры я указываю rulename.на основе правила имя запроса следует генерировать.Во время подачи искры, я должен указать имя правила.Например:

sparks-submit  <RuleName> IncorrectAge 

Должен сработать мой объектный код scala:

select tablename, filter, condition from all_rules where rulename="IncorrectAge"

Моя таблица: Правила (Таблица ввода)

|---------------------------------------------------------------------------|
| rowkey|  rule_name|rule_run_status| tablename     |condition|filter |level|
|--------------------------------------------------------------------------|
| 1    |IncorrectAge| In_Progress  | VDP_Vendor_List| age>18 gender=Male|NA|
|---------------------------------------------------------------------------
|2    | Customer_age| In_Progress  | Customer_List | age<25 gender=Female|NA| 
|----------------------------------------------------------------------------

Я получаю имя rulename:

 select tablename, filter, condition from all_rules where rulename="IncorrectAge";

После выполнения этого запроса я получил результат, подобный следующему:

   |----------------------------------------------|
   |tablename        | filter         | condition |
   |----------------------------------------------|
   |VDP_Vendor_List  | gender=Male     | age>18   |
   |----------------------------------------------|

Теперь я хочу динамически выполнять запрос sql spark

select count(*) from VDP_Vendor_List  // first column --tablename     
       select count(*) from VDP_Vendor_List where gender=Male  --tablename and filter
        select * from EMP where gender=Male  AND  age >18       --tablename, filter, condition

Мой код-Spark 2.2 код версии:

         import org.apache.spark.sql.{ Row, SparkSession }
         import org.apache.log4j._

object allrules {
  def main(args: Array[String]) {      
    val spark = SparkSession.builder().master("local[*]")
      .appName("Spark Hive")
      .enableHiveSupport().getOrCreate();

    import spark.implicits._
    val sampleDF = spark.read.json("C:/software/sampletableCopy.json") // for testing purpose i converted  hive table to json data
 sampleDF.registerTempTable("sampletable")
 val allrulesDF = spark.sql("SELECT * FROM sampletable")

  allrulesDF.show()
  val TotalCount: Long = allrulesDF.count()
  println("==============>  Total count ======>" + allrulesDF.count())

  val df1 =  allrulesDF.select(allrulesDF.col("tablename"),allrulesDF.col("condition"),allrulesDF.col("filter"),allrulesDF.col("rule_name"))
 df1.show()
 val df2=   df1.where(df1.col("rule_name").equalTo("IncorrectAge")).show()           
    println(df2)



//                             var table_name = ""
//                             var condition =""
   //                              var filter = "";
  //              df1.foreach(row=>{    
  //                                   table_name = row.get(1).toString();
  //                                   condition = row.get(2).toString();
  //                                   filter = row.get(3).toString();                             
  //                              })

   }
 }

Ответы [ 2 ]

0 голосов
/ 02 января 2019

Вы можете передать свой аргумент вашему классу драйвера следующим образом:

    object DriverClass
    {
       val log = Logger.getLogger(getClass.getName)
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("yarn").config("spark.sql.warehouse.dir", "path").enableHiveSupport().getOrCreate()
          if (args == null || args.isEmpty || args.length != 2) {
                log.error("Invalid number of arguments passed.")
                log.error("Arguments Usage: <Rule Name> <Rule Type>)
                log.error("Stopping the flow")
                System.exit(1)
            }
         import spark.implicits._
         val ruleName: String = String.valueOf(args(0).trim())
         val ruleType: String = String.valueOf(args(1).trim())
         val processSQL: String="Select tablename, filter, condition from all_rules where $ruleName=$ruleType"
         val metadataDF=spark.sql(processSQL)
         val (tblnm,fltr,cndtn) =metadataDF.rdd.map(f=>(f.get(0).toString(),f.get(1).toString(),f.get(2).toString())).collect()(0)
    val finalSql_1="select count(*) from $tblnm"  // first column    
    val finalSql_2="select count(*) from $tblnm" where $fltr"
    val finalSql_3="select * from EMP where $fltr  AND  $cndtn"
    spark.sql(finalSql_1).show()
    spark.sql(finalSql_2).show()
    spark.sql(finalSql_3).show()
    }
}
0 голосов
/ 08 октября 2018

Вы можете передать аргументы из spark-submit в ваше приложение:

bin/spark-submit --class allrules something.jar tablename filter condition

, тогда в основной функции у вас будут параметры:

def main(args: Array[String]) : Unit = {

   // args(0), args(1) ... there are your params

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...