В своем приложении для потоковой передачи данных я пытаюсь выполнить потоковую передачу данных из Azure EventHub и записываю их на несколько каталогов в BLOB-объекте hdfs на основе этих данных. В основном следовал по ссылке несколько writeStream с потоковой передачей
Ниже приведен код:
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
input
.writeStream
.format("com.databricks.spark.avro")
.partitionBy("year", "month", "day")
.option("checkpointLocation", checkPointFolder)
.option("path", output)
.outputMode(OutputMode.Append)
.start()
}
writeStreamer(dtcFinalDF, "/qmctdl/DTC_CheckPoint", "/qmctdl/DTC_DATA")
val query1 = writeStreamer(canFinalDF, "/qmctdl/CAN_CheckPoint", "/qmctdl/CAN_DATA")
query1.awaitTermination()
В настоящее время я наблюдаю, что данные успешно записываются в "/ qmctdl/ CAN_DATA каталог, но данные не записываются в "/ qmctdl / DTC_DATA. Я делаю что-то здесь не так, любая помощь будет принята с благодарностью.