вставка нескольких RDD / dataframes в глобальное представление - PullRequest
0 голосов
/ 03 мая 2018

Я использую Streaming для получения записей о звонках от брокера Kakfa каждые 10 минут. Я хочу вставить эти записи в какую-нибудь соблазнительную (глобальную?) И продолжать вставлять, как только получу от Kakfa.

Обратите внимание, что Я не хочу хранить в улье . После каждой вставки я хочу проверить, превысило ли количество звонков по определенному номеру 20 (например). Ниже код, который я написал, который конвертирует каждые rdd в df, а затем создает временное представление. Тем не менее, я думаю, что представление будет содержать только последние RDD. Как продолжать вставлять записи в том же виде и запустить sql позже?

val topics = Array("AIRDRMAIN", "")
val messages = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
val Lines = messages.map(line => line.value())
val AirDRStream = Lines.map(AirDRFilter.parseAirDR)
AirDRStream.foreachRDD(foreachFunc = rdd => {
  System.out.println("--- New RDD with " + rdd.count() + " records");
  if (rdd.count() == 0) {
     println("---WANG No logs received in this time interval=================")
  } else {
     val sqlContext = SparkSession
       .builder()
       .appName("Spark SQL basic example")
       .getOrCreate()
   import sqlContext.implicits._
   rdd.toDF().createOrReplaceTempView("AIR")
   val FilteredDR = sqlContext.sql("select refillProfileID, count(*) from AIR group by refillProfileID")
   FilteredDR.show()
   }
  })
  streamingContext.start()
  streamingContext.awaitTermination()

Ниже обновляется код после добавления логики globalTempView.

val schema_string = "subscriberNumber, originNodeType, originHostName, originOperatorID, originTimeStamp, currentServiceClass, voucherBasedRefill, transactionAmount, refillProfileID, voucherGroupID, externalData1, externalData2"
val schema_rdd = StructType(schema_string.split(",")
                 .map(fieldName => StructField(fieldName, StringType, true)))
val init_df = sqlContext.createDataFrame(sc.emptyRDD[Row], schema_rdd)

println("initial count of initial RDD is " + init_df.count())

init_df.createGlobalTempView("AIRGLOBAL")

AirDRStream.foreachRDD(foreachFunc = rdd => {
  System.out.println("--- New RDD with " + rdd.count() + " records");
  if (rdd.count() == 0) {
    println("--- No logs received in this time interval=================")
  } else {
    init_df.union(rdd.toDF())
    println("after union count of initial  RDD is " + init_df.count())
    rdd.toDF().createOrReplaceTempView("AIR")
    val FilteredDR = sqlContext.sql("select  count(*) from AIR ")
    val globalviewinsert = sqlContext.sql("Insert into global_temp.AIRGLOBAL select * from AIR ")
    val globalview = sqlContext.sql("SELECT COUNT(*) FROM  global_temp.AIRGLOBAL ")
    FilteredDR.show()
    globalviewinsert.show()
    globalview.show()
  }
})
streamingContext.start()
streamingContext.awaitTermination()

1 Ответ

0 голосов
/ 03 мая 2018

Вы можете создать глобальное временное представление. Цитирование из документации

Временные представления в Spark SQL имеют сессионную область и исчезнут, если сеанс, который создает его, заканчивается. Если вы хотите иметь временное представление, которое используется всеми сеансами и сохраняется до приложение Spark завершает работу, вы можете создать глобальный временный Посмотреть. Глобальное временное представление привязано к сохраненной в системе базе данных global_temp, и мы должны использовать полное имя для ссылки на него, например, SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
...