У меня есть два потока в моей искровой работе. Допустим, у меня есть сообщение «A», которое появляется каждые 5 минут, и сообщение «B», которое появляется один раз каждые 6 часов, но всегда появляется в начале, прежде чем какое-либо сообщение B начнет качать для определенного общего ключа (это не гарантируется чтобы всегда приходить, иногда это никогда не приходит).
Данные в потоке сообщения B имеют одно поле, которое мне нужно запросить и добавить ко всем быстро перемещающимся сообщениям A. Сообщения A и B имеют общий ключ, который используется для поиска правильного сообщения B, которое соответствует конкретному сообщению A. Мы используем Кафку в качестве источника для искровой работы.
Я бы просто хотел запросить поток сообщения B, чтобы получить этот столбец, и использовать .withColumn (), чтобы добавить это поле из потока B в коррелированные сообщения A. Затем немедленно запишите «обогащенное» сообщение «А» без задержек или задержек с водяными знаками. Я не смог заставить это работать с пользовательским UDF.
Я пошел по пути использования левого соединения, но я получаю некоторые ошибки.
Потоковое внешнее объединение между двумя потоковыми наборами данных / наборами данных не поддерживается без водяного знака в ключах соединения или водяного знака со стороны NULL и соответствующего условия диапазона;
Я добавил это
val joinExpr =
expr("""
truckId= tripTruckId AND
processingtimestamp >= associatetimestamp AND
processingtimestamp <= associatetimestamp + interval 1 minutes
"""
)
val truckTripStatus = truckStatus
.join(tripStatusWithTimestamp,$"truckId" === $"tripTruckId", "leftouter")
.selectExpr("CAST(truckId AS STRING) AS key", "to_json(struct(*)) AS value")
но ошибка все еще там. Что такое «водяной знак в ключе соединения»?
Послание B необходимо хранить в неограниченном столе на срок до 5 дней. Быстрое сообщение A следует удалить из таблицы потоковой передачи после того, как сообщение A было записано в тему вывода kafka.
Я думаю, что искра хочет, чтобы я поставил водяной знак на медленно движущемся сообщении B (это обнуляемая сторона левого соединения). И, делая это, искра будет ждать, пока период водяного знака пройдет, прежде чем она записывает данные для сообщения A. Ну, это 5 дней ... так что это не сработает для моих нужд.
Мое требование - выписать все сообщения a немедленно, когда они попадут в поток искры, и если у них есть коррелирующая строка из потока B, возьмите поле из коррелирующего сообщения B и добавьте его в сообщение A, затем запишите его, но без задержек. Мне нужно, чтобы все сообщения B оставались в течение 5 дней или до того времени, когда новое сообщение B с тем же ключом перезапишет все предыдущие сообщения.
Удаляют ли водяные знаки в не оконных потоках / удаляют более старые поступившие вовремя данные во входной таблице в тот момент, когда последнее сообщение с водяным знаком + задержка водяного знака превышает время любого более старого сообщения?