Мы используем Flink в Scala для маршрутизации и преобразования событий Protobuf (используя scalapb) в нашем аналитическом конвейере. У меня есть поток данных «PlayStreams» с этой схемой:
message PlayStream {
optional PlayerEvent player_event = 1;
optional BlockAccountIPEvent block_account_ip_event = 2;
}
Полученный сгенерированный класс case имеет playerEvent
член типа сигнатура Option[PlayerEvent]
.
Я хочу преобразовать поток данных в просто PlayerEvents, отфильтровывая все, у которых их нет. Я новичок в Scala, поэтому я не уверен, как это сделать идиоматически. То, что у меня в настоящее время работает нормально:
// in main()
getDataStream(name, env, config.get("KafkaSource"))
.keyBy[String](PlayStreamFunctions.key(_))
.map{ _.getPlayerEvent }
.filter(filterDefaultPlayerEvents(_))
def filterDefaultPlayerEvents(playerEvent: PlayerEvent): Boolean = {
playerEvent match {
case PlayerEvent.defaultInstance => false
case _ => true
}
}
Это работает, потому что getPlayerEvent
в сгенерированном классе просто playerEvent.getOrElse(PlayerEvent.defaultInstance)
, и мы не используем экземпляр по умолчанию для чего-либо. Тем не менее, кажется странным создание группы ссылок на defaultInstance только для немедленной фильтрации их на следующем шаге. Есть ли способ избежать этого, которого я не вижу?