Подключение Azure HDInsight Spark кластера и Eventhubs с протоколом Kafka - PullRequest
0 голосов
/ 25 апреля 2020

Мне нужна помощь в подключении моего искрового кластера HDInsight на Azure с Eventhubs для структурированной потоковой передачи с помощью Java кода.

Я уже неделю отлаживаю свой код безуспешно.

Вот как я написал свой потоковый запрос для концентратора событий.

Dataset<Row> df= spark.readStream()
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "60000")
//.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "false")
.load();

Это мои зависимости POM-файла

<dependency> 
<groupId>org.apache.spark</groupId> 
<artifactId>spark-sql\_2.11</artifactId> 
<version>2.4.0</version> 
</dependency> 

<dependency> 
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10\_2.11</artifactId> 
<version>2.4.0</version> 
</dependency> 

<dependency> 
<groupId>org.apache.spark</groupId> 
<artifactId>spark-sql-kafka-0-10\_2.11</artifactId> 
<version>2.4.0</version> 
</dependency> 

<dependency> 
<groupId>com.microsoft.azure</groupId> 
<artifactId>spark-streaming-eventhubs\_2.11</artifactId> 
<version>2.0.4</version> 
</dependency> 

<dependency> 
<groupId>com.azure</groupId> 
<artifactId>azure-messaging-eventhubs</artifactId> 
<version>5.0.1</version> 
</dependency> 

<dependency> 
<groupId>com.microsoft.azure</groupId> 
<artifactId>azure-eventhubs-spark\_2.11</artifactId> 
<version>2.3.13</version> 
</dependency> 

<dependency> 
<groupId>org.apache.kafka</groupId> 
<artifactId>kafka-clients</artifactId> 
<version>2.4.0</version> 
</dependency>

Я пытался отправить файл Jar в хранилище BLOB-объектов и начал работу с консоли.

Но каждый раз, когда я запускаю это, я получаю следующую ошибку:

User class threw exception: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)

at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)

at sample.KafkaConnect.main(KafkaConnect.java:51)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)

Ребята, пожалуйста, помогите мне. Я могу запускать обычные программы в кластере, но я сталкиваюсь с этой проблемой, когда пытаюсь интегрировать концентраторы событий.

Большое спасибо

...