Flink Streaming Кафка Источник в Кафку Раковина - PullRequest
0 голосов
/ 11 марта 2020

Я пытаюсь написать потоковый код Flink в Scala для чтения из Kafka topi c и после выполнения некоторой операции над сообщением записать данные обратно в Kafka Topi c. Я использую Flink Table API.

Код выполняется без каких-либо исключений, но в Sink Topi c.

не было найдено ни одного сообщения. Аналогичный код работает нормально при использовании MySQL в качестве приемника.

Не в состоянии понять, в чем проблема, код довольно прост в соответствии с документацией Flink 1.9.

Пожалуйста, помогите мне найти ключ к отладке этой проблемы.

Подробности:

Версия Flink - 1,9

Зависимость - SBT

Версия Kafka - 2,1

Разъем Kafka - flink-connector-kafka-0.10% 1.9.1

Код :

    package com.flink

    // Import Packages
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.java._
    import scala.collection.JavaConverters._
    import org.apache.flink.table.descriptors.Json
    import org.apache.flink.table.descriptors.Kafka
    import org.apache.flink.table.sinks.CsvTableSink
    import org.apache.flink.table.descriptors.Schema
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.streaming.connectors.kafka._
    import org.apache.flink.table.api.{Table, TableEnvironment}
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.table.api.java.StreamTableEnvironment
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    object kafkaToKafka2 {
      def main(args: Array[String]): Unit = {

        // Setting Streaming Environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.enableCheckpointing(5000)

        // Define Table environment
        val tableEnv = StreamTableEnvironment.create(env)

        val schema = new Schema()
          .field("userId",Types.STRING)
          .field("jobTitleName", Types.STRING)
          .field("firstName", Types.STRING)
          .field("lastName", Types.STRING)

        val sinkSchema = new Schema()
          .field("user_id",Types.STRING)
          .field("job_name", Types.STRING)
          .field("name", Types.STRING)

        // Reading from Kafka
        tableEnv.connect(
          new Kafka()
            .version("0.10")
            .topic("sourceTopic")
            .property("bootstrap.servers", "localhost:9092") // local
            .property("group.id", "poc1234")
            .startFromEarliest()
          //.startFromLatest()
        )
          .withFormat(
            new Json()
              .failOnMissingField(true)
              .deriveSchema()
          )
          .withSchema(schema)
          .inAppendMode()
          .registerTableSource("json_tab")

        // Writing to Kafka
        tableEnv.connect(
          new Kafka()
            .version("0.10")
            .topic("myTopic2")
            .property("bootstrap.servers", "localhost:9092")   // local
            .property("group.id", "poc")
            .sinkPartitionerRoundRobin()

        )
          .withFormat(
            new Json()
              .failOnMissingField(false)
              .deriveSchema()
          )
          .withSchema(sinkSchema)
          .inAppendMode()
          .registerTableSink("json_sink_tab")

            // SQL query where we can do transformation using ANSI SQL
            val sql = "SELECT userId as user_id, jobTitleName as job_name, CONCAT(firstName, ' ', lastName) as name " +
              "FROM json_tab " +
              "WHERE userId <> '' ";

            // Executing SQL query
            val result:Table = tableEnv.sqlQuery(sql)

        result.insertInto("json_sink_tab");

        // Printing Schema for checking purpose
        result.printSchema()
        System.out.println("======================= " + result.toString)

        env.execute("kafkaJsonJob")
      }
    }
...