Почему потоковый запрос не записывает данные в HDFS? - PullRequest
0 голосов
/ 12 декабря 2018

Я использую Spark Structured Streaming с Spark 2.3.1 и ниже мой код:

val sparkSession = SparkSession
.builder
.appName("xxx")
.config("spark.serializer", 
  "org.apache.spark.serializer.KryoSerializer")
.config("spark.rpc.netty.dispatcher.numThreads", "2")
.config("spark.shuffle.compress", "true")
.config("spark.rdd.compress", "true")
.config("spark.sql.inMemoryColumnarStorage.compressed", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.broadcast.compress", "true")
.config("spark.sql.hive.thriftServer.singleSession", "true")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.streaming.receiver.writeAheadLog.enable","true")
.enableHiveSupport()
.getOrCreate()

val rawStreamDF = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", <value>)
.option("subscribe", <value>)
.option("key.serializer", <value>)
.option("value.serializer", <value>)
.option("startingOffsets", "earliest")
.option("auto.offset.reset",earliest)
.option("group.id",  <value>)
.option("fetchOffset.numRetries", 3)
.option("fetchOffset.retryIntervalMs", 10)
.option("IncludeTimestamp", true)
.option("enable.auto.commit",  <value>)
.option("security.protocol",  <value>)
.option("ssl.keystore.location",  <value>)
.option("ssl.keystore.password",  <value>)
.option("ssl.truststore.location",  <value>)
.option("ssl.truststore.password",  <value>)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

Я пытаюсь записать данные в файл в hdfs_path:

val query = rawStreamDF
  .writeStream
  .format("json")
  .option("startingOffsets", "latest")
  .option("path", "STREAM_DATA_PATH")
  .option("checkpointLocation", "checkpointPath")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .start

Logger.log.info("Status:"+query.status)
print("Streaming Status1:"+query.status)

query.awaitTermination(450)

Но я получаю query.status значение, как показано ниже:

Status:{ "message" : "Initializing sources", "isDataAvailable" : false, "isTriggerActive" : false }

Не могли бы вы дать мне знать, где я иду не так?

1 Ответ

0 голосов
/ 24 декабря 2018

Но я получаю значение query.status, как показано ниже.

Status:{ "message" : "Initializing sources", "isDataAvailable" :false, "isTriggerActive" : false }

Не могли бы вы дать мне знать, где я ошибаюсь?

Всекажется в порядке.Механизм потоковой передачи Spark Structured Streaming, похоже, еще не запускал запрос, а просто пометил его как запускаемый в отдельном потоке.

Если вы создали отдельный поток для мониторинга структурированного запроса, вы 'd обратите внимание, что статус изменится сразу после обработки самой первой партии.

Обратитесь к официальной документации в Руководство по программированию структурированной потоковой передачи .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...