[apache-flink] Вход GenericTypeInfo <Row>не может быть преобразован в таблицу.Пожалуйста, укажите тип ввода с RowTypeInfo - PullRequest
0 голосов
/ 06 июня 2018

@ Fabian Hueske

Я пытался указать такую ​​схему, когда читаю из kafka, и скрываю inputtream 2 таблицы.Но я получил исключение:

  • Исключение в потоке "main" org.apache.flink.table.api.TableException: вход GenericTypeInfo не может быть преобразован в таблицу.Пожалуйста, укажите тип ввода с RowTypeInfo

И код здесь:

    class WormholeDeserializationSchema(schema: String) extends KeyedDeserializationSchema[Row] {
  //var keyValueTopic:KeyValueTopic = _


  override def deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String, partition: Int, offset: Long) = {
    val deserializationSchema = new SimpleStringSchema()
    val key = if (messageKey != null) deserializationSchema.deserialize(messageKey) else null
    val value = if (message != null) deserializationSchema.deserialize(message) else null
    val ums = UmsSchemaUtils.toUms(value)
    ums.payload_get.map(_.tuple).map(tuple => Row.of(tuple: _*)).head
  }

  override def isEndOfStream(nextElement: Row): Boolean = false

  override def getProducedType: TypeInformation[Row] = {
    val (fieldNameArray, fieldTypeArray) = getSchemaMap(schema)
    val types =new RowTypeInfo(fieldTypeArray,fieldNameArray)
    types
  }

  private def getSchemaMap(jsonSchema: String) = {
    val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
    val fields = umsSchema.fields_get
    val fieldNameList = ListBuffer.empty[String]
    val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
    fields.foreach {
      field =>
        fieldNameList.append(field.name)
        fieldTypeList.append(fieldTypeMatch(field.`type`))
    }
    println(fieldNameList)
    println(fieldTypeList)
    (fieldNameList.toArray, fieldTypeList.toArray)
  }


  private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
    umsFieldType match {
      case STRING => Types.STRING
      case INT => Types.INT
      case LONG => Types.LONG
      case FLOAT => Types.FLOAT
      case DOUBLE => Types.DOUBLE
      case BOOLEAN => Types.BOOLEAN
      case DATE => Types.SQL_DATE
      case DATETIME => Types.SQL_TIMESTAMP
      case DECIMAL => Types.DECIMAL
    }
  }
}



 val inputStream: DataStream[Row] = env.addSource(myConsumer)
 val tableEnv = TableEnvironment.getTableEnvironment(env)

(исключение здесь)

1 Ответ

0 голосов
/ 11 июня 2018

Исключение вызвано ошибкой в ​​системе типов Flink (см. FLINK-9556 ).Информация о типе, указанная в схеме десериализации, не рассматривается в Scala API.

В качестве обходного пути вы всегда можете указать информацию о типе вручную с помощью:

env.addSource(myConsumer)(Types.ROW(...)) 
...