@ Fabian Hueske
Я пытался указать такую схему, когда читаю из kafka, и скрываю inputtream 2 таблицы.Но я получил исключение:
- Исключение в потоке "main" org.apache.flink.table.api.TableException: вход GenericTypeInfo не может быть преобразован в таблицу.Пожалуйста, укажите тип ввода с RowTypeInfo
И код здесь:
class WormholeDeserializationSchema(schema: String) extends KeyedDeserializationSchema[Row] {
//var keyValueTopic:KeyValueTopic = _
override def deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String, partition: Int, offset: Long) = {
val deserializationSchema = new SimpleStringSchema()
val key = if (messageKey != null) deserializationSchema.deserialize(messageKey) else null
val value = if (message != null) deserializationSchema.deserialize(message) else null
val ums = UmsSchemaUtils.toUms(value)
ums.payload_get.map(_.tuple).map(tuple => Row.of(tuple: _*)).head
}
override def isEndOfStream(nextElement: Row): Boolean = false
override def getProducedType: TypeInformation[Row] = {
val (fieldNameArray, fieldTypeArray) = getSchemaMap(schema)
val types =new RowTypeInfo(fieldTypeArray,fieldNameArray)
types
}
private def getSchemaMap(jsonSchema: String) = {
val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
val fields = umsSchema.fields_get
val fieldNameList = ListBuffer.empty[String]
val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
fields.foreach {
field =>
fieldNameList.append(field.name)
fieldTypeList.append(fieldTypeMatch(field.`type`))
}
println(fieldNameList)
println(fieldTypeList)
(fieldNameList.toArray, fieldTypeList.toArray)
}
private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
umsFieldType match {
case STRING => Types.STRING
case INT => Types.INT
case LONG => Types.LONG
case FLOAT => Types.FLOAT
case DOUBLE => Types.DOUBLE
case BOOLEAN => Types.BOOLEAN
case DATE => Types.SQL_DATE
case DATETIME => Types.SQL_TIMESTAMP
case DECIMAL => Types.DECIMAL
}
}
}
val inputStream: DataStream[Row] = env.addSource(myConsumer)
val tableEnv = TableEnvironment.getTableEnvironment(env)
(исключение здесь)