Относительно потока потока Flink к hdf - PullRequest
0 голосов
/ 07 января 2019

Я пишу код flink, в котором я читаю файл из локальной системы и записываю его в базу данных, используя "writeUsingOutputFormat".

Теперь мое требование - писать в hdfs вместо базы данных.

Не могли бы вы помочь мне, как я могу сделать в Flink.

Примечание: hdfs запущен на моей локальной машине.

Ответы [ 2 ]

0 голосов
/ 07 января 2019

Более новый потоковый приемник файлов , вероятно, является лучшим выбором, чем Bucketing Sink на данный момент. Это описание взято из примечаний к выпуску Flink 1.6 (обратите внимание, что поддержка S3 была добавлена ​​в Flink 1.7):

Новый StreamingFileSink - это единовременный приемник для записи в файловые системы, которые используют знания, полученные из предыдущий BucketingSink. Точно один раз поддерживается через интеграцию раковины с контрольно-пропускным механизмом Флинка. Новая раковина построен на собственной абстракции FileSystem от Flink и поддерживает локальную файловая система и HDFS, с планами поддержки S3 в ближайшем будущем [теперь включена в Flink 1.7]. Это выставляет подключаемые файлы каталогов и политик. Помимо Строковые форматы кодирования, новый StreamingFileSink поставляется с Подставка для паркета. Другие форматы массового кодирования, такие как ORC, могут быть легко добавляется с помощью открытых API.

0 голосов
/ 07 января 2019

Flink предоставляет HDFS-соединитель , который можно использовать для записи данных в любую файловую систему, поддерживаемую Hadoop Filesystem .

Предоставленный приемник - это приемник Bucketing, который разбивает поток данных на папки, содержащие циклические файлы. Поведение сегмента, а также запись можно настроить с помощью таких параметров, как batch size и batch roll over time interval

В документе Flink приведен следующий пример -

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

input.addSink(sink);
...