Apache Результат запроса Flink Table в виде строковых значений - PullRequest
0 голосов
/ 29 мая 2020

Я пишу запрос из api таблицы flink для получения записи. Затем проверьте, была ли найдена запись, и если да, получите строковое значение каждого из значений столбца записи.

ie

users: 
|id | name | phone |
|---|------|-------|
| 01| sam  | 23354 |
| 02| jake | 23352 |
| 03| kim  | 23351 |

Проблема в том, что flink возвращает только таблицу из запроса, поэтому я не могу 1: проверить, найдена ли запись и 2: получить отдельные значения найденных значения записи

sudo code:

foundRecord = find record by phone
  if foundRecord {
    create new instance of Visitor
    Visitor.name = foundRecord.name
    Visitor.id = foundRecord.id
  } else {
    throw exception
  }

Код ниже, рекомендованный flink docs, дает мне таблицу, но не уверен, как реализовать приведенный выше код sudo, поскольку он возвращается как другая таблица и Мне нужны фактические значения записи.

Table users = registeredUsers.select("id, name, phone").where("phone === '23354'"));

Flink Docs для ссылки: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#expression -syntax

1 Ответ

0 голосов
/ 30 мая 2020

Чтобы знать, что соответствующая запись не может быть найдена, ввод должен быть ограничен - поэтому мы будем использовать BatchTableEnvironment, а не StreamTableEnvironment. (При потоковом вводе соответствующая запись может в конечном итоге прийти и удовлетворить запрос. Только при пакетном вводе мы можем доказать отсутствие соответствия.)

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.util.Collector

class MissingResultException() extends Exception {}

object Phone {
  case class Visitor(name: String, id: String)

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    val rawInput = env.fromElements(
      ("01", "sam", "23354"),
      ("02", "jake", "23352"),
      ("03", "kim", "23351"))

    val events = tableEnv.fromDataSet(rawInput, 'id, 'name, 'phone)
    tableEnv.registerTable("events", events)
    val resultTable = tableEnv
      .from("events")
      .select('id, 'name, 'phone)
      .where("phone === 'missing'")

    val results = resultTable.toDataSet[Row]

    results
      .map(row => new Visitor(row.getField(1).toString, row.getField(0).toString))
      .print

    val count: DataSet[Long] = env.fromElements(results.count())

    count
      .flatMap(new FlatMapFunction[Long, Collector[Long]]{

        override def flatMap(x: Long, collector: Collector[Collector[Long]]): Unit = {
          if (x == 0L) {
            throw new MissingResultException
          }
        }})

      .print()
  }
}

Подход, который я использовал для определения того, что набор результатов является empty кажется чем-то вроде взлома, но я не мог придумать ничего лучше. Обратите внимание, что print() в самом конце необходим, хотя печатать нечего, потому что любые вычисления, которые в конечном итоге не передаются в приемник, будут оптимизированы, а не выполнены.

...