Непрерывная обработка паркетных файлов как потоков данных в API DataStream от Flink - PullRequest
0 голосов
/ 04 февраля 2019

У меня есть паркетный файл на HDFS.Ежедневно перезаписывается новым.Моя цель состоит в том, чтобы непрерывно выдавать этот файл партера - , когда он изменяется - в виде DataStream в задании Flink с использованием API DataStream.Конечная цель - использовать содержимое файла в состоянии широковещания, но это выходит за рамки этого вопроса.

  1. Для обработки файла непрерывно , это очень полезноAPI: Источники данных об источниках данных.В частности, FileProcessingMode.PROCESS_CONTINUOUSLY : это именно то, что мне нужно.Это работает для чтения / мониторинга текстовых файлов, без проблем, но не для паркетных файлов:
// Partial version 1: the raw file is processed continuously
val path: String = "hdfs://hostname/path_to_file_dir/"
val textInputFormat: TextInputFormat = new TextInputFormat(new Path(path))
// monitor the file continuously every minute
val stream: DataStream[String] = streamExecutionEnvironment.readFile(textInputFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60000)
Для обработки паркетных файлов я могу использовать Форматы ввода Hadoop с использованием этого API: using-hadoop-inputformats .Однако в этом API нет параметра FileProcessingMode, и он обрабатывает файл только один раз:
// Partial version 2: the parquet file is only processed once
val parquetPath: String = "/path_to_file_dir/parquet_0000"
// raw text format
val hadoopInputFormat: HadoopInputFormat[Void, ArrayWritable] = HadoopInputs.readHadoopFile(new MapredParquetInputFormat(), classOf[Void], classOf[ArrayWritable], parquetPath)
val stream: DataStream[(Void, ArrayWritable)] = streamExecutionEnvironment.createInput(hadoopInputFormat).map { record =>
  // process the record here ...
}

Я хотел бы как-то объединить два API, чтобы непрерывно обрабатывать файлы Parquet через API DataStream.Кто-нибудь из вас пробовал что-то подобное?

1 Ответ

0 голосов
/ 06 февраля 2019

После просмотра кода Флинка, похоже, что эти два APIS относительно различны, и объединить их вместе не представляется возможным.

Другой подход, который я подробно опишу здесь, заключается в определении вашего собственногоSourceFunction, которая будет периодически читать файл:

class ParquetSourceFunction extends SourceFunction[Int] {
  private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    while (isRunning) {
      val path = new Path("path_to_parquet_file")
      val conf = new Configuration()

      val readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER)
      val metadata = readFooter.getFileMetaData
      val schema = metadata.getSchema
      val parquetFileReader = new ParquetFileReader(conf, metadata, path, readFooter.getBlocks, schema.getColumns)
      var pages: PageReadStore = null
      try {
        while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
          val rows = pages.getRowCount
          val columnIO = new ColumnIOFactory().getColumnIO(schema)
          val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
          (0L until rows).foreach { _ =>
            val group = recordReader.read()
            val my_integer = group.getInteger("field_name", 0)
            ctx.collect(my_integer)
          }
        }
      }

      // do whatever logic suits you to stop "watching" the file
      Thread.sleep(60000)
    }
  }

  override def cancel(): Unit = isRunning = false
}

Затем используйте streamExecutionEnvironment для регистрации этого источника:

val dataStream: DataStream[Int] = streamExecutionEnvironment.addSource(new ParquetProtoSourceFunction)
// do what you want with your new datastream
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...