Порождение водяных знаков Flink - PullRequest
0 голосов
/ 04 апреля 2020
  1. это модифицированная версия из примера подсчета слов с официального сайта 2. время события и прослушивание порта
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //listening to the port
    val text = env.socketTextStream("localhost", 9999)
      .assignAscendingTimestamps(item => {
        val line = item.split(" ")
        //simply print timestamp
        println(line.apply(1))
        line.apply(1).toLong*1000 - 3000
      })
сделать преобразование ниже
    // the process here
    text.map { each_input =>
    {
      val line = each_input.split(" ")
      (line.apply(0),1,line.apply(1))
    }}
        .process(new SimpleProcessFunc)
        .print()
на самом деле лог c из функции процесса не большие изменения
    val mark = context.timerService().currentWatermark()
    val timestamp = context.timestamp()
    //print some infomation
    println(sdf.format(mark) + "===> watermark ===>" + mark)
    println(sdf.format(timestamp) + "===> timestamp in context ===> " + timestamp)
    collector.collect(i)
Я использую cmd для отправки данных через сокет, но с консоли ide кажется странным, что то, как генерируется водяной знак, кажется, не логично c позади
    1585977022
    03/12/292269055 00:47:04===> watermark ===>-9223372036854775808
    04/04/2020 13:10:19===> timestamp in context ===> 1585977019000
    2> (epoch,1,1585977022)
    1585977034
    04/04/2020 13:10:18===> watermark ===>1585977018999
    04/04/2020 13:10:31===> timestamp in context ===> 1585977031000
    3> (montanin,1,1585977034)
    1585977053
    04/04/2020 13:10:30===> watermark ===>1585977030999
    04/04/2020 13:10:50===> timestamp in context ===> 1585977050000
    4> (song,1,1585977053)

1 Ответ

0 голосов
/ 04 апреля 2020

Вот логика c за значениями водяного знака:

Исходный водяной знак имеет значение Long.MIN_VALUE, то есть -9223372036854775808.

Так получилось, что водяные знаки следуют за элементом потока, отметка времени которого использовалась в качестве основы для создания водяного знака. А водяные знаки делают заявление о полноте потока вверх через определенный момент времени. Таким образом, элемент потока в момент времени 1585977019000 предшествует водяному знаку для времени 1585977018999 (поскольку после водяного знака все еще может быть другой элемент потока для времени 1585977019000, было бы неправильно, чтобы этот водяной знак имел метку времени 1585977019000).

Генератор водяных знаков с восходящей отметкой времени представляет собой генератор периодических c водяных знаков, который по умолчанию будет генерировать новый водяной знак каждые 200 мсек c - но только в том случае, если водяной знак повышается.

Когда вы получаете доступ к текущему водяному знаку на одном входе ProcessFunction, вы получаете самый последний водяной знак, полученный этим экземпляром. Во время метода processElement() этот водяной знак еще не будет отражать то, что узнал генератор водяных знаков при обработке события, которое теперь передается в processElement() - что обновление водяного знака произойдет позже, после того, как таймер 200 mse c выключится.

Для получения дополнительной информации о водяных знаках вы также видите страницу с водяными знаками из курса Flink .

...