Как мне управлять функцией MapWithState? - PullRequest
0 голосов
/ 01 июля 2018

У меня есть работа с потоковым воспроизведением, коды там внизу:

val filterActions = userActions.filter(Utils.filterPageType)

val parseAction = filterActions.flatMap(record => ParseOperation.parseMatch(categoryMap, record))

val finalActions = parseAction.filter(record => record.get("invalid") == None)

val userModels = finalActions.map(record => (record("deviceid"), record)).mapWithState(StateSpec.function(stateUpdateFunction))

, но все функции могут скомпилироваться плавно, кроме функции mapWithState, тип возврата ParseOperation.parseMatch(categoryMap, record) равен ListBuffer[Map[String, Any]], ошибка, как показано ниже:

[INFO] Compiling 9 source files to /Users/spare/project/campaign-project/stream-official-mall/target/classes at 1530404002409

[ERROR] /Users/spare/project/campaign-project/stream-official-mall/src/main/scala/com/shopee/mall/data/OfficialMallTracker.scala:77: error: overloaded method value function with alternatives:
[ERROR]   [KeyType, ValueType, StateType, MappedType](mappingFunction: org.apache.spark.api.java.function.Function3[KeyType,org.apache.spark.api.java.Optional[ValueType],org.apache.spark.streaming.State[StateType],MappedType])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[ERROR]   [KeyType, ValueType, StateType, MappedType](mappingFunction: org.apache.spark.api.java.function.Function4[org.apache.spark.streaming.Time,KeyType,org.apache.spark.api.java.Optional[ValueType],org.apache.spark.streaming.State[StateType],org.apache.spark.api.java.Optional[MappedType]])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[ERROR]   [KeyType, ValueType, StateType, MappedType](mappingFunction: (KeyType, Option[ValueType], org.apache.spark.streaming.State[StateType]) => MappedType)org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[ERROR]   [KeyType, ValueType, StateType, MappedType](mappingFunction: (org.apache.spark.streaming.Time, KeyType, Option[ValueType], org.apache.spark.streaming.State[StateType]) => Option[MappedType])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType]

[ERROR]  cannot be applied to ((Any, Map[String,Any], org.apache.spark.streaming.State[Map[String,Any]]) => Some[Map[String,Any]])
[ERROR]     val userModels = finalActions.map(record => (record("deviceid"), record)).mapWithState(StateSpec.function(stateUpdateFunction))
[ERROR]                                                                                                      ^

[ERROR] one error found

что вызвало проблему? Как мне изменить код?

1 Ответ

0 голосов
/ 01 июля 2018

Я исправил это, это вызвало StateSpec.function (stateUpdateFunction)) требовал, чтобы тип входного параметра был Map [String, Any], прежде чем вызывать его, я использовал функцию map, код приведен ниже:

val finalActions = parseAction.filter(record => record.get("invalid") == None).map(Utils.parseFinalRecord)
val parseFinalRecord = (record: Map[String, Any]) => {
val recordMap = collection.mutable.Map(record.toSeq: _*)
logger.info(s"recordMap: ${recordMap}")
recordMap.toMap

}

это работает!

...