Фильтрация потока данных Flink на необязательный подобъект в Scala - PullRequest
0 голосов
/ 01 апреля 2020

Мы используем Flink в Scala для маршрутизации и преобразования событий Protobuf (используя scalapb) в нашем аналитическом конвейере. У меня есть поток данных «PlayStreams» с этой схемой:

message PlayStream {
  optional PlayerEvent player_event = 1;
  optional BlockAccountIPEvent block_account_ip_event = 2;
}

Полученный сгенерированный класс case имеет playerEvent член типа сигнатура Option[PlayerEvent].

Я хочу преобразовать поток данных в просто PlayerEvents, отфильтровывая все, у которых их нет. Я новичок в Scala, поэтому я не уверен, как это сделать идиоматически. То, что у меня в настоящее время работает нормально:

   // in main()
   getDataStream(name, env, config.get("KafkaSource"))
      .keyBy[String](PlayStreamFunctions.key(_))
      .map{ _.getPlayerEvent }
      .filter(filterDefaultPlayerEvents(_))


  def filterDefaultPlayerEvents(playerEvent: PlayerEvent): Boolean = {
    playerEvent match {
      case PlayerEvent.defaultInstance => false
      case _ => true
    }
  }

Это работает, потому что getPlayerEvent в сгенерированном классе просто playerEvent.getOrElse(PlayerEvent.defaultInstance), и мы не используем экземпляр по умолчанию для чего-либо. Тем не менее, кажется странным создание группы ссылок на defaultInstance только для немедленной фильтрации их на следующем шаге. Есть ли способ избежать этого, которого я не вижу?

1 Ответ

0 голосов
/ 02 апреля 2020

Хотелось уточнить, что я поставил этот вопрос под Flink, так как все функции карты являются реализациями, определенными Flink c. Я понял, что flatMap был доступен, и, учитывая, что операции с картами более идиоматичны c, чем сопоставление с образцом для Options , я пошел с этой реализацией:

  getDataStream(name, env, config.get("KafkaSource"))
      .keyBy[String](PlayStreamFunctions.key(_))
      .flatMap{ _.playerEvent.toList }
      .flatMap(toFlatPlayerEvent(_))

, начиная с toList возвращает пустой список, если Option не существует, или унарный список со значением, если он существует, плоское сопоставление между ними решает мою проблему.

...