Несколько писем на потоковом искре - PullRequest
0 голосов
/ 07 ноября 2019

В своем приложении для потоковой передачи данных я пытаюсь выполнить потоковую передачу данных из Azure EventHub и записываю их на несколько каталогов в BLOB-объекте hdfs на основе этих данных. В основном следовал по ссылке несколько writeStream с потоковой передачей

Ниже приведен код:

def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format("com.databricks.spark.avro")
    .partitionBy("year", "month", "day")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}

writeStreamer(dtcFinalDF, "/qmctdl/DTC_CheckPoint", "/qmctdl/DTC_DATA")

val query1 = writeStreamer(canFinalDF, "/qmctdl/CAN_CheckPoint", "/qmctdl/CAN_DATA")

query1.awaitTermination()

В настоящее время я наблюдаю, что данные успешно записываются в "/ qmctdl/ CAN_DATA каталог, но данные не записываются в "/ qmctdl / DTC_DATA. Я делаю что-то здесь не так, любая помощь будет принята с благодарностью.

Ответы [ 2 ]

0 голосов
/ 07 ноября 2019

Можете ли вы попробовать это

spark.streams.awaitAnyTermination() 

Вместо

query1.awaittTermination()
0 голосов
/ 07 ноября 2019

Посмотрите на этот ответ: Выполнение отдельных потоковых запросов в потоковой структурированной потоковой передаче

Я не знаю, как работает Azure EventHub, но в основном я думаю, что один поток читает все данные идругой поток не обслуживается никакими данными.

...