Я вижу странное поведение в приведенном ниже коде.
Когда выполняется stream_trans.write.insertInto("hivetable")
, записи сохраняются в hivetable
. print ("records to be sent :{}".format(stream_trans.count()))
возвращает 0 , а следующая stream_trans.foreach(sendURL)
не выполняет никакой обработки. У кого-нибудь есть объяснения, поскольку я ожидаю, что те же записи, записанные в таблице улья, должны обрабатываться запись за записью со следующей foreach
?
def processStreamingRDD(rdd):
cnt = rdd.count()
print ("Total records found: "+ str(cnt))
if(cnt != 0):
filesDf = loadData(time, rdd)
stream_trans = check_trans_usage(filesDf)
print ("records to be written to hive:{}".format(stream_trans.count()))
stream_trans.write.insertInto("hivetable")
print ("records to be sent :{}".format(stream_trans.count()))
stream_trans.foreach(sendURL)
print("Creating Spark Session")
spark = SparkSession.builder \
.master("local[2]") \
.appName("applicationStream") \
.getOrCreate()
ssc = StreamingContext(spark.sparkContext, 20)
textFileStream = ssc.textFileStream("hdfs:///path/datastream")
textFileStream.foreachRDD(processStreamingRDD)