Spark Structured Streaming: консольный приемник не работает должным образом - PullRequest
0 голосов
/ 23 мая 2018

У меня есть следующий код для чтения и обработки данных Kafka с использованием структурированной потоковой передачи

object ETLTest {

  case class record(value: String, topic: String)

  def main(args: Array[String]): Unit = {
    run();
  }

  def run(): Unit = {

    val spark = SparkSession
      .builder
      .appName("Test JOB")
      .master("local[*]")
      .getOrCreate()

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")
      .option("subscribe", "...")
      .option("failOnDataLoss", "false")
      .option("startingOffsets","earliest")
      .load()
      .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")

    val sdvWriter = new ForeachWriter[record] {
      def open(partitionId: Long, version: Long): Boolean = {
        true
      }
      def process(record: record) = {
        println("record:: " + record)
      }
      def close(errorOrNull: Throwable): Unit = {}
    }

    val sdvDF = kafkaStreamingDF
      .as[record]
      .filter($"value".isNotNull)

    // DOES NOT WORK
    /*val query = sdvDF
        .writeStream
        .format("console")
        .start()
        .awaitTermination()*/

    // WORKS
    /*val query = sdvDF
      .writeStream
      .foreach(sdvWriter)
      .start()
      .awaitTermination()
      */

  }

}

Я запускаю этот код из IntellijIdea IDE, и когда я использую foreach (sdvWriter), я вижу, что записи используютсяот Kafka, но когда я использую .writeStream.format ("console"), я не вижу никаких записей.Я предполагаю, что поток записи консоли поддерживает какую-то контрольную точку и предполагает, что он обработал все записи.Это тот случай?Я что-то упускаю здесь очевидное?

1 Ответ

0 голосов
/ 19 августа 2019

воспроизвел ваш код здесь оба варианта сработали.на самом деле в обоих вариантах без
import spark.implicits._ произойдет сбой, поэтому я не уверен, что вам не хватает.может быть некоторые зависимости настроены не правильно.Вы можете добавить pom.xml?

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger



object Check {

  case class record(value: String, topic: String)


  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder().master("local[2]")
      .getOrCreate


    import spark.implicits._

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets","earliest")
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")


    val sdvDF = kafkaStreamingDF
      .as[record]
      .filter($"value".isNotNull)

    val query = sdvDF.writeStream
          .format("console")
          .option("truncate","false")
          .start()
          .awaitTermination()

  }


}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...