Чтение файла последовательности Hadoop во Flink - PullRequest
0 голосов
/ 25 мая 2019

Как прочитать файл последовательности Hadoop во Flink?Я столкнулся с несколькими проблемами с помощью подхода ниже.

У меня есть:

DataSource<String> source = env.readFile(new SequenceFileInputFormat(config), filePath);

и

public static class SequenceFileInputFormat extends FileInputFormat<String> {
    ...
    @Override
    public void setFilePath(String filePath) {
        org.apache.hadoop.conf.Configuration config = HadoopUtils.getHadoopConfiguration(configuration);
        logger.info("Initializing:"+filePath);
        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(filePath);

        try {
            reader = new SequenceFile.Reader(hadoopPath.getFileSystem(config), hadoopPath, config);
            key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);
            value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), config);
        } catch (IOException e) {
            logger.error("sequence file creation failed.", e);
        }
    }

}

Одна из проблем: Можетне читается оболочка кода пользователя: SequenceFileInputFormat.

1 Ответ

1 голос
/ 25 мая 2019

Как только вы получите InputFormat, вы можете позвонить ExecutionEnvironment.createInput(<input format>), чтобы создать свой DataSource.

Для SequenceFile s тип данных всегда Tuple2<key, value>, поэтому вы должны использовать функцию карты для преобразования в любой тип, который вы пытаетесь прочитать.

Я использую этот код для чтения SequenceFile, который содержит Каскадные кортежи ...

Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path(directory));
env.createInput(HadoopInputs.createHadoopInput(new SequenceFileInputFormat<Tuple, Tuple>(), Tuple.class, Tuple.class, job);
...