Кто-нибудь смог запустить структурированную потоковую передачу на Hadoop 2.6.0-cdh5.14.2, также используя внешние библиотеки (в основном, spark-sql- *).
Обновление
Прежде всего: отсутствует информация из моего предыдущего поста: Spark имеет версию 2.3.0
По предложению моего удаленного друга я сделал следующее:
- Я переехал изPython в Scala (который лучше поддерживается, и это родной язык Spark)
- Я запускал структурированное потоковое вещание, используя другие источники, кроме Kafka.
В качестве источника я использовал простой CSV:
$ export SPARK_KAFKA_VERSION=0.10
$ spark2-shell
scala> import org.apache.spark.sql.Encoders
scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)
scala> val schema = Encoders.product[Amazon].schema
scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]
scala> data.isStreaming
res0: Boolean = true
scala> val ss = data.writeStream.outputMode("append").format("console")
scala> ss.start()
Магически этот код работал.
Cloudera утверждает, что они не поддерживают структурированную потоковую передачу, в соответствии с этим следующий код, где я только что изменил исходный код, завершается ошибкой:
val data =spark.readStream.format("kafka")...
Повышение этого исключения:
java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
... 50 more
И я использую только предоставленные библиотеки Cloudera (без внешних jar).Обратите внимание на
$ export SPARK_KAFKA_VERSION=0.10
, который используется для принудительного использования версии 0.10 (spark-streaming-kafka- ..), поскольку в кластере также существует версия 0.8.Но нет jar spark-sql-kafka.
На данный момент, AFAIK, проблема в том, что мне не хватает правильной библиотеки (jar).Spark 2.3.0 выглядит здоровым, несмотря на все предупреждения на сайте Cloudera.
Итак ... есть ли возможность иметь "неофициально официальный Cloudera"Баночка ", что решать этот вопрос?Кто-то нашел хороший Jar для развертывания с кодом, который решает эту проблему?Опция Jar from cloudera лучше: внутренняя политика запрещает связывать банки сторонних разработчиков с кодом.
Другой вариант - заново реализовать все компоненты структурированной потоковой передачи с использованием directStreaming.Мне нравится избегать этой работы.