Spark 2.3.1 Как установить условие диапазона времени события для структурированного потокового внутреннего соединения - PullRequest
0 голосов
/ 04 апреля 2020

Я пытаюсь присоединиться к потоковому набору данных, так как оба набора данных не имеют компонента времени, текущая временная метка используется для водяных знаков. Я хочу очистить данные от присоединения через 5 минут, поскольку объединения снова не будут соответствовать старым данным. Документация по внутренним соединениям с дополнительным водяным знаком действительно сбивает с толку.

Это говорит

  1. Определить задержки водяного знака на обоих входах - Это делается, как показано ниже

  2. Определить ограничение на событие- время. Это обязательно, чтобы избежать неограниченного состояния. В таких случаях, как я могу очистить его?

val stream1WithWatermark = stream1
                            .withColumn("stream1_processed_time",current_timestamp())
                            .withWatermark("stream1_processed_time","5 minutes")
val stream2WithWatermark = stream2
                            .withColumn("stream2_processed_time",current_timestamp())
                            .withWatermark("stream2_processed_time","5 minutes")
val joinStream = stream1.join(stream2, expr(stream1key = stream2key)).writeStream.....start()

, когда я использую условие stream1_processed_time >= stream1_processed_time + interval 5 minutes, оно выдает ошибку ниже, что указывает на ошибку в 2.3.x.

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'stream1_processed_time

Рассматривая все проблемы при работе со структурированной потоковой передачей, можно ли использовать потоковую потоковую передачу - это хорошая идея для объединения потоков?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...