Я реализовал ProcessFunction, которая использует кеш Guava для фильтрации потока входящих событий.Код выглядит следующим образом:
object myJob {
private def updateCache(cacheObject, someValue) = {}
private def getCacheValue(cacheObject, someKey) = {}
override def run(params, executionEnv) = {
val inputStream = executionEnv.stream
val c = CacheBuilder.newBuilder()
val outStream = inputStream.process(new ProcessFunction() {
updateCache()
getCacheValue}
)
}
}
При отправке работы во Flink я получаю следующую ошибку:
Caused by: org.apache.flink.api.common.InvalidProgramException: The implementation of the ProcessFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:666)
at org.apache.flink.streaming.api.scala.DataStream.process(DataStream.scala:686)
Есть идеи, что я делаю неправильно?Как я могу устранить эту ошибку сериализации?