Как я могу использовать Apache Flink для чтения файла паркета в HDFS? - PullRequest
0 голосов
/ 23 октября 2018

Я нахожу только TextInputFormat и CsvInputFormat.Итак, как я могу использовать Apache Flink для чтения файла паркета в HDFS?

1 Ответ

0 голосов
/ 24 октября 2018

Хорошо.Я уже нашел способ чтения файла паркета в HDFS через Apache Flink.

  1. Вы должны добавить ниже зависимости в вашем pom.xml

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_2.11</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.10.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
    
  2. Создайте файл avsc для определения схемы.Опыт:

    {"namespace": "com.flinklearn.models",
     "type": "record",
     "name": "AvroTamAlert",
     "fields": [
        {"name": "raw_data", "type": ["string","null"]}
     ]
    }

Запустите "java -jar D: \ avro-tools-1.8.2.jar схема компиляции alert.avsc."для создания Java-класса и копирования AvroTamAlert.java в ваш проект.

Используйте AvroParquetInputFormat для чтения файла паркета в формате hdf:

class Main {
    def startApp(): Unit ={
        val env = ExecutionEnvironment.getExecutionEnvironment

        val job = Job.getInstance()

        val dIf = new HadoopInputFormat[Void, AvroTamAlert](new AvroParquetInputFormat(), classOf[Void], classOf[AvroTamAlert], job)
        FileInputFormat.addInputPath(job, new Path("/user/hive/warehouse/testpath"))

        val dataset = env.createInput(dIf)

        println(dataset.count())

        env.execute("start hdfs parquet test")
    }
}

object Main {
    def main(args:Array[String]):Unit = {
        new Main().startApp()
    }
}
...