Я - новый корд Scala, у меня есть функция flatMap, которая возвращает объект FlatMappedDStream, это потоковое задание, функция-дескриптор возвращает Map [String, Any], код приведен ниже:
val parseAction = filterActions.flatMap(record => ParseOperation.parseMatch(categoryMap, record))
функция def:
val parseMatch = (categoryMap: collection.Map[Int, String], record: Map[String, Any]) => {
record.get("operation").get.toString match {
case "view" => parseView(categoryMap, record)
case "impression" => parseRecord(record)
case "click" => parseRecord(record)
case _ => ListBuffer.apply(record)
}
}
функция parseMatch возвращает обработанные потоковые записи, тип которых Map [String, Any]. Теперь я хочу напечатать все результаты и вставить в новую функцию фильтра карты и функцию mapWithState, я пробую это, но это не работает.
неправильный код ниже:
val finalActions = parseAction.filter(record => record.get("invalid").get == None)
val userModels = finalActions.map(record => (record.asInstanceOf[Map[String, Any]].getOrElse("deviceid", ""), record))
.mapWithState(StateSpec.function(stateUpdateFunction))
Функция mapWithState:
val stateUpdateFunction = (deviceId: Any, newRecord: Option[Map[String, Any]], stateData: State[Map[String, Any]]) => {
XXXX
}
но функция фильтра и функция mapWithState не верны, как я могу это исправить?