Искра версия 2.3.3
Я хочу подключиться к mqtt, используя аргумент --packages в spark-shell
bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.2
scala> import java.sql.Timestamp
scala> import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder.appName("MQTTStreamWordCount").getOrCreate()
scala> val lines = spark.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider").option("topic", "mytopic").load("tcp://localhost:1883")
Однако ошибка по-прежнему возникает.
java.lang.ClassNotFoundException: Failed to find data source: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:639)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
... 49 elided
Caused by: java.lang.ClassNotFoundException: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider.DefaultSource
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:622)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:622)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:622)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:622)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:622)
... 51 more
Я не знаю, в чем проблема. Я хочу использовать потоковую передачу с использованием mqtt, есть ли другой способ сделать это с помощью bahir?
спасибо,