получить ошибку при использовании таблицы (поток данных) в Flink - PullRequest
0 голосов
/ 01 октября 2019

Я использую Flink 1.9.0 и не могу импортировать или получить файлы таблиц

Я пытался импортировать различные SBT, связанные с ним

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val tempSource = CsvTableSource.builder()
      .path("/home/amulya/Desktop/csvForBroadcast/CSV.csv")
      .fieldDelimiter(",")
      .field("locationID", Types.STRING())
      .field("temp", Types.DOUBLE())
      .build()

    tEnv.registerTableSource("Temperatures", tempSource)

    val askTable = tEnv
      .scan("Temperatures")
      .where(" 'Temperature >= 50")
      .select("'locationID, 'temp")

    val stream = tEnv.toAppendStream[Events](askTable)
      .print()
    env.execute()


  }
  case class Events(locationID: String, temp: Long)
}


У меня естьпростые данные в формате CSV: -

locationID,temp
"1",25
"2",25
"3",35
"4",45
"5",55

Это ошибка: -

Error:scalac: missing or invalid dependency detected while loading class file 'ScalaCaseClassSerializer.class'.
Could not access type SelfResolvingTypeSerializer in object org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'ScalaCaseClassSerializer.class' was compiled against an incompatible version of org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.

Я пытаюсь выполнить CEP на этих базовых данных, чтобы начать работу с apacheфлинк, любая помощь будет высоко оценена

1 Ответ

0 голосов
/ 01 октября 2019

Попробуйте flink-table-api-java-bridge.

В настоящее время модуль моста scala обеспечивает только базовое преобразование между таблицей и набором данных / потоком данных.

...