У меня есть конвейер, который использует данные следующей формы: case class Foo(source: String, destination: String){def key=source+destination}
Я хочу удалить все дубликаты source+destination
, поступающие в один и тот же час, а затем я хочу подсчитать все вызовы, поступающие в пункт назначения в тот же час , Я создал конвейер с src ~> timewindow1(1 hour, keyBy:key) ~> timewindow2(1 hour, keyBy:destination) ~> ...
, я должен использовать timewindowAll
в timewindow2?