Вы можете увидеть следующий запрос извлечения в apache Bahir: Запрос притяжения Bahir
Где вы можете видеть MQTTUtils.createPairedStream добавление.
Вы импортируете в свой pom / gradle / sbt ... используя следующий артефакт:
spark-sql-streaming-mqtt_2.11 версия 2.3.2 из группы org.apache.bahir .
До Spark 1.6 вы могли использовать в Maven:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-mqtt -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_2.11</artifactId>
<version>1.6.3</version>
</dependency>
Для Spark 2.3.2 вам необходимо использовать:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-mqtt_2.11</artifactId>
<version>2.3.2</version>
</dependency>
или в SBT:
libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.3.2"
Дополнительную информацию можно найти по адресу: org.apache.bahir: spark-streaming-mqtt
bin/spark-shell --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.3.0
Вы импортируете пакет, используя scala:
import org.apache.spark.streaming.mqtt._
и создать экземпляр:
val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topic)
Надеюсь, это поможет.