Flink WindowFunction Fold - PullRequest
       7

Flink WindowFunction Fold

0 голосов
/ 30 августа 2018

Я создаю скользящее окно и надеюсь рекурсивно упаковать все элементы, входящие в этот период окна, Это кусок кода

.map(x => ((x.pickup.get.latitude, x.pickup.get.longitude), (x.dropoff.get.latitude, x.dropoff.get.longitude)))
        .windowAll(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
        .fold(List[((Double, Double), (Double, Double))]) {(acc, v) => acc :+ ((v._1._1, v._1._2), (v._2._1, v._2._2))}

Я надеюсь создать List, в котором элементы tuple, но это не работает.

Я попробовал это, и это работает:

val l2 : List[((Int, Int), (Int, Int))] = List(((1, 1), (2, 2)))
val newl2 = l2 :+ ((3, 3), (4, 4))

Как я могу это сделать? Большое спасибо

1 Ответ

0 голосов
/ 31 августа 2018

Первый аргумент функции fold должен быть начальным значением, а не типом. Изменение последней строки на:

.fold(List.empty[((Long, Long), (Long, Long))]) {(acc, v) => acc :+ ((v._1._1, v._1._2), (v._2._1, v._2._2))}

должен сделать трюк.

...