Hadoop / MapReduce: чтение и запись классов, сгенерированных из DDL - PullRequest
1 голос
/ 17 мая 2010

Может ли кто-нибудь пройтись по основному рабочему процессу чтения и записи данных с классами, сгенерированными из DDL?

Я определил некоторые структурные записи, используя DDL. Например:

  class Customer {
     ustring FirstName;
     ustring LastName;
     ustring CardNo;
     long LastPurchase;
  }

Я скомпилировал это, чтобы получить класс Customer, и включил его в свой проект. Я легко вижу, как использовать это как ввод и вывод для картографов и редукторов (сгенерированный класс реализует Writable), но не как читать и записывать его в файл.

JavaDoc для пакета org.apache.hadoop.record говорит о сериализации этих записей в двоичном, CSV или XML-формате. Как мне на самом деле это сделать? Скажем, мой редуктор создает ключи IntWritable и значения Customer. Какой выходной формат я использую, чтобы записать результат в формате CSV? Какой InputFormat я бы использовал для чтения полученных файлов позже, если бы я хотел выполнить анализ над ними?

1 Ответ

1 голос
/ 19 мая 2010

Хорошо, я думаю, я понял это. Я не уверен, что это самый простой способ, поэтому, пожалуйста, исправьте меня, если вы знаете более простой рабочий процесс.

Каждый класс, сгенерированный из DDL, реализует интерфейс Record и, следовательно, предоставляет два метода:

сериализация (RecordOutput out) для записи десериализация (вход записи в) для чтения

RecordOutput и RecordInput - это служебные интерфейсы, предоставляемые в пакете org.apache.hadoop.record . Существует несколько реализаций (например, XMLRecordOutput , BinaryRecordOutput , CSVRecordOutput )

Насколько я знаю, вы должны реализовать свои собственные OutputFormat или InputFormat для их использования. Это довольно легко сделать.

Например, OutputFormat, о котором я говорил в исходном вопросе (тот, который записывает целочисленные ключи и значения Customer в формате CSV), будет реализован так:


  private static class CustomerOutputFormat 
    extends TextOutputFormat<IntWritable, Customer> 
  {

    public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored,
      JobConf job,
      String name,
      Progressable progress)
    throws IOException {
      Path file = FileOutputFormat.getTaskOutputPath(job, name);
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new CustomerRecordWriter(fileOut);
    }   

    protected static class CustomerRecordWriter 
      implements RecordWriter<IntWritable, Customer> 
    {

      protected DataOutputStream outStream ;

      public AnchorRecordWriter(DataOutputStream out) {
        this.outStream = out ; 
      }

      public synchronized void write(IntWritable key, Customer value) throws IOException {

        CsvRecordOutput csvOutput = new CsvRecordOutput(outStream);
        csvOutput.writeInteger(key.get(), "id") ;
        value.serialize(csvOutput) ; 
      }

      public synchronized void close(Reporter reporter) throws IOException {
        outStream.close();
      }
    }
  }

Создание InputFormat во многом аналогично. Поскольку формат csv - это одна запись на строку, мы можем использовать LineRecordReader для выполнения большей части работы.



private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> {

  public RecordReader<IntWritable, Customer> getRecordReader(
    InputSplit genericSplit, 
    JobConf job,
    Reporter reporter)
  throws IOException {

    reporter.setStatus(genericSplit.toString());
    return new CustomerRecordReader(job, (FileSplit) genericSplit);
  }

  private class CustomerRecordReader implements RecordReader<IntWritable, Customer> {

    private LineRecordReader lrr ;

    public CustomerRecordReader(Configuration job, FileSplit split) 
    throws IOException{
      this.lrr = new LineRecordReader(job, split);    
    }

    public IntWritable createKey() {
      return new IntWritable();
    }

    public Customer createValue() {
      return new Customer();
    }

    public synchronized boolean next(IntWritable key, Customer value)
    throws IOException {

      LongWritable offset = new LongWritable() ;
      Text line = new Text() ;

      if (!lrr.next(offset, line))
        return false ;

      CsvRecordInput cri = new CsvRecordInput(new      
        ByteArrayInputStream(line.toString().getBytes())) ;
      key.set(cri.readInt("id")) ;
      value.deserialize(cri) ;

      return true ;
    }

    public float getProgress() {
      return lrr.getProgress() ;
    }

    public synchronized long getPos() throws IOException {
      return lrr.getPos() ;
    }

    public synchronized void close() throws IOException {
      lrr.close();
    }
  }
}

...