Я пытаюсь присоединиться к потоковому набору данных, так как оба набора данных не имеют компонента времени, текущая временная метка используется для водяных знаков. Я хочу очистить данные от присоединения через 5 минут, поскольку объединения снова не будут соответствовать старым данным. Документация по внутренним соединениям с дополнительным водяным знаком действительно сбивает с толку.
Это говорит
Определить задержки водяного знака на обоих входах - Это делается, как показано ниже
Определить ограничение на событие- время. Это обязательно, чтобы избежать неограниченного состояния. В таких случаях, как я могу очистить его?
val stream1WithWatermark = stream1
.withColumn("stream1_processed_time",current_timestamp())
.withWatermark("stream1_processed_time","5 minutes")
val stream2WithWatermark = stream2
.withColumn("stream2_processed_time",current_timestamp())
.withWatermark("stream2_processed_time","5 minutes")
val joinStream = stream1.join(stream2, expr(stream1key = stream2key)).writeStream.....start()
, когда я использую условие stream1_processed_time >= stream1_processed_time + interval 5 minutes
, оно выдает ошибку ниже, что указывает на ошибку в 2.3.x.
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'stream1_processed_time
Рассматривая все проблемы при работе со структурированной потоковой передачей, можно ли использовать потоковую потоковую передачу - это хорошая идея для объединения потоков?