Я получаю предупреждения и ошибки при запуске структурированной потоковой передачи на кластере K8s. Вот часть моих кодов:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerServers)
.option("subscribe", topicName)
.load()
val query = df.writeStream
.outputMode("append")
.foreachBatch((batchDF: DataFrame, batchId: Long) => {
val rstDF = batchDF.select($"value")
.map(row => valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString)
.transform(runner.spark.read.json)
.transform(trimDF)
println(s"Batch $batchId: ${rstDF.count} rows")
rstDF.show(false)
})
.trigger(Trigger.ProcessingTime("120 seconds"))
.start()
query.awaitTermination()
Первая партия (партия 0) в порядке. Однако, когда данные поступили для пакета 1, я получил предупреждения о задаче, потерянной из-за java.lang.NullPointerException.
...
19/10/12 02:02:18 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
...
19/10/12 02:02:18 INFO DAGScheduler: ResultStage 2 (start at MergeKafkaToDelta.scala:124) failed in 17.980 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 4, 172.30.170.218, executor 3): java.lang.NullPointerException
...
19/10/12 02:02:18 ERROR MicroBatchExecution: Query [id = e1f15e44-ad17-452d-97cf-def26f729f38, runId = c0b7c2ba-fca4-4538-8095-cbe2daeec525] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 4, 172.30.170.218, executor 3): java.lang.NullPointerException
...
Вы знаете его коренную причину? Как я могу настроить конфигурации и параметры для spark-submit? У меня есть блог, связанный с ним: Контрольная точка Spark Streaming на Kubernetes
Однако она основана на конкретной облачной платформе. :-( Знаете ли вы общие решения?