Я пытаюсь подключиться к производителю Kafka, используя pyspark
.Использование производителя консоли для получения данных.
Ниже приведен код.
import findspark
findspark.init('D:\spark-2.4.0-bin-hadoop2.7')
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-
streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
import sys
import time
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
n_secs = 1
topic = ["streamtest1"]
## spark config
conf = SparkConf().setAppName("KafkaStreamTest").setMaster("local[2]")
# spark context creation
sc = SparkContext(conf=conf)
print(sc)
sc.setLogLevel("WARN")
# create streaming context
ssc = StreamingContext(sc,n_secs)
#kafka direct stream to connect to kafka broker
kafkastream = KafkaUtils.createDirectStream(ssc,topic,
{'bootstrap.servers':'localhost:9092',
'group.id':'test123',
'fetch.message.max.bytes':'15728640'})
print(kafkastream)
#map the Dstream data to lines
lines = kafkastream.map(lambda x : x[1])
words = lines.flatMap(lambda line: line.split(" "))
pairs=words.map(lambda word:(word,1))
counts = pairs.reduceByKey(lambda a,b: a+b)
counts.pprint()
# starts the streaming context
ssc.start()
time.sleep(60)
ssc.awaitTermination()
А ниже приведена ошибка:
Py4JJavaError: Ошибка произошла во времявызов o27.awaitTermination.: org.apache.spark.SparkException: Python вызвал исключение: Traceback (последний вызов был последним):