Как я могу непрерывно передавать данные из kafka с использованием потоковой структурированной искры? - PullRequest
0 голосов
/ 15 апреля 2019

Я пытаюсь перенести мои API DStream в затрудненную потоковую передачу и сталкиваюсь с тем, как ожидать или не в состоянии соотнести микробатирование со структурированной потоковой передачей.

В приведенном ниже коде я создаю прямой поток и ожидаю вечно, чтобы я мог бесконечно потреблять сообщения kafka.

Как мне добиться того же в структурированной потоковой передаче?

хватит ли sparkSession.streams.awaitAnyTermination?

Я поместил пример кода ниже как в потоковую, так и в структурированную потоковую передачу. Любые указатели были бы очень полезны. Спасибо

val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer], 
        "value.deserializer" -> classOf[StringDeserializer],
        "auto.offset.reset" -> "latest",
        "max.poll.records" -> "1",
        "group.id" -> "test",
        "enable.auto.commit" -> (true: java.lang.Boolean))
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(10))
      val stream = KafkaUtils.createDirectStream[String, String](ssc,  PreferConsistent,Subscribe[String, String]("mytopic",kafkaParams))

performRddComputation(stream, sparkSession)

 ssc.start()
 ssc.awaitTermination()

Структурированный потоковый эквивалент

val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafkfa.offset.strategy","latest")
      .option("subscribe", "mytopic")
      .load()
      df.printSchema()

      val tdf = df.selectExpr("CAST(value AS STRING)").as[String].select("value").writeStream.format("console")
    .option("truncate","false")
    .start()


    tdf.map(record =>  {//do something})

      sparkSession.streams.awaitAnyTermination

Ответы [ 2 ]

1 голос
/ 18 апреля 2019

Если у вас есть только один запрос, просто используйте awaitTermination для запроса:

val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafkfa.offset.strategy","latest")
      .option("subscribe", "mytopic")
      .load()
      df.printSchema()

val tdf = df.selectExpr("CAST(value AS STRING)").as[String]
    .select("value")
    .map(record =>  {//do something})
    .writeStream
    .format("console")
    .option("truncate","false")
    .start()

// do something

tdf.awaitTermination()

awaitTermination является блокирующим вызовом, поэтому все, что вы напишете после этого, будет вызываться только после завершения запроса.

Если вам нужно обработать более одного запроса, вы можете использовать awaitAnyTermination на StreamingQueryManager:

sparkSession.streams.awaitAnyTermination()

и если вы хотите, чтобы приложение работало даже в случае сбоя одного из запросов, вызовите awaitAnyTermination(), а затем resetTerminated() в цикле.

1 голос
/ 15 апреля 2019

Я опубликую версию, которая работает со мной:

val df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafkfa.offset.strategy","latest")
  .option("subscribe", "mytopic")
  .load()
  //df.printSchema()

  val tdf = df.selectExpr("CAST(value AS STRING)")
    .select("value")
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate","false")
    .start()
  tdf.awaitAnyTermination()

Она должна работать для вас

...