Не удалось найти источник данных: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider - PullRequest
0 голосов
/ 22 марта 2019

Искра версия 2.3.3 Я хочу подключиться к mqtt, используя аргумент --packages в spark-shell

bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.2

scala> import java.sql.Timestamp
scala> import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder.appName("MQTTStreamWordCount").getOrCreate()
scala> val lines = spark.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider").option("topic", "mytopic").load("tcp://localhost:1883")

Однако ошибка по-прежнему возникает.

java.lang.ClassNotFoundException: Failed to find data source: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:639)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:622)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:622)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:622)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:622)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:622)
  ... 51 more

Я не знаю, в чем проблема. Я хочу использовать потоковую передачу с использованием mqtt, есть ли другой способ сделать это с помощью bahir?

спасибо,

...