интеграция pyspark - kafka: отсутствует библиотека lib - PullRequest
0 голосов
/ 07 декабря 2018

Я следую инструкциям Databricks по этому адресу, чтобы начать проект с Kafka:

Руководство по интеграции структурированного потокового вещания + Kafka (брокер Kafka версии 0.10.0 или выше

Код:

# coding: utf-8
import sys
import os,time
sys.path.append("/usr/local/lib/python2.7/dist-packages")
from pyspark.sql import SparkSession,Row
from pyspark import SparkContext,SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import pyspark.sql.functions
import json

spark = SparkSession.builder.appName("Kakfa-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')


trainingSchema = StructType([
  StructField("code",StringType(),True),
  StructField("ean",StringType(),True),
  StructField("name",StringType(),True),
  StructField("description",StringType(),True),
  StructField("category",StringType(),True),
  StructField("attributes",StringType(),True)
])
trainingDF = spark.createDataFrame(sc.emptyRDD(),trainingSchema)

broker, topic = 
['kafka.partner.stg.some.domain:9092','hybris.products']

df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 
"kafka.partner.stg.some.domain:9092") \
.option("subscribe", "hybris.products") \
.option("startingOffsets", "earliest") \
.load()

Моя версия Hadoop - 2.6, а версия Spark - 2.3.0

Командная строка с spark-submit:

spark-submit --jars jars/spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-002.py

Сообщение об ошибке:

Py4JJavaError: Произошла ошибка при вызове o48.load.: Java.lang.NoClassDefFoundError: org / apache / kafka / common/ serialization / ByteArrayDeserializer по адресу org.apache.spark.sql.kafka010.KafkaSourceProvider $. (KafkaSourceProvider.scala: 413) по адресу org.apache.spark.sql.kafka010.KafkaSourceProvider $..sql.kafka010.KafkaSourceProvider.validateStreamOptions (KafkaSourceProvider.scala: 360) в org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema (KafkaSourceProvider.scala: 64) в org.ataSource.sourceSchema (DataSource.scala: 231) в org.apache.spark.sql.execution.datasources.DataSource.sourceInfo $ lzycompute (DataSource.scala: 94) в org.apache.spark.sql.execution.datasources.DataSource.sourceInfo (DataSource.scala: 94) в org.apache.spark.sql.execution.streaming.StreamingRelation $ .apply (StreamingRelation.scala: 33) в org.apache.spark.sql.streaming.DataStreamReader.load (DataStreamReader.scala: 170) at sun.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) в sun.reflect.DelegatingMethodAccessorImplanj.jjjj.refle.Method.invoke (Method.java:498) в py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) в py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:357) в py4j.Gateway.invoke (.java: 282) в py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) в py4j.commands.CallCommand.execute (CallCommand.java: 79) в py4j.GatewayConnection.run (GatewayConnection.java:214) в java.lang.Thread.run (Thread.java:745). Причина: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer на java.net.URLClassLoader.findClass (URLClassLoader.java:381) на java.lang.ClassLoader.loadClass (ClassLoader.java:424) на java.lang.ClassLoader.loadClass (ClassLoader.java:357) * 101022 *

Как вы можете проверить на веб-сайте, который я упомянул выше, файл jar , который я импортирую, является точно таким же файлом.Итак, я понятия не имею, почему это происходит.Может быть, другой модуль не упоминается?Я действительно потерян здесь

1 Ответ

0 голосов
/ 07 декабря 2018

Упомянутый JAR не включает в себя все зависимости с клиентами kafka.Вам лучше использовать --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 (как указано в документации в разделе Развертывание: https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html#deploying)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...