Я создаю сервис весенней загрузки для копирования файлов из локальной системы в hdfs в файл последовательности. У меня есть следующий код:
public static void copyLocalFileToHdfs2(String hdfsUri, Configuration conf, String localPath, String destDir, String fileName) throws Exception {
String seqFile = "seqfile";
try {
FileSystem fs = FileSystem.get(URI.create(hdfsUri), conf);
org.apache.hadoop.fs.Path hdfsWritePath = new org.apache.hadoop.fs.Path(destDir);
if (!fs.exists(hdfsWritePath))
fs.mkdirs(hdfsWritePath);
org.apache.hadoop.fs.Path localFilePath = new org.apache.hadoop.fs.Path(localPath + "/" + fileName);
org.apache.hadoop.fs.Path hdfsFilePath = new org.apache.hadoop.fs.Path(destDir + "/" + seqFile);
FSDataInputStream in = null;
Text key = new Text(fileName);
BytesWritable value = new BytesWritable();
SequenceFile.Writer writer = null;
try {
byte[] contents = Files.readAllBytes(Paths.get(localPath + "/" + fileName));
value.set(contents, 0, contents.length);
if (!fs.exists(hdfsFilePath)) {
log.info("insert to " + hdfsFilePath);
writer = SequenceFile.createWriter(
conf
, SequenceFile.Writer.file(hdfsFilePath)
, SequenceFile.Writer.keyClass(key.getClass())
, SequenceFile.Writer.valueClass(value.getClass())
);
} else {
log.info("append to " + hdfsFilePath);
writer = SequenceFile.createWriter(
conf
, SequenceFile.Writer.file(hdfsFilePath)
, SequenceFile.Writer.keyClass(key.getClass())
, SequenceFile.Writer.valueClass(value.getClass())
, SequenceFile.Writer.appendIfExists(true)
);
}
writer.append(key, value);
} finally {
if (writer != null) {
writer.close();
}
}
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(hdfsFilePath));
while (reader.next(key, value)) {
System.out.println(key/* + "\t" + value*/);
}
} finally {
if (reader != null) {
reader.close();
}
}
} catch (Exception e) {
throw new IOException(String.format("Error copy from %s to HDFS %s. Error: %s", localPath + "/" + fileName, destDir + "/" + seqFile, e.getMessage()));
}
}
}
Этот код работает. Файл последовательности будет создан. Ключ повторения SequenceFile.Reader из созданного файла последовательности. Но где некоторые вопросы:
Файл не виден в hdfs. Например, команда
hdfs dfs -ls / real_dir (где путь real_dir из переменной destDir)
ничего не возвращает.
Когда я отправляю следующий файл для copyinf, fs.exists (hdfsFilePath) всегда возвращает false. Итак, файл последовательности воссоздан. Но я хочу добавить новый ключ и значение к этому.
Спасибо за ваши советы.