Как позволить Flink сбрасывать последнюю строку, когда производитель (kafka) не производит новую строку - PullRequest
0 голосов
/ 03 апреля 2019

, когда моя программа Flink находится в режиме времени события, приемник не получит последнюю строку (скажем, строка A).Если я введу новую строку (строку B) для Flink, я получу строку A, но все равно не могу получить строку b.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")

    val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)

    val stream: DataStream[String] = env.addSource(consumer).setParallelism(1)

    stream.map { m =>
      val result = JSON.parseFull(m).asInstanceOf[Some[Map[String, Any]]].get
      val msg = result("message").asInstanceOf[String]
      val num = parseMessage(msg)
      val key = s"${num.zoneId} ${num.subZoneId}"
      (key, num, num.onlineNum)
    }.filter { data =>
      data._2.subZoneId == 301 && data._2.zoneId == 5002
    }.assignTimestampsAndWatermarks(new MyTimestampExtractor()).keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
        .allowedLateness(Time.minutes(1))
      .maxBy(2).addSink { v =>
      System.out.println(s"${v._2.time} ${v._1}: ${v._2.onlineNum} ")
    }
class MyTimestampExtractor() extends AscendingTimestampExtractor[(String, OnlineNum, Int)](){
  val byMinute = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:SS")
  override def extractAscendingTimestamp(element: (String, OnlineNum, Int)): Long = {
    val dateTimeString =  element._2.date + " " + element._2.time
    val c1 = byMinute.parse(dateTimeString).getTime
    if ( element._2.time.contains("22:59") && element._2.subZoneId == 301){
      //System.out.println(s"${element._2.time} ${element._1}: ${element._2.onlineNum} ")
      // System.out.println(s"${element._2.time} ${c1 - getCurrentWatermark.getTimestamp}")
    }

    // System.out.println(s"${element._2.time} ${c1} ${c1 - getCurrentWatermark.getTimestamp}")
    return c1
  }
}

образец данных:

01:01:14 5002 301: 29 
01:01:36 5002 301: 27 
01:02:05 5002 301: 27 
01:02:31 5002 301: 29 
01:03:02 5002 301: 29 
01:03:50 5002 301: 29 
01:04:52 5002 301: 29 
01:07:24 5002 301: 26 
01:09:28 5002 301: 21 
01:11:04 5002 301: 22 
01:12:11 5002 301: 24 
01:13:54 5002 301: 23 
01:15:13 5002 301: 22 
01:16:04 5002 301: 19 (I can not get this line )

Затем я нажимаю новую строку на Flink (через kafka)

01:17:28 5002 301: 15 

Я получу 01:16:04 5002 301: 19, но 01:17:28 5002 301: 15 может удерживаться во Flink.

Ответы [ 2 ]

1 голос
/ 05 апреля 2019

это происходит потому, что это время события и метка времени события используется для измерения потока времени для окон.

В таком случае, когда в окне только одно событие, Flink не знает, что окно должно быть опущено. По этой причине, когда вы добавляете следующее событие, предыдущее окно закрывается и элементы генерируются (в вашем случае 19), но затем снова создается следующее окно (в вашем случае 15).

Вероятно, лучшая идея в таком случае - добавить пользовательский ProcessingTimeTrigger, который в основном позволит вам запустить окно через некоторое время, независимо от того, протекают ли события или нет. Вы можете найти информацию о Trigger в документации .

0 голосов
/ 24 апреля 2019

Какое окончательное решение, пожалуйста? Я также столкнулся с подобной ситуацией, которая может быть решена с помощью нового Watermark (System.CurrtTimeMillis ()), но, похоже, он не соответствует цели Watermark. Разве это не распространенная проблема, или разработчики приложений сознательно игнорируют ее, а сообщества игнорируют ее?

Почему бы не вовремя, когда я использовал сообщение kafka, используя потоковую потоковую группу sql по TUMBLE (rowtime)?

...