Spark структурированная потоковая группа не работает в режиме добавления (работает в обновлении) - PullRequest
0 голосов
/ 13 мая 2019

Я пытаюсь заставить потоковую агрегацию / groupBy работать в режиме вывода дополнений, чтобы иметь возможность использовать полученный поток в соединении поток-поток. Я работаю над (Py) Spark 2.3.2 и использую темы Кафки.

Мой псевдокод, как показано ниже, работает в блокноте Zeppelin

orderStream = spark.readStream().format("kafka").option("startingOffsets", "earliest").....

orderGroupDF = (orderStream
    .withWatermark("LAST_MOD", "20 seconds")
    .groupBy("ID", window("LAST_MOD", "10 seconds", "5 seconds"))
    .agg(
        collect_list(struct("attra", "attrb2",...)).alias("orders"),
        count("ID").alias("number_of_orders"),
        sum("PLACED").alias("number_of_placed_orders"),
        min("LAST_MOD").alias("first_order_tsd")
    )
)

debug = (orderGroupDF.writeStream
  .outputMode("append")
  .format("memory").queryName("debug").start()
)

После этого я ожидал, что данные появятся в запросе debug, и я смогу выбрать из него (после истечения 20-секундного окна позднего прибытия. Но никаких данных в запросе отладки не появляется (я ждал несколько минут) )

Когда я изменил режим вывода на update, запрос работает немедленно.

Есть подсказка, что я делаю не так?

РЕДАКТИРОВАТЬ: после еще нескольких экспериментов, я могу добавить следующее (но я все еще не понимаю).

При запуске приложения Spark по теме, из которой я потребляю, довольно много старых данных (с метками времени события << текущее время). После запуска он, кажется, читает все эти сообщения (например, MicroBatchExecution в отчетах журнала «numRowsTotal = 6224»), но на выходе ничего не генерируется, а водяной знак eventTime в журнале от MicroBatchExecution остается в эпоху (1970-01- 01). </p>

После создания свежего сообщения в теме ввода с очень близким к текущему времени eventTimestamp, запрос немедленно выводит сразу все «поставленные в очередь» записи и ударяет водяной знак eventTime в запросе.

Что я также вижу, что, похоже, проблема с часовым поясом. Мои программы Spark работают в CET (UTC + 2 в настоящее время). Временные метки во входящих сообщениях Kafka указаны в UTC, например, "LAST__MOD": "2019-05-14 12:39:39.955595000". Я установил spark_sess.conf.set("spark.sql.session.timeZone", "UTC"). Тем не менее, в микробатном отчете после того, как это «новое» сообщение было создано в теме ввода, говорится

"eventTime" : {
  "avg" : "2019-05-14T10:39:39.955Z",
  "max" : "2019-05-14T10:39:39.955Z",
  "min" : "2019-05-14T10:39:39.955Z",
  "watermark" : "2019-05-14T10:35:25.255Z"
},

Таким образом, eventTime каким-то образом связывается со временем во входящем сообщении, но у него 2 часа свободного времени. Разница UTC была вычтена дважды. Кроме того, я не вижу, как работает расчет водяного знака. Учитывая, что я установил его на 20 секунд, я ожидал бы, что он будет на 20 секунд старше максимального времени события. Но, видимо, это на 4 минуты 14 секунд старше. Я не вижу логики этого.

Я очень смущен ...

1 Ответ

0 голосов
/ 04 июня 2019

Похоже, это было связано с версией Spark 2.3.2, которую я использовал, и, возможно, более конкретно с SPARK-24156 .Я обновился до Spark 2.4.3, и здесь я сразу получаю результаты groupBy (ну, конечно, после истечения срока действия водяного знака lateThreshold, но «в ожидаемые сроки».

...