Я использую Streaming для получения записей о звонках от брокера Kakfa каждые 10 минут. Я хочу вставить эти записи в какую-нибудь соблазнительную (глобальную?) И продолжать вставлять, как только получу от Kakfa.
Обратите внимание, что Я не хочу хранить в улье . После каждой вставки я хочу проверить, превысило ли количество звонков по определенному номеру 20 (например). Ниже код, который я написал, который конвертирует каждые rdd
в df
, а затем создает временное представление. Тем не менее, я думаю, что представление будет содержать только последние RDD
. Как продолжать вставлять записи в том же виде и запустить sql позже?
val topics = Array("AIRDRMAIN", "")
val messages = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val Lines = messages.map(line => line.value())
val AirDRStream = Lines.map(AirDRFilter.parseAirDR)
AirDRStream.foreachRDD(foreachFunc = rdd => {
System.out.println("--- New RDD with " + rdd.count() + " records");
if (rdd.count() == 0) {
println("---WANG No logs received in this time interval=================")
} else {
val sqlContext = SparkSession
.builder()
.appName("Spark SQL basic example")
.getOrCreate()
import sqlContext.implicits._
rdd.toDF().createOrReplaceTempView("AIR")
val FilteredDR = sqlContext.sql("select refillProfileID, count(*) from AIR group by refillProfileID")
FilteredDR.show()
}
})
streamingContext.start()
streamingContext.awaitTermination()
Ниже обновляется код после добавления логики globalTempView.
val schema_string = "subscriberNumber, originNodeType, originHostName, originOperatorID, originTimeStamp, currentServiceClass, voucherBasedRefill, transactionAmount, refillProfileID, voucherGroupID, externalData1, externalData2"
val schema_rdd = StructType(schema_string.split(",")
.map(fieldName => StructField(fieldName, StringType, true)))
val init_df = sqlContext.createDataFrame(sc.emptyRDD[Row], schema_rdd)
println("initial count of initial RDD is " + init_df.count())
init_df.createGlobalTempView("AIRGLOBAL")
AirDRStream.foreachRDD(foreachFunc = rdd => {
System.out.println("--- New RDD with " + rdd.count() + " records");
if (rdd.count() == 0) {
println("--- No logs received in this time interval=================")
} else {
init_df.union(rdd.toDF())
println("after union count of initial RDD is " + init_df.count())
rdd.toDF().createOrReplaceTempView("AIR")
val FilteredDR = sqlContext.sql("select count(*) from AIR ")
val globalviewinsert = sqlContext.sql("Insert into global_temp.AIRGLOBAL select * from AIR ")
val globalview = sqlContext.sql("SELECT COUNT(*) FROM global_temp.AIRGLOBAL ")
FilteredDR.show()
globalviewinsert.show()
globalview.show()
}
})
streamingContext.start()
streamingContext.awaitTermination()