Cloudera Hadoop 2.6.0-cdh5.14.2 и структурированная потоковая передача - PullRequest
0 голосов
/ 14 ноября 2018

Кто-нибудь смог запустить структурированную потоковую передачу на Hadoop 2.6.0-cdh5.14.2, также используя внешние библиотеки (в основном, spark-sql- *).

Обновление

Прежде всего: отсутствует информация из моего предыдущего поста: Spark имеет версию 2.3.0

По предложению моего удаленного друга я сделал следующее:

  1. Я переехал изPython в Scala (который лучше поддерживается, и это родной язык Spark)
  2. Я запускал структурированное потоковое вещание, используя другие источники, кроме Kafka.

В качестве источника я использовал простой CSV:

$ export SPARK_KAFKA_VERSION=0.10
$ spark2-shell 

scala> import org.apache.spark.sql.Encoders
scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)
scala> val schema = Encoders.product[Amazon].schema
scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]

scala> data.isStreaming 
res0: Boolean = true

scala> val ss = data.writeStream.outputMode("append").format("console")
scala> ss.start()

Магически этот код работал.

Cloudera утверждает, что они не поддерживают структурированную потоковую передачу, в соответствии с этим следующий код, где я только что изменил исходный код, завершается ошибкой:

val data =spark.readStream.format("kafka")... 

Повышение этого исключения:

java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 50 more

И я использую только предоставленные библиотеки Cloudera (без внешних jar).Обратите внимание на

     $ export SPARK_KAFKA_VERSION=0.10

, который используется для принудительного использования версии 0.10 (spark-streaming-kafka- ..), поскольку в кластере также существует версия 0.8.Но нет jar spark-sql-kafka.

На данный момент, AFAIK, проблема в том, что мне не хватает правильной библиотеки (jar).Spark 2.3.0 выглядит здоровым, несмотря на все предупреждения на сайте Cloudera.

enter image description here

Итак ... есть ли возможность иметь "неофициально официальный Cloudera"Баночка ", что решать этот вопрос?Кто-то нашел хороший Jar для развертывания с кодом, который решает эту проблему?Опция Jar from cloudera лучше: внутренняя политика запрещает связывать банки сторонних разработчиков с кодом.

Другой вариант - заново реализовать все компоненты структурированной потоковой передачи с использованием directStreaming.Мне нравится избегать этой работы.

1 Ответ

0 голосов
/ 15 ноября 2018

Я думаю, что это ответ на мой вопрос:

  1. Библиотека от Cloudera существует, и она spark-sql-kafka-0-10_2.11-2.3.0.cloudera2.jar
  2. Если Кафка находится под стражей, он не будет работать. Отключи его.

К сожалению, код должен создать новый group.id для каждого запроса

 18/11/15 10:51:25 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0
 18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1
 18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2
 18/11/15 10:51:29 ERROR streaming.MicroBatchExecution: Query [id = 099e897f-2a44-4a50-bc57-46f898e05174, runId = b010d8d8-7b73-4f71-8ca5-f3eda47149c6] terminated

И Sentry не позволит этой группе получить доступ к данным. Нет возможности избежать этого, поскольку он закодирован в KafkaSourceProvider.scala код:

enter image description here

Надеюсь, это поможет сэкономить время.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...