Я использую Apache bahir для получения данных с сервера mqtt.
Ниже приведен мой пример кода
val sensorRawData = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", topic)
.load(brokerURL)
.selectExpr("CAST(payload AS STRING)")
.as[String]
val query = sensorRawData.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
Вывод в консоли такой же, как показано ниже
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
| payload|
+--------------------+
|[{"timestamp":"20...|
|[{"timestamp":"20...|
|[{"timestamp":"20...|
+--------------------+
У меня здесь два вопроса
- Можем ли мы увидеть полные данные в консоли вместо усеченного
- Можно ли записать эти данные в базу данных улья, я вижу разъемы Flink, но не ульев разъемы для Bahir