I have a Spark Streaming job; the code is below:
val filterActions = userActions.filter(Utils.filterPageType)
val parseAction = filterActions.flatMap(record => ParseOperation.parseMatch(categoryMap, record))
val finalActions = parseAction.filter(record => record.get("invalid") == None)
val userModels = finalActions.map(record => (record("deviceid"), record)).mapWithState(StateSpec.function(stateUpdateFunction))
All of these calls compile without problems except mapWithState.
The return type of ParseOperation.parseMatch(categoryMap, record) is ListBuffer[Map[String, Any]].
The error is shown below:
[INFO] Compiling 9 source files to /Users/spare/project/campaign-project/stream-official-mall/target/classes at 1530404002409
[ERROR] /Users/spare/project/campaign-project/stream-official-mall/src/main/scala/com/shopee/mall/data/OfficialMallTracker.scala:77: error: overloaded method value function with alternatives:
[ERROR] [KeyType, ValueType, StateType, MappedType](mappingFunction: org.apache.spark.api.java.function.Function3[KeyType,org.apache.spark.api.java.Optional[ValueType],org.apache.spark.streaming.State[StateType],MappedType])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[ERROR] [KeyType, ValueType, StateType, MappedType](mappingFunction: org.apache.spark.api.java.function.Function4[org.apache.spark.streaming.Time,KeyType,org.apache.spark.api.java.Optional[ValueType],org.apache.spark.streaming.State[StateType],org.apache.spark.api.java.Optional[MappedType]])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[ERROR] [KeyType, ValueType, StateType, MappedType](mappingFunction: (KeyType, Option[ValueType], org.apache.spark.streaming.State[StateType]) => MappedType)org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[ERROR] [KeyType, ValueType, StateType, MappedType](mappingFunction: (org.apache.spark.streaming.Time, KeyType, Option[ValueType], org.apache.spark.streaming.State[StateType]) => Option[MappedType])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType]
[ERROR] cannot be applied to ((Any, Map[String,Any], org.apache.spark.streaming.State[Map[String,Any]]) => Some[Map[String,Any]])
[ERROR] val userModels = finalActions.map(record => (record("deviceid"), record)).mapWithState(StateSpec.function(stateUpdateFunction))
[ERROR] ^
[ERROR] one error found
What is causing the problem? How should I change the code?