Flink: реализация RichSinkFunction не сериализуема - PullRequest
0 голосов
/ 22 мая 2019

Я пытаюсь передать поток в корзину s3 и реализует интерфейс Encoder.

case class Info(vecId: Long, bkCode: String, state: String)

class S3Encoder extends Encoder[Info] {

    private val gson = new Gson()

    override def encode(element: Info, stream: OutputStream): Unit = {

      val json = new util.HashMap[String, Any]()

      json.put("vecId", element.vecId)
      json.put("bk", element.bkCode)
      json.put("state", element.state)

//      println(gson.toJson(json))
      stream.write(gson.toJson(json).getBytes("UTF-8"))
      stream.write('\n')

    }

И я добавил приемник в

val sink: StreamingFileSink[Info] = StreamingFileSink
            .forRowFormat(new Path(s3_path), new S3Encoder)
            .build()

Но встретил ошибку non-serializable Кто-нибудь может пролить свет на это?Большое спасибо!

...