Flink DataSource Iterate - PullRequest
       72

Flink DataSource Iterate

0 голосов
/ 20 марта 2020

Я пытаюсь перебрать источник данных:


     val env = ExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)

      val job = Job.getInstance
      FileInputFormat.addInputPath(
        job,
        new Path("file.parquet.gz")
      )

      val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
        new HadoopInputFormat(
          new AvroParquetInputFormat[GenericRecord],
          classOf[Void],
          classOf[GenericRecord],
          job
        )
       val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)

Когда я делаю data.print, я вижу данные в кортеже.

Но когда я делаю:


    data.map
     {
       res =>
         println("!!!!!!!!!!!111")
         println( res.f1)
     }

Ничего не печатается.

Я хочу перебрать источник данных и извлечь GenericRecord. Пожалуйста, помогите мне.

Ответы [ 2 ]

0 голосов
/ 20 марта 2020

Вы можете использовать data.collect, а затем использовать: data.iterator (). Next () для итерации

0 голосов
/ 20 марта 2020

Чтобы выполнить пакетную программу Flink без вызова print или collect, вам необходимо вызвать env.execute(). Только этот вызов вызовет выполнение программы при отсутствии вышеупомянутых вызовов API.

...