Scala Flink Объединение двух потоков не работает - PullRequest
0 голосов
/ 04 апреля 2020

Я хотел бы присоединиться к двум потокам от производителя кафки, но объединение не работает. Я использую AssignerWithPeriodicWatermark для определения моего присваивателя, и я пытаюсь соединить два потока, используя 3 минуты windows. Но я не получаю вывод. Я распечатал два потока, чтобы убедиться, что у них есть события, которые достаточно близки по времени.

object Job {

class Assigner extends AssignerWithPeriodicWatermarks[String] {
  // 1 s in ms
  val bound: Long = 1000
  // the maximum observed timestamp
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
   }

  override def extractTimestamp(r: String, previousTS: Long): Long = {    
    maxTs = Math.max(maxTs,previousTS)    
    previousTS
   }
}

 def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment//createLocalEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9093")

properties.setProperty("group.id", "test")
val consumerId = new FlinkKafkaConsumer[String]("topic_id", new SimpleStringSchema(), properties)

val streamId = env.addSource(consumerId).assignTimestampsAndWatermarks(new Assigner)

val streamIdParsed=streamId.map{s =>s.parseJson}.map{ value => (value.asJsObject.getFields("id")(0).toString(),value.asJsObject.getFields("m","w")) }


val consumerV = new FlinkKafkaConsumer[String]("topic_invoice", new SimpleStringSchema(), properties)


val streamV = env.addSource(consumerV).assignTimestampsAndWatermarks(new Assigner)

val streamVParsed = streamV.map{s =>s.parseJson}.map{ value => (value.asJsObject.getFields("id")(0).toString(),value.asJsObject.getFields("products")(0).toString().parseJson.asJsObject.getFields("id2", "id3")) }


    streamIdParsed.join(streamVParsed).where(_._1).equalTo(_._1).window(SlidingEventTimeWindows.of(Time.seconds(60),Time.seconds(1))).apply { (e1, e2) => (e1._1,"test") }.print()
} }

Ответы [ 2 ]

0 голосов
/ 05 апреля 2020

Проблема в том, что вы не установили autoWatermarkInterval и используете PeriodicAssigner. Вам необходимо сделать следующее:

env.getConfig.setAutowatermarkInterval([someinterval])

Это должно решить проблему с водяными знаками, которые не генерируются.

0 голосов
/ 04 апреля 2020

Вещи, которые могут go ошибаться (до вас, чтобы проверить их, поскольку вы предоставили слишком скудную информацию, чтобы сузить круг вещей)

  1. Нет событий на Кафке ни на одном топи c
  2. Нет прогрессии водяного знака на обеих топиках c
  3. Ваши данные работают в минутах, а код работает в секундах
...