Flink с Guava кешем - реализация ProcessFunction не сериализуема - PullRequest
0 голосов
/ 02 апреля 2019

Я реализовал ProcessFunction, которая использует кеш Guava для фильтрации потока входящих событий.Код выглядит следующим образом:

object myJob {
 private def updateCache(cacheObject, someValue) = {}
 private def getCacheValue(cacheObject, someKey) = {}

 override def run(params, executionEnv) = {
  val inputStream = executionEnv.stream

  val c = CacheBuilder.newBuilder()

  val outStream = inputStream.process(new ProcessFunction() { 
    updateCache()
    getCacheValue} 
    )
 }
}

При отправке работы во Flink я получаю следующую ошибку:

Caused by: org.apache.flink.api.common.InvalidProgramException: The implementation of the ProcessFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:666)
at org.apache.flink.streaming.api.scala.DataStream.process(DataStream.scala:686)

Есть идеи, что я делаю неправильно?Как я могу устранить эту ошибку сериализации?

1 Ответ

0 голосов
/ 02 апреля 2019

Ошибка говорит, в основном, что вы зависите от объекта, который не сериализуем для Flink.В случае, который Вы показали, пометка поля с загрузчиком как ленивая должна решить проблему:

   lazy val c = CacheBuilder.newBuilder()

Как правило, в этом случае Вы должны обратиться к документации Flink ,который объясняет проблему

...