Я хочу рассчитать совокупное количество значений в столбце фрейма данных за последние 1 час, используя движущееся окно. Я могу получить ожидаемый результат с помощью оконной функции pyspark (без потоковой передачи), используя rangeBetween, но я хочу использовать обработку данных в реальном времени, поэтому стараюсь использовать потоковую структурированную искру, чтобы, если в систему поступала какая-либо новая запись / транзакция, я получал желаемый результат.
данные похожи на
time,col
2019-04-27 01:00:00,A
2019-04-27 00:01:00,A
2019-04-27 00:05:00,B
2019-04-27 01:01:00,A
2019-04-27 00:08:00,B
2019-04-27 00:03:00,A
2019-04-27 03:03:00,A
с использованием pyspark (без потоковой передачи)
from pyspark.sql.window import Window
df = sqlContext.read.format("csv") \
.options(header='true', inferschema='false',delimiter=',') \
.load(r'/datalocation')
df=df.withColumn("numddate",unix_timestamp(df.time, "yyyy-MM-dd HH:mm:ss"))
w1=Window.partitionBy("col").orderBy("numddate").rangeBetween(-3600, -1)
df=df.withColumn("B_cumulative_count", count("col").over(w1))
+-------------------+---+----------+------------------+
| time|col| numddate|B_cumulative_count|
+-------------------+---+----------+------------------+
|2019-04-27 00:05:00| B|1556348700| 0|
|2019-04-27 00:08:00| B|1556348880| 1|
|2019-04-27 00:01:00| A|1556348460| 0|
|2019-04-27 00:03:00| A|1556348580| 1|
|2019-04-27 01:00:00| A|1556352000| 2|
|2019-04-27 01:01:00| A|1556352060| 3|
|2019-04-27 03:03:00| A|1556359380| 0|
+-------------------+---+----------+------------------+
(This is what I required, so getting it by above code)
Структурированная потоковая передача, вот что я пытаюсь
userSchema = StructType([
StructField("time", TimestampType()),
StructField("col", StringType())
])
lines2 = spark \
.readStream \
.format('csv')\
.schema(userSchema)\
.csv("/datalocation")
windowedCounts = lines2.groupBy(
window(lines2.time, "1 hour"),
lines2.col
).count()
windowedCounts.writeStream.format("memory").outputMode("complete").queryName("test2").option("truncate","false").start()
spark.table("test2").show(truncate=False)
streaming output:
+------------------------------------------+---+-----+
|window |col|count|
+------------------------------------------+---+-----+
|[2019-04-27 03:00:00, 2019-04-27 04:00:00]|A |1 |
|[2019-04-27 00:00:00, 2019-04-27 01:00:00]|A |2 |
|[2019-04-27 01:00:00, 2019-04-27 02:00:00]|A |2 |
|[2019-04-27 00:00:00, 2019-04-27 01:00:00]|B |2 |
+------------------------------------------+---+-----+
Как тиражировать то же самое с использованием искровой структурированной потоковой передачи?