Я хотел бы создать DataSet
(или DataStream
) из коллекции классов дел, содержащих Option
значения.В созданной таблице столбцы, полученные из Option
значений, должны содержать либо NULL, либо фактическое значение примитива.
Это то, что я пробовал:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
object OptionExample {
case class Event(time: Timestamp, id: String, value: Option[Int])
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val data = env.fromCollection(Seq(
Event(Timestamp.valueOf("2018-01-01 00:01:00"), "a", Some(3)),
Event(Timestamp.valueOf("2018-01-01 00:03:00"), "a", None),
Event(Timestamp.valueOf("2018-01-01 00:03:00"), "b", Some(7)),
Event(Timestamp.valueOf("2018-01-01 00:02:00"), "a", Some(5))
))
val table = tEnv.fromDataSet(data)
table.printSchema()
// root
// |-- time: Timestamp
// |-- id: String
// |-- value: Option[Integer]
val result = table
.groupBy('id)
.select('id, 'value.avg as 'averageValue)
// Print results
val ds: DataSet[Row] = result.toDataSet
ds.print()
}
}
Но это вызывает исключение в агрегацииpart ...
org.apache.flink.table.api.ValidationException: выражение avg ('значение) не выполнено при проверке ввода: для avg требуются числовые типы, здесь можно получить Option [Integer]
... поэтому при таком подходе Option
не преобразуется в числовой тип с NULL, как описано выше.
Как этого добиться с помощью Flink?
(Я из Apache Spark, там есть наборы данных, созданные из классов дел с опциями. Я хотел бы добиться чего-то похожего с Flink)