Найти Top K элементов в WindowedStream - Flink - PullRequest
0 голосов
/ 02 мая 2019

Я довольно новичок в мире потоков и с первой попытки столкнулся с некоторыми проблемами.

Я хотел бы найти элемент Top K в window: WindowdStream ниже.Я пытался реализовать свою собственную функцию, но не уверен, как она на самом деле работает.

Кажется, что ничего не печатается

Может, у вас есть подсказка?

val parsedStream: DataStream[(String, Response)] = stream
      .mapWith(_.decodeOption[Response])
      .filter(_.isDefined)
      .map { record =>
        (
          s"${record.get.group.group_country}, ${record.get.group.group_city}",
          record.get
        )
      }

val topLocations = parsedStream
      .keyBy(_._1)
      .timeWindow(Time.days(7))
      .process(new SortByCountFunction)

SortByCountFunction

class SortByCountFunction
    extends ProcessWindowFunction[(String, Response), MeetUpLocationWindow, String, TimeWindow] {

    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Response)],
                         out: Collector[MeetUpLocationWindow]): Unit = {

      val count: Map[String, Iterable[(String, Response)]] = elements.groupBy(_._1)

      val locAndCount: Seq[MeetUpLocation] = count.toList.map(tmp => {
        val location: String = tmp._1
        val meetUpList: Iterable[(String, Response)] = tmp._2
        MeetUpLocation(location, tmp._2.size, meetUpList.map(_._2).toList)
      })

      val output: List[MeetUpLocation] = locAndCount.sortBy(tup => tup.count).take(20).toList

      val windowEnd = context.window.getEnd

      out.collect(MeetUpLocationWindow(windowEnd, output))
    }
  }

case class MeetUpLocationWindow(endTs: Long, locations: List[MeetUpLocation])

case class MeetUpLocation(location: String, count: Int, meetUps: List[Response])

1 Ответ

0 голосов
/ 02 мая 2019

Когда заданию Flink DataStream не удается выдать какой-либо вывод, обычные подозрения:

  • задание не вызывает execute () в StreamExecutionEnvironment (например, env.execute())
  • к заданию не подключен приемник (например, TopLocations.print())
  • задание предназначено для использования времени события, но водяные знаки настроены неправильно или источник не работаетводяные знаки от продвижения
  • задание записывает в журналы диспетчера задач, но никто не заметил
  • сериализатор для типа вывода не выводит

Без дополнительной информации этоТрудно догадаться, что из этого может быть проблемой в этом случае.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...