PySpark: Py4JJavaError: Произошла ошибка при вызове o27.awaitTermination - PullRequest
0 голосов
/ 28 ноября 2018

Я пытаюсь подключиться к производителю Kafka, используя pyspark.Использование производителя консоли для получения данных.

Ниже приведен код.

import findspark
findspark.init('D:\spark-2.4.0-bin-hadoop2.7')
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark- 
streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

import sys
import time
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
n_secs = 1
topic = ["streamtest1"]

## spark config
conf = SparkConf().setAppName("KafkaStreamTest").setMaster("local[2]")

# spark context creation
sc = SparkContext(conf=conf)
print(sc)
sc.setLogLevel("WARN")

# create streaming context
ssc = StreamingContext(sc,n_secs)

#kafka direct stream to connect to kafka broker
kafkastream = KafkaUtils.createDirectStream(ssc,topic, 
{'bootstrap.servers':'localhost:9092',
                                        'group.id':'test123',

                         'fetch.message.max.bytes':'15728640'})
print(kafkastream)

#map the Dstream data to lines
lines = kafkastream.map(lambda x : x[1])

words = lines.flatMap(lambda line: line.split(" "))
pairs=words.map(lambda word:(word,1))
counts = pairs.reduceByKey(lambda a,b: a+b)
counts.pprint()

# starts the streaming context
ssc.start()
time.sleep(60)
ssc.awaitTermination()

А ниже приведена ошибка:

Py4JJavaError: Ошибка произошла во времявызов o27.awaitTermination.: org.apache.spark.SparkException: Python вызвал исключение: Traceback (последний вызов был последним):

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...