У меня есть один стол улья.Я хочу создать динамические запросы SQL искры. Во время отправки искры я указываю rulename.на основе правила имя запроса следует генерировать.Во время подачи искры, я должен указать имя правила.Например:
sparks-submit <RuleName> IncorrectAge
Должен сработать мой объектный код scala:
select tablename, filter, condition from all_rules where rulename="IncorrectAge"
Моя таблица: Правила (Таблица ввода)
|---------------------------------------------------------------------------|
| rowkey| rule_name|rule_run_status| tablename |condition|filter |level|
|--------------------------------------------------------------------------|
| 1 |IncorrectAge| In_Progress | VDP_Vendor_List| age>18 gender=Male|NA|
|---------------------------------------------------------------------------
|2 | Customer_age| In_Progress | Customer_List | age<25 gender=Female|NA|
|----------------------------------------------------------------------------
Я получаю имя rulename:
select tablename, filter, condition from all_rules where rulename="IncorrectAge";
После выполнения этого запроса я получил результат, подобный следующему:
|----------------------------------------------|
|tablename | filter | condition |
|----------------------------------------------|
|VDP_Vendor_List | gender=Male | age>18 |
|----------------------------------------------|
Теперь я хочу динамически выполнять запрос sql spark
select count(*) from VDP_Vendor_List // first column --tablename
select count(*) from VDP_Vendor_List where gender=Male --tablename and filter
select * from EMP where gender=Male AND age >18 --tablename, filter, condition
Мой код-Spark 2.2 код версии:
import org.apache.spark.sql.{ Row, SparkSession }
import org.apache.log4j._
object allrules {
def main(args: Array[String]) {
val spark = SparkSession.builder().master("local[*]")
.appName("Spark Hive")
.enableHiveSupport().getOrCreate();
import spark.implicits._
val sampleDF = spark.read.json("C:/software/sampletableCopy.json") // for testing purpose i converted hive table to json data
sampleDF.registerTempTable("sampletable")
val allrulesDF = spark.sql("SELECT * FROM sampletable")
allrulesDF.show()
val TotalCount: Long = allrulesDF.count()
println("==============> Total count ======>" + allrulesDF.count())
val df1 = allrulesDF.select(allrulesDF.col("tablename"),allrulesDF.col("condition"),allrulesDF.col("filter"),allrulesDF.col("rule_name"))
df1.show()
val df2= df1.where(df1.col("rule_name").equalTo("IncorrectAge")).show()
println(df2)
// var table_name = ""
// var condition =""
// var filter = "";
// df1.foreach(row=>{
// table_name = row.get(1).toString();
// condition = row.get(2).toString();
// filter = row.get(3).toString();
// })
}
}