Spark Streaming Left Объединяет быстро движущиеся данные с медленно движущимися данными с правой стороны, но не замедляет вывод с левой стороны - PullRequest
0 голосов
/ 17 мая 2019

У меня есть два потока в моей искровой работе. Допустим, у меня есть сообщение «A», которое появляется каждые 5 минут, и сообщение «B», которое появляется один раз каждые 6 часов, но всегда появляется в начале, прежде чем какое-либо сообщение B начнет качать для определенного общего ключа (это не гарантируется чтобы всегда приходить, иногда это никогда не приходит).

Данные в потоке сообщения B имеют одно поле, которое мне нужно запросить и добавить ко всем быстро перемещающимся сообщениям A. Сообщения A и B имеют общий ключ, который используется для поиска правильного сообщения B, которое соответствует конкретному сообщению A. Мы используем Кафку в качестве источника для искровой работы.

Я бы просто хотел запросить поток сообщения B, чтобы получить этот столбец, и использовать .withColumn (), чтобы добавить это поле из потока B в коррелированные сообщения A. Затем немедленно запишите «обогащенное» сообщение «А» без задержек или задержек с водяными знаками. Я не смог заставить это работать с пользовательским UDF.

Я пошел по пути использования левого соединения, но я получаю некоторые ошибки.

Потоковое внешнее объединение между двумя потоковыми наборами данных / наборами данных не поддерживается без водяного знака в ключах соединения или водяного знака со стороны NULL и соответствующего условия диапазона;

Я добавил это


        val joinExpr =
      expr("""
      truckId= tripTruckId AND
      processingtimestamp >= associatetimestamp AND
      processingtimestamp <= associatetimestamp + interval 1 minutes
      """
      )

    val truckTripStatus = truckStatus
      .join(tripStatusWithTimestamp,$"truckId" === $"tripTruckId", "leftouter")
      .selectExpr("CAST(truckId AS STRING) AS key", "to_json(struct(*)) AS value")

но ошибка все еще там. Что такое «водяной знак в ключе соединения»?

Послание B необходимо хранить в неограниченном столе на срок до 5 дней. Быстрое сообщение A следует удалить из таблицы потоковой передачи после того, как сообщение A было записано в тему вывода kafka.

Я думаю, что искра хочет, чтобы я поставил водяной знак на медленно движущемся сообщении B (это обнуляемая сторона левого соединения). И, делая это, искра будет ждать, пока период водяного знака пройдет, прежде чем она записывает данные для сообщения A. Ну, это 5 дней ... так что это не сработает для моих нужд.

Мое требование - выписать все сообщения a немедленно, когда они попадут в поток искры, и если у них есть коррелирующая строка из потока B, возьмите поле из коррелирующего сообщения B и добавьте его в сообщение A, затем запишите его, но без задержек. Мне нужно, чтобы все сообщения B оставались в течение 5 дней или до того времени, когда новое сообщение B с тем же ключом перезапишет все предыдущие сообщения.

Удаляют ли водяные знаки в не оконных потоках / удаляют более старые поступившие вовремя данные во входной таблице в тот момент, когда последнее сообщение с водяным знаком + задержка водяного знака превышает время любого более старого сообщения?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...