Как создать столбцы таблицы Flink NULLable из классов случаев Scala, которые содержат типы Option - PullRequest
0 голосов
/ 25 октября 2018

Я хотел бы создать DataSet (или DataStream) из коллекции классов дел, содержащих Option значения.В созданной таблице столбцы, полученные из Option значений, должны содержать либо NULL, либо фактическое значение примитива.

Это то, что я пробовал:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object OptionExample {
  case class Event(time: Timestamp, id: String, value: Option[Int])

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val data = env.fromCollection(Seq(
      Event(Timestamp.valueOf("2018-01-01 00:01:00"), "a", Some(3)),
      Event(Timestamp.valueOf("2018-01-01 00:03:00"), "a", None),
      Event(Timestamp.valueOf("2018-01-01 00:03:00"), "b", Some(7)),
      Event(Timestamp.valueOf("2018-01-01 00:02:00"), "a", Some(5))
    ))

    val table = tEnv.fromDataSet(data)
    table.printSchema()
//    root
//    |-- time: Timestamp
//    |-- id: String
//    |-- value: Option[Integer]

    val result = table
      .groupBy('id)
      .select('id, 'value.avg as 'averageValue)

    // Print results
    val ds: DataSet[Row] = result.toDataSet
    ds.print()
  }
}

Но это вызывает исключение в агрегацииpart ...

org.apache.flink.table.api.ValidationException: выражение avg ('значение) не выполнено при проверке ввода: для avg требуются числовые типы, здесь можно получить Option [Integer]

... поэтому при таком подходе Option не преобразуется в числовой тип с NULL, как описано выше.

Как этого добиться с помощью Flink?

(Я из Apache Spark, там есть наборы данных, созданные из классов дел с опциями. Я хотел бы добиться чего-то похожего с Flink)

...