У меня возникла проблема, когда при повторном запуске задания потоковой передачи, если выходной каталог уже существует, он не будет сохранять сообщения, отправленные в тему. Вот мой код:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{functions => f}
object saveData {
// security constants
val JAAS_PATH = "./file.jaas"
def main(args: Array[String]): Unit = {
// set java properties for kerberos
System.setProperty("java.security.auth.login.config", JAAS_PATH)
val spark = createSparkSession("Save Power Flow")
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("subscribe", "topic")
.load()
var query = df.select(f.col("key").as("key").cast("string"), f.col("value").cast("string"))
query.writeStream
.format("parquet")
.outputMode("append")
.option("path", "hdfs/path")
.partitionBy("col1", "col2")
.option("checkpointLocation", "hdfs/checkpoint/path")
.start()
.awaitTermination()
spark.stop()
}
def createSparkSession(name: String): SparkSession = {
val spark: SparkSession = SparkSession
.builder
.appName("name")
.config("spark.executor.instances", "10")
.config("spark.executor.memory", "2G")
.config("spark.executor.cores", "1")
.config("spark.driver.memory", "4G")
.config("spark.driver.cores", "4")
.config("spark.autoBroadcastJoinThreshold", "-1")
.config("spark.yarn.executor.memoryOverhead", "3072")
.config("spark.yarn.driver.memoryOverhead", "3072")
.config("spark.default.parallelism", "200")
.config("spark.executor.heartbeatInterval", "100000000")
.config("spark.network.timeout", "100000000")
.config("spark.port.maxRetries", "100")
.config("spark.dynamicAllocation.maxExecutors", "10")
.config("spark.port.maxRetries", "80")
.config("spark.sql.shuffle.partitions", "2270")
.config("spark.driver.extraJavaOptions", s"-Djava.security.auth.login.config=$JAAS_PATH")
.config("spark.executor.extraJavaOptions", s"-Djava.security.auth.login.config=$JAAS_PATH")
.config("spark.streaming.kafka.consumer.cache.enabled", "false")
.enableHiveSupport()
.getOrCreate()
return spark
}
}
и искры отправки
hdfs dfs -rm -r hdfs/checkpoint/path
spark-submit \
--master yarn \
--deploy-mode cluster \
--files file.jaas,hive-site.xml,keytab.keytab \
--driver-java-options "-Djava.security.auth.login.config=./key.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
job.jar
Если я удаляю каталог, все работает нормально, но я хотел бы, чтобы задание просто добавлялось к тому, что ужесуществует.
Что мне не хватает, что вызывает эту проблему?