- это модифицированная версия из примера подсчета слов с официального сайта 2. время события и прослушивание порта
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//listening to the port
val text = env.socketTextStream("localhost", 9999)
.assignAscendingTimestamps(item => {
val line = item.split(" ")
//simply print timestamp
println(line.apply(1))
line.apply(1).toLong*1000 - 3000
})
сделать преобразование ниже
// the process here
text.map { each_input =>
{
val line = each_input.split(" ")
(line.apply(0),1,line.apply(1))
}}
.process(new SimpleProcessFunc)
.print()
на самом деле лог c из функции процесса не большие изменения
val mark = context.timerService().currentWatermark()
val timestamp = context.timestamp()
//print some infomation
println(sdf.format(mark) + "===> watermark ===>" + mark)
println(sdf.format(timestamp) + "===> timestamp in context ===> " + timestamp)
collector.collect(i)
Я использую cmd для отправки данных через сокет, но с консоли ide кажется странным, что то, как генерируется водяной знак, кажется, не логично c позади
1585977022
03/12/292269055 00:47:04===> watermark ===>-9223372036854775808
04/04/2020 13:10:19===> timestamp in context ===> 1585977019000
2> (epoch,1,1585977022)
1585977034
04/04/2020 13:10:18===> watermark ===>1585977018999
04/04/2020 13:10:31===> timestamp in context ===> 1585977031000
3> (montanin,1,1585977034)
1585977053
04/04/2020 13:10:30===> watermark ===>1585977030999
04/04/2020 13:10:50===> timestamp in context ===> 1585977050000
4> (song,1,1585977053)