Я пытаюсь написать потоковый код Flink в Scala для чтения из Kafka topi c и после выполнения некоторой операции над сообщением записать данные обратно в Kafka Topi c. Я использую Flink Table API.
Код выполняется без каких-либо исключений, но в Sink Topi c.
не было найдено ни одного сообщения. Аналогичный код работает нормально при использовании MySQL в качестве приемника.
Не в состоянии понять, в чем проблема, код довольно прост в соответствии с документацией Flink 1.9.
Пожалуйста, помогите мне найти ключ к отладке этой проблемы.
Подробности:
Версия Flink - 1,9
Зависимость - SBT
Версия Kafka - 2,1
Разъем Kafka - flink-connector-kafka-0.10% 1.9.1
Код :
package com.flink
// Import Packages
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.java._
import scala.collection.JavaConverters._
import org.apache.flink.table.descriptors.Json
import org.apache.flink.table.descriptors.Kafka
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.descriptors.Schema
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
object kafkaToKafka2 {
def main(args: Array[String]): Unit = {
// Setting Streaming Environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)
// Define Table environment
val tableEnv = StreamTableEnvironment.create(env)
val schema = new Schema()
.field("userId",Types.STRING)
.field("jobTitleName", Types.STRING)
.field("firstName", Types.STRING)
.field("lastName", Types.STRING)
val sinkSchema = new Schema()
.field("user_id",Types.STRING)
.field("job_name", Types.STRING)
.field("name", Types.STRING)
// Reading from Kafka
tableEnv.connect(
new Kafka()
.version("0.10")
.topic("sourceTopic")
.property("bootstrap.servers", "localhost:9092") // local
.property("group.id", "poc1234")
.startFromEarliest()
//.startFromLatest()
)
.withFormat(
new Json()
.failOnMissingField(true)
.deriveSchema()
)
.withSchema(schema)
.inAppendMode()
.registerTableSource("json_tab")
// Writing to Kafka
tableEnv.connect(
new Kafka()
.version("0.10")
.topic("myTopic2")
.property("bootstrap.servers", "localhost:9092") // local
.property("group.id", "poc")
.sinkPartitionerRoundRobin()
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(sinkSchema)
.inAppendMode()
.registerTableSink("json_sink_tab")
// SQL query where we can do transformation using ANSI SQL
val sql = "SELECT userId as user_id, jobTitleName as job_name, CONCAT(firstName, ' ', lastName) as name " +
"FROM json_tab " +
"WHERE userId <> '' ";
// Executing SQL query
val result:Table = tableEnv.sqlQuery(sql)
result.insertInto("json_sink_tab");
// Printing Schema for checking purpose
result.printSchema()
System.out.println("======================= " + result.toString)
env.execute("kafkaJsonJob")
}
}