Flink 1.5 DataStream: фильтрация дубликатов - PullRequest
0 голосов
/ 01 июля 2018

У меня есть несвязанный DataStream, который представляет дружбу в социальной сети. Эти дружеские отношения могут быть двунаправленными и поэтому появляются в потоке дважды.

Структура данных: временная метка | пользователь1 | пользователь2. Например:

2010-03-09T02:51:11.571+0000|143|1219
2010-03-09T06:08:51.942+0000|1242|4624
2010-03-09T08:24:03.773+0000|2191|4986
2010-03-09T09:37:09.788+0000|459|4644

Я хочу удалить двунаправленную дружбу, чтобы считать их только один раз. На практике я хотел бы фильтровать дубликаты. Я нашел решение здесь

Моя функция фильтра выглядит так:

   def filter(ds: DataStream[String]): DataStream[(String, String, String)] = {

    val res = data.
      mapWith(line => {
        val str = line.split("\\|")
        if (str(1).toLong > str(2).toLong)
          (str(0), str(1), str(2))
        else
           (str(0), str(2), str(1))
      })
       .keyBy(tuple => (tuple._2, tuple._3))
      .flatMap(new FilterFunction())

    res
  }

И я реализовал свою функцию RichFlatMapFunction следующим образом:

class FilterFunction extends RichFlatMapFunction[(String, String, String), (String, String, String)] {

  private var seen: ValueState[Boolean] = _

  override def flatMap(value: (String, String, String), out: 
Collector[(String, String, String)]): Unit = {

     if (!seen.value() || seen.value() == null) {
       seen.update(true)
       out.collect(value)
     }
   }

  override def open(parameters: Configuration): Unit = {
     seen = getRuntimeContext.getState(
       new ValueStateDescriptor("seen", classOf[Boolean])
     )
   }
}

Однако, когда я печатаю, я получаю случайные результаты. Я попытался выполнить подсчет за временной интервал в 1 год:

val da1 = filter(data)
  .mapWith(tuple => Parser.parseUserConnection(tuple).get)
  .assignAscendingTimestamps(connection => connection.timestamp.getMillis)
  .mapWith(connection => (connection, 1))
  .timeWindowAll(Time.days(365))
  .sum(1)
  .mapWith(tuple => tuple._2)
  .print()

Моя консоль печатает в первый раз:

1> 33735

Тогда:

1> 10658
2> 33735

и для последующего выполнения разные результаты (всего 33735 кажется стабильным). Я не могу понять это странное поведение.

1 Ответ

0 голосов
/ 01 июля 2018

Трудно понять, что вас удивляет. Но общая методика отладки такого приложения - распечатать результаты различных этапов конвейера, чтобы увидеть, в какой момент результаты становятся странными. Или отладьте задание в IDE и выполните его.

...