Почему DataFrame.write.insertinto при выполнении я обнаружил, что на следующих шагах датафрейм пуст? - PullRequest
0 голосов
/ 23 октября 2019

Я вижу странное поведение в приведенном ниже коде.

Когда выполняется stream_trans.write.insertInto("hivetable"), записи сохраняются в hivetable. print ("records to be sent :{}".format(stream_trans.count())) возвращает 0 , а следующая stream_trans.foreach(sendURL) не выполняет никакой обработки. У кого-нибудь есть объяснения, поскольку я ожидаю, что те же записи, записанные в таблице улья, должны обрабатываться запись за записью со следующей foreach?

def processStreamingRDD(rdd):
    cnt = rdd.count()
    print ("Total records found: "+ str(cnt)) 
    if(cnt != 0):
        filesDf = loadData(time, rdd)
        stream_trans = check_trans_usage(filesDf)
        print ("records to be written to hive:{}".format(stream_trans.count()))
        stream_trans.write.insertInto("hivetable")
        print ("records to be sent :{}".format(stream_trans.count()))
        stream_trans.foreach(sendURL)
print("Creating Spark Session")
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("applicationStream") \
    .getOrCreate()
ssc = StreamingContext(spark.sparkContext, 20)
textFileStream = ssc.textFileStream("hdfs:///path/datastream")
textFileStream.foreachRDD(processStreamingRDD)
...