Мне нужна помощь в подключении моего искрового кластера HDInsight на Azure с Eventhubs для структурированной потоковой передачи с помощью Java кода.
Я уже неделю отлаживаю свой код безуспешно.
Вот как я написал свой потоковый запрос для концентратора событий.
Dataset<Row> df= spark.readStream()
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "60000")
//.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "false")
.load();
Это мои зависимости POM-файла
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql\_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10\_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10\_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>spark-streaming-eventhubs\_2.11</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.0.1</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-spark\_2.11</artifactId>
<version>2.3.13</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
Я пытался отправить файл Jar в хранилище BLOB-объектов и начал работу с консоли.
Но каждый раз, когда я запускаю это, я получаю следующую ошибку:
User class threw exception: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at sample.KafkaConnect.main(KafkaConnect.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Ребята, пожалуйста, помогите мне. Я могу запускать обычные программы в кластере, но я сталкиваюсь с этой проблемой, когда пытаюсь интегрировать концентраторы событий.
Большое спасибо