Я в основном использую пример, приведенный в документации Spark здесь со встроенным тестовым потоком, в котором один поток опережает на 3 секунды (первоначально использовал kafka, но столкнулся с той же проблемой).Результаты вернули столбцы соответствия правильно, однако через некоторое время тот же ключ возвращается с внешним нулем.
Это ожидаемое поведение?Есть ли способ исключить повторяющиеся внешние нулевые результаты при совпадении?
Код:
val testStream = session.readStream.format("rate")
.option("rowsPerSecond", "5").option("numPartitions", "1").load()
val impressions = testStream
.select(
(col("value") + 15).as("impressionAdId"),
col("timestamp").as("impressionTime"))
val clicks = testStream
.select(
col("value").as("clickAdId"),
col("timestamp").as("clickTime"))
// Apply watermarks on event-time columns
val impressionsWithWatermark =
impressions.withWatermark("impressionTime", "20 seconds")
val clicksWithWatermark =
clicks.withWatermark("clickTime", "30 seconds")
// Join with event-time constraints
val result = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 10 seconds
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter"
)
val query = result.writeStream.outputMode("update").format("console").option("truncate", false).start()
query.awaitTermination()
Результат:
-------------------------------------------
Batch: 19
-------------------------------------------
+--------------+-----------------------+---------+-----------------------+
|impressionAdId|impressionTime |clickAdId|clickTime |
+--------------+-----------------------+---------+-----------------------+
|100 |2018-05-23 22:18:38.362|100 |2018-05-23 22:18:41.362|
|101 |2018-05-23 22:18:38.562|101 |2018-05-23 22:18:41.562|
|102 |2018-05-23 22:18:38.762|102 |2018-05-23 22:18:41.762|
|103 |2018-05-23 22:18:38.962|103 |2018-05-23 22:18:41.962|
|104 |2018-05-23 22:18:39.162|104 |2018-05-23 22:18:42.162|
+--------------+-----------------------+---------+-----------------------+
-------------------------------------------
Batch: 57
-------------------------------------------
+--------------+-----------------------+---------+-----------------------+
|impressionAdId|impressionTime |clickAdId|clickTime |
+--------------+-----------------------+---------+-----------------------+
|290 |2018-05-23 22:19:16.362|290 |2018-05-23 22:19:19.362|
|291 |2018-05-23 22:19:16.562|291 |2018-05-23 22:19:19.562|
|292 |2018-05-23 22:19:16.762|292 |2018-05-23 22:19:19.762|
|293 |2018-05-23 22:19:16.962|293 |2018-05-23 22:19:19.962|
|294 |2018-05-23 22:19:17.162|294 |2018-05-23 22:19:20.162|
|100 |2018-05-23 22:18:38.362|null |null |
|99 |2018-05-23 22:18:38.162|null |null |
|103 |2018-05-23 22:18:38.962|null |null |
|101 |2018-05-23 22:18:38.562|null |null |
|102 |2018-05-23 22:18:38.762|null |null |
+--------------+-----------------------+---------+-----------------------+