Файл последовательности, созданный из программы весенней загрузки java, не виден в hdfs - PullRequest
0 голосов
/ 09 февраля 2020

Я создаю сервис весенней загрузки для копирования файлов из локальной системы в hdfs в файл последовательности. У меня есть следующий код:

public static void copyLocalFileToHdfs2(String hdfsUri, Configuration conf, String localPath, String destDir, String fileName) throws Exception {
        String seqFile = "seqfile";

        try {
            FileSystem fs = FileSystem.get(URI.create(hdfsUri), conf);

            org.apache.hadoop.fs.Path hdfsWritePath = new org.apache.hadoop.fs.Path(destDir);
            if (!fs.exists(hdfsWritePath))
                fs.mkdirs(hdfsWritePath);

            org.apache.hadoop.fs.Path localFilePath = new org.apache.hadoop.fs.Path(localPath + "/" + fileName);
            org.apache.hadoop.fs.Path hdfsFilePath = new org.apache.hadoop.fs.Path(destDir + "/" + seqFile);

            FSDataInputStream in = null;
            Text key = new Text(fileName);

            BytesWritable value = new BytesWritable();
            SequenceFile.Writer writer = null;

            try {
                byte[] contents = Files.readAllBytes(Paths.get(localPath + "/" + fileName));
                value.set(contents, 0, contents.length);

                if (!fs.exists(hdfsFilePath)) {
                    log.info("insert to " + hdfsFilePath);
                    writer = SequenceFile.createWriter(
                            conf
                            , SequenceFile.Writer.file(hdfsFilePath)
                            , SequenceFile.Writer.keyClass(key.getClass())
                            , SequenceFile.Writer.valueClass(value.getClass())
                    );
                } else {
                    log.info("append to " + hdfsFilePath);
                    writer = SequenceFile.createWriter(
                            conf
                            , SequenceFile.Writer.file(hdfsFilePath)
                            , SequenceFile.Writer.keyClass(key.getClass())
                            , SequenceFile.Writer.valueClass(value.getClass())
                            , SequenceFile.Writer.appendIfExists(true)
                    );
                }
                writer.append(key, value);

            } finally {
                if (writer != null) {
                    writer.close();
                }
            }

            SequenceFile.Reader reader = null;
            try {
                reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(hdfsFilePath));

                while (reader.next(key, value)) {
                    System.out.println(key/* + "\t" + value*/);
                }
            } finally {
                if (reader != null) {
                    reader.close();
                }
            }
        } catch (Exception e) {
            throw new IOException(String.format("Error copy from %s to HDFS %s. Error: %s", localPath + "/" + fileName, destDir + "/" + seqFile, e.getMessage()));
        }
    }

}

Этот код работает. Файл последовательности будет создан. Ключ повторения SequenceFile.Reader из созданного файла последовательности. Но где некоторые вопросы:

  1. Файл не виден в hdfs. Например, команда

    hdfs dfs -ls / real_dir (где путь real_dir из переменной destDir)

ничего не возвращает.

Когда я отправляю следующий файл для copyinf, fs.exists (hdfsFilePath) всегда возвращает false. Итак, файл последовательности воссоздан. Но я хочу добавить новый ключ и значение к этому.

Спасибо за ваши советы.

...