Kafka-pyspark Streaming: KafkaException: Failed to construct kafka consumer
0 votes
/ 7 February 2020

I am trying to subscribe to a Kafka topic via pyspark with the following code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("Spark Structured Streaming from Kafka").getOrCreate()

lines = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("kafka.partition.assignment.strategy", "range")
    .option("subscribe", "test-events")
    .load())

words = lines.select(explode(split(lines.value, " ")).alias("word"))

wordCounts = words.groupBy("word").count()

query = wordCounts.writeStream.outputMode("complete").format("console").start()

query.awaitTermination()

and I submit it with the following command:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 test_events.py

The versions of spark, kafka, java, and scala are:

spark=2.4.0
kafka=2.12-2.3.0
scala=2.11.12
openJDK=1.8.0_221

I keep getting the following errors:

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Aggregate [word#26], [word#26, count(1) AS count#30L]
+- Project [word#26]
   +- Generate explode(split(cast(value#8 as string),  )), false, [word#26]
      +- StreamingExecutionRelation KafkaV2[Subscribe[test-events]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:610)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:85)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:199)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:288)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:287)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:287)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:286)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:197)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:197)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:255)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:196)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:195)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:190)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:190)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets$lzycompute(KafkaMicroBatchReader.scala:83)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets(KafkaMicroBatchReader.scala:83)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.setOffsetRange(KafkaMicroBatchReader.scala:87)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply$mcV$sp(MicroBatchExecution.scala:353)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply(MicroBatchExecution.scala:353)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply(MicroBatchExecution.scala:353)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:349)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more
Caused by: org.apache.kafka.common.KafkaException: range ClassNotFoundException exception occurred
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:425)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:400)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:387)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:772)
    ... 50 more
Caused by: java.lang.ClassNotFoundException: range
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:348)
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:337)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:423)
    ... 53 more


During handling of the above exception, another exception occurred:
pyspark.sql.utils.StreamingQueryException: 'Failed to construct kafka consumer\n=== Streaming Query ===\nIdentifier: [id = 671c0c25-2f29-49f9-8698-c59a89626da7, runId = 37b4d397-4338-4416-a521-384c8853e99b]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nAggregate [word#26], [word#26, count(1) AS count#30L]\n+- Project [word#26]\n   +- Generate explode(split(cast(value#8 as string),  )), false, [word#26]\n      +- StreamingExecutionRelation KafkaV2[Subscribe[test-events]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'
2020-02-07 10:03:38 INFO  SparkContext:54 - Invoking stop() from shutdown hook

There are several similar questions online, but so far no answer has helped me. I also tried the above with spark 2.4.4 using: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 test_events.py, but I keep getting the same errors.

Answers [3]

0 votes
/ 7 February 2020

Solved by using:

kafka 2.12-2.2.0
spark 2.4.0-bin-hadoop2.7
scala 2.11.12
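
For reference, the spark-submit call matching those versions would presumably keep the Scala 2.11 build of the 2.4.0 connector, exactly as in the question:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 test_events.py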

0 votes
/ 7 February 2020

java.lang.ClassNotFoundException: range

If you don't explicitly need an assignment strategy, remove that option.

Otherwise, it must be a fully qualified Java class name.
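
For example, a minimal sketch assuming you actually do want the range strategy (RangeAssignor is Kafka's built-in range implementation):

# pass the fully qualified class name instead of the bare word "range"
lines = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("kafka.partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.RangeAssignor")
    .option("subscribe", "test-events")
    .load())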

0 votes
/ 7 February 2020

Try changing kafka.partition.assignment.strategy from range to roundrobin and see if it works.

lines = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("kafka.partition.assignment.strategy", "roundrobin")
    .option("subscribe", "test-events")
    .load())

If it still doesn't work even after that, try adding kafka-clients-0.10.0.1.jar when submitting the spark job:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --jars local:///root/sources/jars/kafka-clients-0.10.0.1.jar --driver-class-path local:///root/sources/jars/kafka-clients-0.10.0.1.jar test_events.py
...