Я пытаюсь заставить потоковую агрегацию / groupBy работать в режиме вывода дополнений, чтобы иметь возможность использовать полученный поток в соединении поток-поток. Я работаю над (Py) Spark 2.3.2 и использую темы Кафки.
Мой псевдокод, как показано ниже, работает в блокноте Zeppelin
orderStream = spark.readStream().format("kafka").option("startingOffsets", "earliest").....
orderGroupDF = (orderStream
.withWatermark("LAST_MOD", "20 seconds")
.groupBy("ID", window("LAST_MOD", "10 seconds", "5 seconds"))
.agg(
collect_list(struct("attra", "attrb2",...)).alias("orders"),
count("ID").alias("number_of_orders"),
sum("PLACED").alias("number_of_placed_orders"),
min("LAST_MOD").alias("first_order_tsd")
)
)
debug = (orderGroupDF.writeStream
.outputMode("append")
.format("memory").queryName("debug").start()
)
После этого я ожидал, что данные появятся в запросе debug
, и я смогу выбрать из него (после истечения 20-секундного окна позднего прибытия. Но никаких данных в запросе отладки не появляется (я ждал несколько минут) )
Когда я изменил режим вывода на update
, запрос работает немедленно.
Есть подсказка, что я делаю не так?
РЕДАКТИРОВАТЬ: после еще нескольких экспериментов, я могу добавить следующее (но я все еще не понимаю).
При запуске приложения Spark по теме, из которой я потребляю, довольно много старых данных (с метками времени события << текущее время). После запуска он, кажется, читает все эти сообщения (например, MicroBatchExecution в отчетах журнала «numRowsTotal = 6224»), но на выходе ничего не генерируется, а водяной знак eventTime в журнале от MicroBatchExecution остается в эпоху (1970-01- 01). </p>
После создания свежего сообщения в теме ввода с очень близким к текущему времени eventTimestamp, запрос немедленно выводит сразу все «поставленные в очередь» записи и ударяет водяной знак eventTime в запросе.
Что я также вижу, что, похоже, проблема с часовым поясом. Мои программы Spark работают в CET (UTC + 2 в настоящее время). Временные метки во входящих сообщениях Kafka указаны в UTC, например, "LAST__MOD": "2019-05-14 12:39:39.955595000"
. Я установил spark_sess.conf.set("spark.sql.session.timeZone", "UTC")
. Тем не менее, в микробатном отчете после того, как это «новое» сообщение было создано в теме ввода, говорится
"eventTime" : {
"avg" : "2019-05-14T10:39:39.955Z",
"max" : "2019-05-14T10:39:39.955Z",
"min" : "2019-05-14T10:39:39.955Z",
"watermark" : "2019-05-14T10:35:25.255Z"
},
Таким образом, eventTime каким-то образом связывается со временем во входящем сообщении, но у него 2 часа свободного времени. Разница UTC была вычтена дважды. Кроме того, я не вижу, как работает расчет водяного знака. Учитывая, что я установил его на 20 секунд, я ожидал бы, что он будет на 20 секунд старше максимального времени события. Но, видимо, это на 4 минуты 14 секунд старше. Я не вижу логики этого.
Я очень смущен ...