Я пытаюсь подписаться на Kafka topi c через pyspark со следующим кодом:
spark = SparkSession.builder.appName("Spark Structured Streaming from Kafka").getOrCreate()
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("kafka.partition.assignment.strategy","range").option("subscribe", "test-events").load()
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()
query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
и использую следующую команду:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 test_events.py
и версии для spark , kafka, java и scala:
spark=2.4.0
kafka=2.12-2.3.0
scala=2.11.12
openJDK=1.8.0_221
Я продолжаю получать следующие ошибки:
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
Aggregate [word#26], [word#26, count(1) AS count#30L]
+- Project [word#26]
+- Generate explode(split(cast(value#8 as string), )), false, [word#26]
+- StreamingExecutionRelation KafkaV2[Subscribe[test-events]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:610)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:85)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:199)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:288)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:287)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:287)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:286)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:197)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:197)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:255)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:196)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:195)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:190)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:190)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets$lzycompute(KafkaMicroBatchReader.scala:83)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets(KafkaMicroBatchReader.scala:83)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.setOffsetRange(KafkaMicroBatchReader.scala:87)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply$mcV$sp(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:349)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
Caused by: org.apache.kafka.common.KafkaException: range ClassNotFoundException exception occurred
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:425)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:400)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:387)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:772)
... 50 more
Caused by: java.lang.ClassNotFoundException: range
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:348)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:337)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:423)
... 53 more
During handling of the above exception, another exception occurred:
pyspark.sql.utils.StreamingQueryException: 'Failed to construct kafka consumer\n=== Streaming Query ===\nIdentifier: [id = 671c0c25-2f29-49f9-8698-c59a89626da7, runId = 37b4d397-4338-4416-a521-384c8853e99b]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nAggregate [word#26], [word#26, count(1) AS count#30L]\n+- Project [word#26]\n +- Generate explode(split(cast(value#8 as string), )), false, [word#26]\n +- StreamingExecutionRelation KafkaV2[Subscribe[test-events]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'
2020-02-07 10:03:38 INFO SparkContext:54 - Invoking stop() from shutdown hoo
В Интернете есть несколько похожих вопросов, но ни один ответ мне не помог, поэтому далеко. Я также попробовал вышеупомянутое со свечой 2.4.4 со следующим: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 test_events.py
, но я продолжаю получать те же самые ошибки.