Я пишу код на основе водяных знаков в структурированном потоке в Pyspark. Все работает нормально, но я получаю дополнительный пустой фрейм данных при отправке некоторых данных из источника.
В моем коде было несколько операторов df.printSchema (), которые я удалил. Все тот же результат.
Вот мой код: -
socketStreamDF = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9991) \
.load()
stocksDF = socketStreamDF.withColumn("value", split("value", ","))\
.withColumn("time", col("value")[0].cast("long").cast("timestamp"))\
.withColumn("symbol", col("value")[1]).withColumn("value", col("value")[2].cast(DoubleType()))
windowedWords = stocksDF\
.withWatermark("time", "5 seconds")\
.groupBy(window("time", "10 seconds"), stocksDF.symbol)\
.sum("value")
query = windowedWords \
.writeStream \
.outputMode("update") \
.format("console") \
.option('truncate', 'false') \
.start()
query.awaitTermination()
Input
1509672910,"aapl",500.0
Ожидаемый результат
Партия 1
Window Symbol Sum(value)
[2017-11-03 07:05:10, 2017-11-03 07:05:20] app1 500
Фактический объем производства
Пакет 1
Window Symbol Sum(value)
[2017-11-03 07:05:10, 2017-11-03 07:05:20] app1 500
Пакет 2
Window Symbol Sum(value)
[Blank]
Обновление 1
Я попытался сохранить ссылку на новый socketStreamDF (с измененными столбцами - значением, временем и символом), как показано ниже, но он все равно не работает: -
socketStreamDF = socketStreamDF.withColumn("value", split("value", ","))\
.withColumn("time", col("value")[0].cast("long").cast("timestamp"))\
.withColumn("symbol", col("value")[1]).withColumn("value", col("value")[2].cast(DoubleType()))