I am trying to build a PySpark application that reads streaming data from a Kafka producer. I have already started a Kafka producer on "localhost:9092". I am using kafka_2.13-2.4.1.
Here is my code for the PySpark streaming application:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
import os
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.5'
topic = "covid"
if __name__ == "__main__":
    print("Starting Structured Streaming")
    #schema = S
    spark = SparkSession \
        .builder \
        .appName("twitterStreamer") \
        .master("local[*]") \
        .config("spark.jars","file:///C://Users//Kaushik Bhartiya//Desktop//Kartik//BI-Dashboard//jars//spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar") \
        .config("spark.driver.extraClassPath", "file:///C://Users//Kaushik Bhartiya//Desktop//Kartik//BI-Dashboard//jars//kafka-clients-2.5.0.jar") \
        .getOrCreate()
    kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","covid").load()
    print("=========================")
    #sc = SparkContext(appName="twitterStreamer")
    #sc.setLogLevel("WARN")
Running it produces the following error:
20/04/21 15:41:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/04/21 15:41:24 INFO SparkContext: Running Spark version 3.0.0-preview2
20/04/21 15:41:24 INFO ResourceUtils: ==============================================================
20/04/21 15:41:24 INFO ResourceUtils: Resources for spark.driver:
20/04/21 15:41:24 INFO ResourceUtils: ==============================================================
20/04/21 15:41:24 INFO SparkContext: Submitted application: StructuredKafkaWordCount
20/04/21 15:41:25 INFO SecurityManager: Changing view acls to: Kaushik Bhartiya
20/04/21 15:41:25 INFO SecurityManager: Changing modify acls to: Kaushik Bhartiya
20/04/21 15:41:25 INFO SecurityManager: Changing view acls groups to:
20/04/21 15:41:25 INFO SecurityManager: Changing modify acls groups to:
20/04/21 15:41:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Kaushik Bhartiya); groups with view permissions: Set(); users with modify permissions: Set(Kaushik Bhartiya); groups with modify permissions: Set()
20/04/21 15:41:28 INFO Utils: Successfully started service 'sparkDriver' on port 63341.
20/04/21 15:41:28 INFO SparkEnv: Registering MapOutputTracker
20/04/21 15:41:28 INFO SparkEnv: Registering BlockManagerMaster
20/04/21 15:41:28 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/04/21 15:41:28 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/04/21 15:41:28 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
20/04/21 15:41:28 INFO DiskBlockManager: Created local directory at C:\Users\Kaushik Bhartiya\AppData\Local\Temp\blockmgr-01094264-a7b0-4e00-9fd0-8532dda94a0e
20/04/21 15:41:28 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
20/04/21 15:41:29 INFO SparkEnv: Registering OutputCommitCoordinator
20/04/21 15:41:30 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/04/21 15:41:30 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-TJ2G2FM:4040
20/04/21 15:41:31 INFO Executor: Starting executor ID driver on host DESKTOP-TJ2G2FM
20/04/21 15:41:31 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63398.
20/04/21 15:41:31 INFO NettyBlockTransferService: Server created on DESKTOP-TJ2G2FM:63398
20/04/21 15:41:31 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/04/21 15:41:31 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-TJ2G2FM, 63398, None)
20/04/21 15:41:31 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-TJ2G2FM:63398 with 366.3 MiB RAM, BlockManagerId(driver, DESKTOP-TJ2G2FM, 63398, None)
20/04/21 15:41:31 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-TJ2G2FM, 63398, None)
20/04/21 15:41:31 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-TJ2G2FM, 63398, None)
20/04/21 15:41:33 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/Users/Kaushik%20Bhartiya/Desktop/Kartik/BI-Dashboard/spark-warehouse').
20/04/21 15:41:33 INFO SharedState: Warehouse path is 'file:/C:/Users/Kaushik%20Bhartiya/Desktop/Kartik/BI-Dashboard/spark-warehouse'.
Traceback (most recent call last):
File "C:/spark-3.0.0-bin-hadoop2.7/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py", line 64, in <module>
lines = spark\
File "C:\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\streaming.py", line 406, in load
File "C:\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1285, in __call__
File "C:\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 98, in deco
File "C:\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$100(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at java.util.ServiceLoader$LazyIterator.nextService(Unknown Source)
at java.util.ServiceLoader$LazyIterator.next(Unknown Source)
at java.util.ServiceLoader$1.next(Unknown Source)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 44 more
20/04/21 15:41:34 INFO SparkContext: Invoking stop() from shutdown hook
20/04/21 15:41:35 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-TJ2G2FM:4040
20/04/21 15:41:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/04/21 15:41:35 INFO MemoryStore: MemoryStore cleared
20/04/21 15:41:35 INFO BlockManager: BlockManager stopped
20/04/21 15:41:35 INFO BlockManagerMaster: BlockManagerMaster stopped
20/04/21 15:41:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/04/21 15:41:35 INFO SparkContext: Successfully stopped SparkContext
20/04/21 15:41:35 INFO ShutdownHookManager: Shutdown hook called
20/04/21 15:41:35 INFO ShutdownHookManager: Deleting directory C:\Users\Kaushik Bhartiya\AppData\Local\Temp\spark-639a5526-614d-4399-bda7-e113206f3a76
20/04/21 15:41:35 INFO ShutdownHookManager: Deleting directory C:\Users\Kaushik Bhartiya\AppData\Local\Temp\spark-385da83d-384e-4e56-af42-af5614985797
20/04/21 15:41:35 INFO ShutdownHookManager: Deleting directory C:\Users\Kaushik Bhartiya\AppData\Local\Temp\spark-385da83d-384e-4e56-af42-af5614985797\pyspark-bc843f59-81b8-431a-b88a-b21e0bcf4b4b
I even tried running the Kafka examples from Spark's examples folder, and they fail with the same error.
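Since the missing class lives in a `sources.v2` package and the log reports `Running Spark version 3.0.0-preview2`, one thing that may be worth ruling out is a connector jar built for a different Spark version than the one actually running. The sketch below only illustrates that check: `connector_versions` and `matches_spark` are hypothetical helper names, not part of PySpark, and the sketch assumes the jar follows the usual `<artifact>_<scala>-<version>.jar` naming.

```python
import re


def connector_versions(jar_path):
    """Return (scala_version, artifact_version) parsed from a connector jar name.

    Hypothetical helper: assumes a name such as
    spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar
    """
    # Keep only the file name, whatever the path separators are.
    file_name = re.split(r"[\\/]+", jar_path)[-1]
    match = re.search(r"_(\d+\.\d+)-(.+)\.jar$", file_name)
    if match is None:
        raise ValueError("unexpected jar name: " + file_name)
    return match.group(1), match.group(2)


def matches_spark(jar_path, spark_version):
    """True if the jar's artifact version equals the given Spark version string."""
    _, artifact_version = connector_versions(jar_path)
    return artifact_version == spark_version


jar = "jars//spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar"
print(connector_versions(jar))
print(matches_spark(jar, "3.0.0-preview2"))
```

In a live session the running version is available as `spark.version`, so it could be passed as the second argument instead of a hard-coded string. A match here does not rule out another, older connector jar being picked up from elsewhere on the classpath.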