Error when running spark-submit on a pyspark script that reads a Kafka stream
1 vote
/ April 15, 2020

I'm trying to connect to a Kafka stream, but I get the error below, and there doesn't seem to be a clear answer online. This is my pyspark script:

import pyspark
import os,sys,time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp


# makes spark-submit pull the Kafka package when the JVM is launched
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

spark = SparkSession.builder.appName("tweetStream").getOrCreate()

readDF = spark \
        .readStream.format("kafka") \
        .option("kafka.bootstrap.servers","localhost:9092") \
        .option("subscribe", "tweetsTopic") \
        .load()
# selectExpr returns a new DataFrame; reassign so the casts take effect
readDF = readDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


query = readDF \
        .writeStream.format("csv") \
        .outputMode("append") \
        .option("checkpointLocation", "/home/kevin/Documents/streamCheckpoints") \
        .option("path","/home/kevin/Documents/tweetCSVs") \
        .start()
query.awaitTermination()

This is the error I get:

Traceback (most recent call last):
  File "/home/kevin/kafka-pyspark-streaming.py", line 19, in <module>
    .option("subscribe", "tweetsTopic") \
  File "/home/kevin/spark-3.0.0-preview2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 406, in load
  File "/home/kevin/spark-3.0.0-preview2-bin-hadoop3.2/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/home/kevin/spark-3.0.0-preview2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 98, in deco
  File "/home/kevin/spark-3.0.0-preview2-bin-hadoop3.2/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
        at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
        at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
        at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
        at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        ... 42 more

20/04/15 01:05:17 INFO SparkContext: Invoking stop() from shutdown hook
20/04/15 01:05:17 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
20/04/15 01:05:18 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/04/15 01:05:18 INFO MemoryStore: MemoryStore cleared
20/04/15 01:05:18 INFO BlockManager: BlockManager stopped
20/04/15 01:05:18 INFO BlockManagerMaster: BlockManagerMaster stopped
20/04/15 01:05:18 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/04/15 01:05:18 INFO SparkContext: Successfully stopped SparkContext
20/04/15 01:05:18 INFO ShutdownHookManager: Shutdown hook called
20/04/15 01:05:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-fd96eca2-2f47-4137-b28d-4a1ab500fae0/pyspark-62ca83c3-ff47-4d4d-a233-8c92c1d62059
20/04/15 01:05:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-04f64e7c-630f-4e45-a9b5-b7bc6f70ea95
20/04/15 01:05:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-fd96eca2-2f47-4137-b28d-4a1ab500fae0

I think this might be a version compatibility issue, or maybe an environment variable problem, but nothing I've tried has worked. Thanks
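
The traceback does point at a version mismatch: the driver is Spark 3.0.0-preview2 (see the pyspark.zip paths), while --packages pulls org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2, a legacy DStream connector built for Spark 2.x / Scala 2.11. The missing class org.apache.spark.sql.sources.v2.StreamWriteSupport existed through Spark 2.4.x and was removed in 3.0, so this error typically means a Spark 2.x-era connector is on a Spark 3.0 classpath. Note also that Structured Streaming's format("kafka") is served by spark-sql-kafka-0-10, not spark-streaming-kafka-0-8. A minimal sketch of the fix, assuming Spark 3.0.0-preview2 built against Scala 2.12 as the traceback suggests (match the coordinate to whatever `spark-submit --version` actually reports):

# assumes Spark 3.0.0-preview2 / Scala 2.12; align the version with the installed Spark
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 '
    'pyspark-shell'
)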

1 Answer

0 votes
/ April 18, 2020
I am getting a similar error while running spark-submit in my local VM. Please advise.

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 /home/mera/kafka_wordcount.py localhost:2181 test-topic

Error: py4j.protocol.Py4JJavaError: An error occurred while calling o32.load. : java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport

kafka_wordcount.py
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("pyspark")
sc = SparkContext(conf=conf)

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test-topic") \
  .load()
# selectExpr returns a new DataFrame; reassign so the casts take effect
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
...
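
This is most likely the same root cause as the question above: spark-sql-kafka-0-10_2.12:2.4.5 references StreamWriteSupport, which only exists in Spark 2.x, so a NoClassDefFoundError here usually means the local spark-submit is actually a Spark 3.0.x build. Two side notes: the positional arguments localhost:2181 test-topic are never read by this script (the source is configured entirely through .option(...)), and the Kafka source takes broker addresses such as localhost:9092, not a ZooKeeper address. A sketch of an aligned submit command, assuming `spark-submit --version` reports 3.0.0-preview2 (substitute whatever version it actually prints):

$ spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 \
    /home/mera/kafka_wordcount.py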