Я пытался преобразовать старую версию теста потоковой передачи Yahoo для Flink в новую версию, удалив устаревшие классы.
Сейчас я застрял в преобразовании устаревшей кратности () в агрегат (). Я не мог сопоставить существующие параметры сгиба с теми в совокупности.
//old version using fold
val windowedCounts = windowedEvents.fold(new WindowedCount(null, "", 0, new java.sql.Timestamp(0L)),
(acc: WindowedCount, r: (String, String, Timestamp)) => {
val lastUpdate = if (acc.lastUpdate.getTime < r._3.getTime) r._3 else acc.lastUpdate
acc.count += 1
acc.lastUpdate = lastUpdate
acc
},
(key: Tuple, window: TimeWindow, input: Iterable[WindowedCount], out: Collector[WindowedCount]) => {
val windowedCount = input.iterator.next()
println(windowedCount.lastUpdate)
out.collect(new WindowedCount(new java.sql.Timestamp(window.getStart), key.getField(0), windowedCount.count, windowedCount.lastUpdate))
//out.collect(new WindowedCount(new java.sql.Timestamp(window.getStart), key.getField(0), windowedCount.count, windowedCount.lastUpdate))
}
)
val windowedCounts = windowedEvents.aggregate (new CountAggregate)
Я хочу создать класс CountAggregate, расширив класс AggregateFunction (что-то вроде):
class CountAggregate extends AggregateFunction[(String, String, Timestamp), WindowedCount, Collector[WindowedCount]] {
override def createAccumulator() = WindowedCount(null, "", 0, new java.sql.Timestamp(0L))
override def accumulate(acc: WindowedCount, r: (String, String, Timestamp)): WindowedCount = {
val lastUpdate = if (acc.lastUpdate.getTime < r._3.getTime) r._3 else acc.lastUpdate
acc.count += 1
acc.lastUpdate = lastUpdate
acc
}
override def getValue (acc: WindowedCount) = { (key: Tuple, window: TimeWindow, input: Iterable[WindowedCount], out: Collector[WindowedCount]) =>
val windowedCount = input.iterator.next()
println(windowedCount.lastUpdate)
out.collect(new WindowedCount(new java.sql.Timestamp(window.getStart), key.getField(0), windowedCount.count, windowedCount.lastUpdate))
}
Буду признателен за любую помощь в переписывании класса CountAggregate.