как исправить карту фильтра и функцию mapWithState - PullRequest
0 голосов
/ 28 июня 2018

Я - новый корд Scala, у меня есть функция flatMap, которая возвращает объект FlatMappedDStream, это потоковое задание, функция-дескриптор возвращает Map [String, Any], код приведен ниже:

val parseAction = filterActions.flatMap(record => ParseOperation.parseMatch(categoryMap, record))

функция def:

val parseMatch = (categoryMap: collection.Map[Int, String], record: Map[String, Any]) => {
record.get("operation").get.toString match {
  case "view" => parseView(categoryMap, record)
  case "impression" => parseRecord(record)
  case "click" => parseRecord(record)
  case _ => ListBuffer.apply(record)

}

}

функция parseMatch возвращает обработанные потоковые записи, тип которых Map [String, Any]. Теперь я хочу напечатать все результаты и вставить в новую функцию фильтра карты и функцию mapWithState, я пробую это, но это не работает. неправильный код ниже:

val finalActions = parseAction.filter(record => record.get("invalid").get == None)

val userModels = finalActions.map(record => (record.asInstanceOf[Map[String, Any]].getOrElse("deviceid", ""), record))
                       .mapWithState(StateSpec.function(stateUpdateFunction))

Функция mapWithState:

val stateUpdateFunction = (deviceId: Any, newRecord: Option[Map[String, Any]], stateData: State[Map[String, Any]]) => {
  XXXX

}

но функция фильтра и функция mapWithState не верны, как я могу это исправить?

1 Ответ

0 голосов
/ 30 июня 2018

Я исправил это, я изменяю свой возвращаемый тип с Map [String, Any] на ListBuffer [Map [String, Any]], он работает!

...