• 1000
Что создает DStream.
Чтобы работать со временем события (а не временем обработки), я сделал следующее:
outputStream
.foreachRDD(rdd => {
rdd.toDF().withWatermark("timestamp", "60 seconds")
.groupBy(
window($"timestamp", "60 seconds", "10 seconds")
)
.sum("meterIncrement")
.toJSON
.toDF("value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "taxi-dollar-accurate")
.start()
)
})
И я получаю сообщение об ошибке
'writeStream' можно вызывать только в потоковом Dataset / DataFrame
Что меня удивило, потому что источником DF является DStream. В любом случае, мне удалось решить эту проблему, заменив .writeStream на .write и .start () на .save ().
Но у меня возникло ощущение, что я каким-то образом потерял потоковую мощность на этом foreach. Понятно, поэтому я пишу этот вопрос. Это правильный подход? Я видел другие сценарии, в которых используется
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
. Но я не знаю, насколько это отличается от простого вызова foreach в DStream и последующего преобразования каждого RDD в DF.