I'm trying to connect to a Kafka stream from PySpark, but I get the error below, and there doesn't seem to be a clear answer online. This is my PySpark script:
import pyspark
import os, sys, time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp

# The Kafka package is injected through spark-submit arguments; this has to
# be set before SparkSession.getOrCreate() starts the JVM.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

spark = SparkSession.builder.appName("tweetStream").getOrCreate()

# Subscribe to the Kafka topic as a streaming DataFrame.
readDF = spark \
    .readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "tweetsTopic") \
    .load()

# Cast the binary key/value columns to strings. selectExpr returns a new
# DataFrame, so the result has to be reassigned.
readDF = readDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Append each micro-batch to CSV files, with a checkpoint directory.
query = readDF \
    .writeStream.format("csv") \
    .outputMode("append") \
    .option("checkpointLocation", "/home/kevin/Documents/streamCheckpoints") \
    .option("path", "/home/kevin/Documents/tweetCSVs") \
    .start()

query.awaitTermination()
This is the error I get:
Traceback (most recent call last):
File "/home/kevin/kafka-pyspark-streaming.py", line 19, in <module>
.option("subscribe", "tweetsTopic") \
File "/home/kevin/spark-3.0.0-preview2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 406, in load
File "/home/kevin/spark-3.0.0-preview2-bin-hadoop3.2/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
File "/home/kevin/spark-3.0.0-preview2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 98, in deco
File "/home/kevin/spark-3.0.0-preview2-bin-hadoop3.2/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 42 more
20/04/15 01:05:17 INFO SparkContext: Invoking stop() from shutdown hook
20/04/15 01:05:17 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
20/04/15 01:05:18 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/04/15 01:05:18 INFO MemoryStore: MemoryStore cleared
20/04/15 01:05:18 INFO BlockManager: BlockManager stopped
20/04/15 01:05:18 INFO BlockManagerMaster: BlockManagerMaster stopped
20/04/15 01:05:18 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/04/15 01:05:18 INFO SparkContext: Successfully stopped SparkContext
20/04/15 01:05:18 INFO ShutdownHookManager: Shutdown hook called
20/04/15 01:05:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-fd96eca2-2f47-4137-b28d-4a1ab500fae0/pyspark-62ca83c3-ff47-4d4d-a233-8c92c1d62059
20/04/15 01:05:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-04f64e7c-630f-4e45-a9b5-b7bc6f70ea95
20/04/15 01:05:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-fd96eca2-2f47-4137-b28d-4a1ab500fae0
I think this might be a version-compatibility issue, or possibly an environment-variable problem, but everything I've tried so far has been to no avail. Thanks.
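
For what it's worth, the next thing I was going to test is swapping the package for one that matches my Spark install. This is only a guess: it assumes the failure comes from spark-streaming-kafka-0-8_2.11:2.0.2 (the old DStream connector, built for Spark 2.x / Scala 2.11) being loaded into Spark 3.0.0-preview2, and that the coordinates below are the matching Structured Streaming connector:

# Untested sketch: use the Structured Streaming Kafka connector whose
# Spark version (3.0.0-preview2) and Scala version (2.12) match the
# installed Spark distribution. The exact coordinates are my assumption.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 '
    'pyspark-shell'
)

Is this the right direction, or am I missing something else?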