Сценарий немного отличается от классического объединения поток-поток
streamA : поток транзакций: transTS, userid, productid, ...
streamB : поток созданных новых продуктов: productid, productname, createTS, ...)
Я хочу объединить транзакции с productIds, но не могу найти комбинацию водяных знаков / условий объединения, чтобы сделатьэто случилось.
streamA_wm = streamA.withWatermark("transTS", "3 minutes")
streamB_wm = streamB.withWatermark("createTS", "1 day")
streamA_wm
.join(streamB_wm, "productId AND transTS >= createTS", "leftOuter")
Результат пуст.
Что я делаю не так?