Spark Streaming: перезапуск приложения приводит к тому, что файлы не сохраняются в существующий каталог - PullRequest
0 голосов
/ 28 октября 2019

У меня возникла проблема, когда при повторном запуске задания потоковой передачи, если выходной каталог уже существует, он не будет сохранять сообщения, отправленные в тему. Вот мой код:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{functions => f}

object saveData {

  // security constants
  val JAAS_PATH = "./file.jaas"

  def main(args: Array[String]): Unit = {
    // set java properties for kerberos
    System.setProperty("java.security.auth.login.config", JAAS_PATH)

    val spark = createSparkSession("Save Power Flow")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "server")
      .option("kafka.security.protocol", "SASL_PLAINTEXT")
      .option("kafka.sasl.kerberos.service.name", "kafka")
      .option("subscribe", "topic")
      .load()

    var query = df.select(f.col("key").as("key").cast("string"), f.col("value").cast("string"))

    query.writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", "hdfs/path")
      .partitionBy("col1", "col2")
      .option("checkpointLocation", "hdfs/checkpoint/path")
      .start()
      .awaitTermination()
    spark.stop()
  }

  def createSparkSession(name: String): SparkSession = {
    val spark: SparkSession = SparkSession
      .builder
      .appName("name")
      .config("spark.executor.instances", "10")
      .config("spark.executor.memory", "2G")
      .config("spark.executor.cores", "1")
      .config("spark.driver.memory", "4G")
      .config("spark.driver.cores", "4")
      .config("spark.autoBroadcastJoinThreshold", "-1")
      .config("spark.yarn.executor.memoryOverhead", "3072")
      .config("spark.yarn.driver.memoryOverhead", "3072")
      .config("spark.default.parallelism", "200")
      .config("spark.executor.heartbeatInterval", "100000000")
      .config("spark.network.timeout", "100000000")
      .config("spark.port.maxRetries", "100")
      .config("spark.dynamicAllocation.maxExecutors", "10")
      .config("spark.port.maxRetries", "80")
      .config("spark.sql.shuffle.partitions", "2270")
      .config("spark.driver.extraJavaOptions", s"-Djava.security.auth.login.config=$JAAS_PATH")
      .config("spark.executor.extraJavaOptions", s"-Djava.security.auth.login.config=$JAAS_PATH")
      .config("spark.streaming.kafka.consumer.cache.enabled", "false")
      .enableHiveSupport()
      .getOrCreate()
    return spark
  }

}

и искры отправки

hdfs dfs -rm -r hdfs/checkpoint/path

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --files file.jaas,hive-site.xml,keytab.keytab \
  --driver-java-options "-Djava.security.auth.login.config=./key.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
  job.jar

Если я удаляю каталог, все работает нормально, но я хотел бы, чтобы задание просто добавлялось к тому, что ужесуществует.

Что мне не хватает, что вызывает эту проблему?

...