У меня есть события DataStream of Kafka (соответствующие показаниям с устройства), поданные в следующий код, который выдает среднее значение по каждому устройству для чтения в скользящем окне. Это отлично работает. Затем я хочу вычислить сумму всех средних значений для каждого устройства в одном и том же окне, это та часть, которую я не могу синтаксически правильно выразить.
Эта часть работает:
val stream = env
// words is our Kafka topic
.addSource(kafkaConsumer)
// configure timestamp and watermark assigner
.assignTimestampsAndWatermarks(new DeviceTSAssigner)
.keyBy(_.deviceIdFull)
.timeWindow(Time.minutes(5), Time.minutes(1))
/* count events in window */
.apply{ (key: String, window: TimeWindow, events: Iterable[DeviceData], out: Collector[(String, Long, Double)]) =>
out.collect( (key, window.getEnd, events.map(_.currentReading).sum/events.size))
}
stream.print()
Вывод что-то вроде
(device1,1530681420000,0.0)
(device2,1530681420000,0.0)
(device3,1530681480000,0.0)
(device4,1530681480000,0.0)
(device5,1530681480000,52066.0)
(device6,1530681480000,69039.0)
(device7,1530681480000,79939.0)
...
...
Следующий код - часть, с которой у меня проблемы, я не уверен, как именно это кодировать, но я думаю, что это должно быть примерно так:
val avgStream = stream
.keyBy(2) // 2 represents the window.End from stream, see code above
.timeWindow(Time.minutes(1)) // tumbling window
.apply { (
key: Long,
window: TimeWindow,
events: Iterable[(String, Long, Double)],
out: Collector[(Long, Double)]) =>
out.collect( (key, events.map( _._3 ).sum ))
}
Я получаю следующие ошибки при компиляции этого кода ..
Error:(70, 52) type mismatch;
found : (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[(Long, Double)]) => Unit
required: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[?]) => Unit
out: Collector[(Long, Double)]) =>
Я пробовал и другие варианты, такие как использование AggregtionFunctions, но не смог пройти компиляцию. Судя по ошибке, мне нужно преобразовать элементы входного потока в кортежи, я видел некоторый код, но не совсем уверен, как это сделать. Я совершенно новичок в Scala, поэтому я думаю, что это главная проблема, потому что то, что я хочу сделать, не сложно.
Обновлено 07.04 2018
Я думаю, что у меня есть решение для моего вопроса, кажется, работает нормально, но я все еще хотел бы держать это открытым, надеясь, что кто-то еще может прокомментировать его (вопрос, а также мое решение).
По сути, я удалил первое поле (выполнив карту), которое было названием устройства, поскольку оно нам не нужно, а затем keyBy на отметке времени (из предыдущего этапа), оконные события в падающее окно и затем просто сложите 2-е поле (на основе индекса 1, 0), которое является средним показателем предыдущего этапа.
val avgStream = stream
.map(r => (r._2, r._3))
.keyBy(0)
.timeWindowAll(Time.minutes(1))
.sum(1)
.print()