Хорошо, я думаю, я понял это. Я не уверен, что это самый простой способ, поэтому, пожалуйста, исправьте меня, если вы знаете более простой рабочий процесс.
Каждый класс, сгенерированный из DDL, реализует интерфейс Record и, следовательно, предоставляет два метода:
сериализация (RecordOutput out) для записи
десериализация (вход записи в) для чтения
RecordOutput и RecordInput - это служебные интерфейсы, предоставляемые в пакете org.apache.hadoop.record . Существует несколько реализаций (например, XMLRecordOutput , BinaryRecordOutput , CSVRecordOutput )
Насколько я знаю, вы должны реализовать свои собственные OutputFormat или InputFormat для их использования. Это довольно легко сделать.
Например, OutputFormat, о котором я говорил в исходном вопросе (тот, который записывает целочисленные ключи и значения Customer в формате CSV), будет реализован так:
private static class CustomerOutputFormat
extends TextOutputFormat<IntWritable, Customer>
{
public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new CustomerRecordWriter(fileOut);
}
protected static class CustomerRecordWriter
implements RecordWriter<IntWritable, Customer>
{
protected DataOutputStream outStream ;
public AnchorRecordWriter(DataOutputStream out) {
this.outStream = out ;
}
public synchronized void write(IntWritable key, Customer value) throws IOException {
CsvRecordOutput csvOutput = new CsvRecordOutput(outStream);
csvOutput.writeInteger(key.get(), "id") ;
value.serialize(csvOutput) ;
}
public synchronized void close(Reporter reporter) throws IOException {
outStream.close();
}
}
}
Создание InputFormat во многом аналогично. Поскольку формат csv - это одна запись на строку, мы можем использовать LineRecordReader для выполнения большей части работы.
private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> {
public RecordReader<IntWritable, Customer> getRecordReader(
InputSplit genericSplit,
JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new CustomerRecordReader(job, (FileSplit) genericSplit);
}
private class CustomerRecordReader implements RecordReader<IntWritable, Customer> {
private LineRecordReader lrr ;
public CustomerRecordReader(Configuration job, FileSplit split)
throws IOException{
this.lrr = new LineRecordReader(job, split);
}
public IntWritable createKey() {
return new IntWritable();
}
public Customer createValue() {
return new Customer();
}
public synchronized boolean next(IntWritable key, Customer value)
throws IOException {
LongWritable offset = new LongWritable() ;
Text line = new Text() ;
if (!lrr.next(offset, line))
return false ;
CsvRecordInput cri = new CsvRecordInput(new
ByteArrayInputStream(line.toString().getBytes())) ;
key.set(cri.readInt("id")) ;
value.deserialize(cri) ;
return true ;
}
public float getProgress() {
return lrr.getProgress() ;
}
public synchronized long getPos() throws IOException {
return lrr.getPos() ;
}
public synchronized void close() throws IOException {
lrr.close();
}
}
}