Мы пишем приложение Stream Streaming, чтобы читать сообщения kafka методом createStream, и интервал между пакетами составляет 180 секунд.
Код успешно работает и создает файлы каждые 180 секунд в сегменты s3, но в файлах нет сообщений. Ниже находится окружающая среда
Spark 2.3.0
Какфа 1,0
Пожалуйста, пройдите код и, пожалуйста, дайте мне знать, что здесь не так
#import dependencies
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from pyspark.sql import *
Создание переменных контекста
sc = SparkContext(appName="SparkStreamingwithPython").getOrCreate()
sc.setLogLevel("WARN")
ssc = StreamingContext(sc,180)
topic="thirdtopic"
ZkQuorum = "localhost:2181"
Подключитесь к Кафке и создайте Stream
kakfaStream = KafkaUtils.createStream(ssc,ZkQuorum,"Spark-Streaming-Consumer",{topic:1})
def WritetoS3(rdd):
rdd.saveAsTextFile("s3://BucketName/thirdtopic/SparkOut")
kakfaStream.foreachRDD(WritetoS3)
ssc.start()
ssc.awaitTermination()
Заранее спасибо.