Как использовать sql в потоке flink kafka? - PullRequest
0 голосов
/ 22 сентября 2019

Я загрузил таблицу правил в виде таблицы Flink из базы данных postgresql. Затем прочитал сообщение Кафки и классифицировал сообщение по этим правилам.код такой:

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.enableCheckpointing(5000)
    val stenv=StreamTableEnvironment.create(senv)
    val streamsource=senv.createInput(inputFormat)
    stenv.registerDataStream("rules",streamsource)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", KAFKA_BROKER)
    properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    properties.setProperty("group.id", TRANSACTION_GROUP)
    val fkp = new FlinkKafkaProducer010[String](TOPIC1, new SimpleStringSchema(), properties)
    val fkc = new FlinkKafkaConsumer010[String](TOPIC, new SimpleStringSchema(), properties)
    val stream = senv.addSource(fkc).setParallelism(3)
    val jsons = stream.map {
    {
      r => {
        val sub = JSON.parseObject(r.toString)
        val value = sub.getDouble("value")
        val time = sub.getLong("time")
        val tag = sub.getString("name")
        val error = sub.getString("error")
        val t = stenv.sqlQuery("select * from rules").where("nodeid=" + tag) //error is here
        //todo
        }
    } 

ошибка такая

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:686)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1143)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
    at cettest$.main(cettest.scala:63)
    at cettest.main(cettest.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
    ... 7 more

Я пробовал много способов решить проблему. Но не удалось!

1 Ответ

0 голосов
/ 22 сентября 2019

Добро пожаловать в переполнение стека!Было бы полезно, если бы вы могли перечислить ваши попытки до сих пор, но решение вашей проблемы выглядит довольно простым - похоже, что StreamTableEnvironmentImpl не расширяет черту Serializable: https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch12s08.html

Однако использование класса @Internal от Flink кажется неправильным.Я бы лучше создал свой собственный сериализуемый класс или, скорее всего, класс дел, который по умолчанию сериализуем.

Надеюсь, это поможет!

...