Spark Structure потоковой чтения данных дважды за каждую микропакет. Как избежать - PullRequest
0 голосов
/ 10 апреля 2020

У меня очень странная проблема с потоковой структурой искры. Потоковая структура Spark создает две искровые работы для каждой микро-партии. В результате прочитайте данные из Кафки дважды. Вот простой фрагмент кода.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object CheckHowSparkReadFromKafka {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .config(new SparkConf()
        .setAppName(s"simple read from kafka with repartition")
        .setMaster("local[*]")
        .set("spark.driver.host", "localhost"))
      .getOrCreate()
    val testPath = "/tmp/spark-test"
    FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
    import session.implicits._
    val stream = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",        "kafka-20002-prod:9092")
      .option("subscribe", "topic")
      .option("maxOffsetsPerTrigger", 1000)
      .option("failOnDataLoss", false)
      .option("startingOffsets", "latest")
      .load()
      .repartitionByRange( $"offset")
      .writeStream
      .option("path", testPath + "/data")
      .option("checkpointLocation", testPath + "/checkpoint")
      .format("parquet")
      .trigger(Trigger.ProcessingTime(10.seconds))
      .start()
    stream.processAllAvailable()

Это происходит потому, что если .repartitionByRange( $"offset"), если я уберу эту строку, все хорошо. Но с помощью spark создайте два задания, одно с 1-й стадией, только что прочитанное с Кафки, второе с 3-й стадией: чтение -> перемешивание -> запись Таким образом, результат первого задания никогда не использовался.

Это существенно влияет на производительность. В некоторых моих темах по Кафке 1550 разделов, поэтому читать их дважды - большое дело. Если я добавлю кеш, дела пойдут лучше, но это не для меня. В локальном режиме первое задание в пакетном режиме занимает менее 0,1 мс, кроме пакета с индексом 0. Но в кластере YARN и Messos оба задания полностью ожидаемы и по моим темам занимают около 1,2 мин.

Почему это происходит? Как я могу избежать этого? Похоже, ошибка?

PS Я использую свечи 2.4.3.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...