Обнаружена ошибка NoClassDefFoundError при интеграции структурированного потокового вещания Kafka-Spark через PySpark - PullRequest
0 голосов
/ 08 апреля 2020

Я использую

  • Версия Spark: 3.0.0-preview2
  • Scala версия: 2.12
  • JAVA версия: 1.8
  • Версия Kafka Broker: 2.2.0

Я настроил два JARS ( spark- sql -kafka-0-10_2.12-3.0.0-preview2.jar и kafka-clients-2.2.0.jar ) в spark-defaults.conf сохраните файл JARS в папке $ SPARK_HOME / jars. когда я пытался просмотреть ключ , значение данных с серверов Kafka (поскольку данные из Kafka поступают в виде пар KV в формате JSON), я столкнулся с приведенной ниже ошибкой

java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(S
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        ... 30 more
Exception in thread "stream execution thread for [id = 504665ad-c59a-4a85-8c46-4d6c741b0adf, runId = 36bc5028-6b34-4d6c-a265-4c38ce66cfcbError: org/apache/spark/kafka010/KafkaConfigUpdater
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(S
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        ... 30 more

А вот код, который я пытался просмотреть пару ключ-значение данных Kafka

from pyspark import *
from pyspark.sql import *
from pyspark.sql.utils import *
from pyspark.streaming import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

conf = SparkConf().setMaster("local")
sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

spark = SparkSession \
    .builder \
    .getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092") \
  .option("subscribe", "topic1,topic2,topic3") \
  .option("failOnDataLoss", "false") \
  .load()

table = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

query = table \
        .writeStream \
        .outputMode("append") \
        .option("truncate","false") \
        .format("console") \
        .start() \
        .awaitTermination()

Может ли кто-нибудь помочь мне решить эту ошибку? Заранее спасибо!

1 Ответ

0 голосов
/ 09 апреля 2020

Я решил проблему, добавив spark-streaming-kafka-0-10-assembly_2.12-3.0.0-preview2.jar в папку $ SPARK_HOME / JARS и в spark-defaults.conf файл как

spark.driver.extraClassPath       $SPARK_HOME/jars/*.jar

spark.executor.extraClassPath     $SPARK_HOME/jars/*.jar

И запустил команду spark-submit, как показано ниже:

$SPARK_HOME/bin/spark-submit --master yarn --jars $SPARK_HOME/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,$SPARK_HOME/jars/kafka-clients-2.2.0.jar,$SPARK_HOME/jars/spark-streaming-kafka-0-10-assembly_2.12-3.0.0-preview2.jar test.py
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...