Использование Flink для получения счета в окне с ключом - PullRequest
0 голосов
/ 04 октября 2018

Я использую Flink через интерфейс Scala для обработки некоторых данных.У меня есть некоторые пользовательские данные, которые поступают в виде кортежей:

(user1, "titanic")
(user1, "titanic")
(user1, "batman")
(user2, "star wars")
(user2, "star wars")
(user2, "batman")

Я хочу набрать пользователем ключ, создать окно и затем посчитать, сколько раз пользователь просматривал определенный фильм в этом окне, поэтомучто я получаю карту от каждого фильма до количества просмотров для каждого пользователя.Например, для user1 правильный вывод - Map("titanic" -> 2, "batman" -> 1).Я знаю, что первая часть моего кода должна выглядеть примерно так:

keyedStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10)))

Но я не знаю, как выполнить дальнейшую агрегацию в окне, чтобы я в итогес Картой просмотров считается для каждого пользователя / окна.Я попытался написать свою собственную функцию AggregateFunction, которая собирает эти значения в изменчивую карту, но, к сожалению, изменяемая карта не может быть сериализована, поэтому происходит сбой.

Как я могу это сделать?

1 Ответ

0 голосов
/ 06 октября 2018

Вы должны быть в состоянии решить проблему, используя AggregateFunction:

source
  .keyBy(0)
  .timeWindow(Time.seconds(10L))
  .aggregate(new AggregateFunction[(String, String), (String, Map[String, Int]), (String, Map[String, Int])] {
    override def createAccumulator(): (String, Map[String, Int]) = ("", Map())

    override def add(value: (String, String), accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = {
      val counter = accumulator._2.getOrElse(value._2, 0)
      (value._1, accumulator._2 + (value._2 -> (counter + 1)))
    }

    override def getResult(accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = accumulator

    override def merge(a: (String, Map[String, Int]), b: (String, Map[String, Int])): (String, Map[String, Int]) = {
      (a._1, (a._2.keySet ++ b._2.keySet) map (k => k -> (a._2.getOrElse(k, 0) + b._2.getOrElse(k, 0))) toMap)
    }
  })
...