Пустой фрейм данных с помощью структурированного потокового фрейма PySpark - PullRequest
0 голосов
/ 10 июня 2019

Я пишу код на основе водяных знаков в структурированном потоке в Pyspark. Все работает нормально, но я получаю дополнительный пустой фрейм данных при отправке некоторых данных из источника.

В моем коде было несколько операторов df.printSchema (), которые я удалил. Все тот же результат.

Вот мой код: -

socketStreamDF = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9991) \
.load()

stocksDF = socketStreamDF.withColumn("value", split("value", ","))\
.withColumn("time", col("value")[0].cast("long").cast("timestamp"))\
.withColumn("symbol", col("value")[1]).withColumn("value", col("value")[2].cast(DoubleType()))

windowedWords = stocksDF\
.withWatermark("time", "5 seconds")\
.groupBy(window("time", "10 seconds"), stocksDF.symbol)\
.sum("value")

query = windowedWords \
.writeStream \
.outputMode("update") \
.format("console") \
.option('truncate', 'false') \
.start()

query.awaitTermination()

Input

1509672910,"aapl",500.0

Ожидаемый результат

Партия 1

Window                                        Symbol   Sum(value)
[2017-11-03 07:05:10, 2017-11-03 07:05:20]    app1      500

Фактический объем производства

Пакет 1

Window                                        Symbol   Sum(value)
[2017-11-03 07:05:10, 2017-11-03 07:05:20]    app1      500

Пакет 2

Window                                        Symbol   Sum(value)
[Blank]

Обновление 1

Я попытался сохранить ссылку на новый socketStreamDF (с измененными столбцами - значением, временем и символом), как показано ниже, но он все равно не работает: -

socketStreamDF = socketStreamDF.withColumn("value", split("value", ","))\
    .withColumn("time", col("value")[0].cast("long").cast("timestamp"))\
    .withColumn("symbol", col("value")[1]).withColumn("value", col("value")[2].cast(DoubleType()))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...