Чтобы знать, что соответствующая запись не может быть найдена, ввод должен быть ограничен - поэтому мы будем использовать BatchTableEnvironment
, а не StreamTableEnvironment
. (При потоковом вводе соответствующая запись может в конечном итоге прийти и удовлетворить запрос. Только при пакетном вводе мы можем доказать отсутствие соответствия.)
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.util.Collector
class MissingResultException() extends Exception {}
object Phone {
case class Visitor(name: String, id: String)
@throws[Exception]
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
val rawInput = env.fromElements(
("01", "sam", "23354"),
("02", "jake", "23352"),
("03", "kim", "23351"))
val events = tableEnv.fromDataSet(rawInput, 'id, 'name, 'phone)
tableEnv.registerTable("events", events)
val resultTable = tableEnv
.from("events")
.select('id, 'name, 'phone)
.where("phone === 'missing'")
val results = resultTable.toDataSet[Row]
results
.map(row => new Visitor(row.getField(1).toString, row.getField(0).toString))
.print
val count: DataSet[Long] = env.fromElements(results.count())
count
.flatMap(new FlatMapFunction[Long, Collector[Long]]{
override def flatMap(x: Long, collector: Collector[Collector[Long]]): Unit = {
if (x == 0L) {
throw new MissingResultException
}
}})
.print()
}
}
Подход, который я использовал для определения того, что набор результатов является empty кажется чем-то вроде взлома, но я не мог придумать ничего лучше. Обратите внимание, что print()
в самом конце необходим, хотя печатать нечего, потому что любые вычисления, которые в конечном итоге не передаются в приемник, будут оптимизированы, а не выполнены.