Spark структурированные потоковые приемники на выходе задерживаются - PullRequest
1 голос
/ 10 июля 2019

Приведенный ниже искровой структурированный потоковый код собирает данные из Kafka каждые 10 секунд:

window($"timestamp", "10 seconds")

Я ожидал, что результаты будут выводиться на консоль каждые 10 секунд.Но я замечаю, что прием к консоли происходит каждые ~ 2 минуты или выше.Могу я узнать, что я делаю не так?

def streaming(): Unit = {
    System.setProperty("hadoop.home.dir", "/Documents/ ")
    val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
    conf.set("spark.eventLog.enabled", "false");
    val sc: SparkContext = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    val spark = SparkSession.builder().config(conf).getOrCreate()

    import sqlcontext.implicits._
    import org.apache.spark.sql.functions.window

    val inputDf = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "wonderful")
      .option("startingOffsets", "latest")
      .load()
    import scala.concurrent.duration._

    val personJsonDf = inputDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
      .withWatermark("timestamp", "500 milliseconds")
      .groupBy(
    window($"timestamp", "10 seconds")).count()

    val consoleOutput = personJsonDf.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Update())
      .start()
    consoleOutput.awaitTermination()
  }

object SparkExecutor {
  val spE: SparkExecutor = new SparkExecutor();
  def main(args: Array[String]): Unit = {
    println("test")
    spE.streaming
  }
}

1 Ответ

0 голосов
/ 17 июля 2019

Я думаю, что вы можете пропустить определение триггера для запроса personJsonDf во время операции writeStream. 2-минутный период может быть периодом по умолчанию (не уверен).

Окно groupBy, которое вы определили, будет использоваться в запросе, но оно не определяет его периодичность.

Один из способов настроить это может быть:

val consoleOutput = personJsonDf.writeStream
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Update())
  .start()

Наконец, класс Trigger содержит несколько полезных методов, которые вы хотите проверить.

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...