Почему объединение использует строки, которые были отправлены после водяного знака через 20 секунд? - PullRequest
0 голосов
/ 06 декабря 2018

Я использую водяной знак для объединения двух потоков, как вы можете видеть ниже:

val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
val join_df = order_wm
  .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))

Насколько я понимаю, с помощью приведенного выше кода он будет удерживать каждый поток в течение 20 секунд.После того, как это произойдет, но, когда я даю один поток сейчас, а другой - после 20 сек, тогда и оба присоединяются.Кажется, даже после того, как водяной знак был закончен, Spark хранит данные в памяти.Я даже попробовал через 45 секунд, и это тоже стало соединением.

Это создает путаницу в моем разуме относительно водяного знака.

Ответы [ 2 ]

0 голосов
/ 11 декабря 2018

Цитирование документа из: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking

Другими словами, вам нужно будет выполнить следующие дополнительные шаги в объединении.

Определите задержки водяного знака на обоих входах, чтобы двигательзнает, насколько задержанным может быть ввод (аналогично потоковой агрегации)

Определить ограничение на время события для двух входов, чтобы механизм мог выяснить, когда старые строки одного входа не требуются (т.е. не будет удовлетворять временному ограничению) для совпадений с другим вводом.Это ограничение может быть определено одним из двух способов:

Условия соединения во временном диапазоне (например, ... СОЕДИНЕНИЕ В ЛЕВОМ ВРЕМЕНИ МЕЖДУ ПРАВАМИ ВРЕМЕНИ И ВПРАВО + ИНТЕРВАЛ 1 ЧАСА),

Присоединение к событию-временные окна (например ... JOIN ON leftTimeWindow = rightTimeWindow).

0 голосов
/ 08 декабря 2018

После того, как оно наступит, но, когда я даю один поток сейчас, а другой - через 20 сек, тогда оба они объединяются.

Это возможно, поскольку измеренное время не является временемсобытий по мере их поступления, но время, которое находится внутри поля с водяными знаками, т.е. tstamp_trans.Вы должны убедиться, что последний раз в tstamp_trans составляет 20 секунд после строк, которые будут участвовать в объединении.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...