У меня есть программа структурированного потокового вещания, которая считает слова:
#1
var inputTable = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "XX.XX.XXX.XX:9092")
.option("subscribe", "topic-name")
.option("startingOffsets", "earliest")
.load()
#2
val df = inputTable.select(explode(split($"value".cast("string"), "\\s+")).as("word"))
.groupBy($"word")
.count
#3
val query = df.select($"word", $"count").writeStream.outputMode("complete").format("console").start()
#4
query.awaitTermination()
Теперь я хочу создать окно по времени события (во входной таблице есть столбец «отметка времени»).
Так что мне нужно изменить # 2. Я пробовал:
val df = inputTable.select(explode(split($"value".cast("string"), "\\s+")).as("word"), "timestamp")
.groupBy(window($"timestamp", "1 minute", $"word"))
.count
Но, очевидно, компилятор жалуется, что метод выбора не соответствует сигнатуре метода.