KafkaTableSink: как его использовать? - PullRequest
1 голос
/ 11 октября 2019

Это таблица:

val res: Table = tenv.sqlQuery(
 """
   |select event.ID,event.locationID, event.temp
   |from event
   |JOIN patt
   |ON event.ID = patt.ID
   |AND event.temp >= patt.temperature
   |""".stripMargin
)

Это схема, которую я хочу получить в результате:

 res.toAppendStream[Event].print("Alert for these location")

 case class Event(ID: Int, locationID: String, temp: Double)

Я хочу сделать Kafka010TableSink:

  val tableSink = new Kafka010TableSink("ask","Output", properties, new FlinkFixedPartitioner[])

Что происходит в схеме и схеме сериализации, я получаю сообщение об ошибке с FlinkFixedPartitioner.

...