Я следую инструкциям Databricks по этому адресу, чтобы начать проект с Kafka:
Руководство по интеграции структурированного потокового вещания + Kafka (брокер Kafka версии 0.10.0 или выше
Код:
# coding: utf-8
import sys
import os,time
sys.path.append("/usr/local/lib/python2.7/dist-packages")
from pyspark.sql import SparkSession,Row
from pyspark import SparkContext,SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import pyspark.sql.functions
import json
spark = SparkSession.builder.appName("Kakfa-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')
trainingSchema = StructType([
StructField("code",StringType(),True),
StructField("ean",StringType(),True),
StructField("name",StringType(),True),
StructField("description",StringType(),True),
StructField("category",StringType(),True),
StructField("attributes",StringType(),True)
])
trainingDF = spark.createDataFrame(sc.emptyRDD(),trainingSchema)
broker, topic =
['kafka.partner.stg.some.domain:9092','hybris.products']
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
"kafka.partner.stg.some.domain:9092") \
.option("subscribe", "hybris.products") \
.option("startingOffsets", "earliest") \
.load()
Моя версия Hadoop - 2.6, а версия Spark - 2.3.0
Командная строка с spark-submit
:
spark-submit --jars jars/spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-002.py
Сообщение об ошибке:
Py4JJavaError: Произошла ошибка при вызове o48.load.: Java.lang.NoClassDefFoundError: org / apache / kafka / common/ serialization / ByteArrayDeserializer по адресу org.apache.spark.sql.kafka010.KafkaSourceProvider $. (KafkaSourceProvider.scala: 413) по адресу org.apache.spark.sql.kafka010.KafkaSourceProvider $..sql.kafka010.KafkaSourceProvider.validateStreamOptions (KafkaSourceProvider.scala: 360) в org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema (KafkaSourceProvider.scala: 64) в org.ataSource.sourceSchema (DataSource.scala: 231) в org.apache.spark.sql.execution.datasources.DataSource.sourceInfo $ lzycompute (DataSource.scala: 94) в org.apache.spark.sql.execution.datasources.DataSource.sourceInfo (DataSource.scala: 94) в org.apache.spark.sql.execution.streaming.StreamingRelation $ .apply (StreamingRelation.scala: 33) в org.apache.spark.sql.streaming.DataStreamReader.load (DataStreamReader.scala: 170) at sun.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) в sun.reflect.DelegatingMethodAccessorImplanj.jjjj.refle.Method.invoke (Method.java:498) в py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) в py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:357) в py4j.Gateway.invoke (.java: 282) в py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) в py4j.commands.CallCommand.execute (CallCommand.java: 79) в py4j.GatewayConnection.run (GatewayConnection.java:214) в java.lang.Thread.run (Thread.java:745). Причина: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer на java.net.URLClassLoader.findClass (URLClassLoader.java:381) на java.lang.ClassLoader.loadClass (ClassLoader.java:424) на java.lang.ClassLoader.loadClass (ClassLoader.java:357) * 101022 *
Как вы можете проверить на веб-сайте, который я упомянул выше, файл jar , который я импортирую, является точно таким же файлом.Итак, я понятия не имею, почему это происходит.Может быть, другой модуль не упоминается?Я действительно потерян здесь