У меня есть паркетный файл на HDFS.Ежедневно перезаписывается новым.Моя цель состоит в том, чтобы непрерывно выдавать этот файл партера - , когда он изменяется - в виде DataStream в задании Flink с использованием API DataStream.Конечная цель - использовать содержимое файла в состоянии широковещания, но это выходит за рамки этого вопроса.
- Для обработки файла непрерывно , это очень полезноAPI: Источники данных об источниках данных.В частности, FileProcessingMode.PROCESS_CONTINUOUSLY : это именно то, что мне нужно.Это работает для чтения / мониторинга текстовых файлов, без проблем, но не для паркетных файлов:
// Partial version 1: the raw file is processed continuously
val path: String = "hdfs://hostname/path_to_file_dir/"
val textInputFormat: TextInputFormat = new TextInputFormat(new Path(path))
// monitor the file continuously every minute
val stream: DataStream[String] = streamExecutionEnvironment.readFile(textInputFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60000)
Для обработки
паркетных файлов я могу использовать
Форматы ввода Hadoop с использованием этого API:
using-hadoop-inputformats .Однако в этом API нет параметра FileProcessingMode, и он обрабатывает файл только один раз:
// Partial version 2: the parquet file is only processed once
val parquetPath: String = "/path_to_file_dir/parquet_0000"
// raw text format
val hadoopInputFormat: HadoopInputFormat[Void, ArrayWritable] = HadoopInputs.readHadoopFile(new MapredParquetInputFormat(), classOf[Void], classOf[ArrayWritable], parquetPath)
val stream: DataStream[(Void, ArrayWritable)] = streamExecutionEnvironment.createInput(hadoopInputFormat).map { record =>
// process the record here ...
}
Я хотел бы как-то объединить два API, чтобы непрерывно обрабатывать файлы Parquet через API DataStream.Кто-нибудь из вас пробовал что-то подобное?