Запись в HBase в MapReduce с использованием MultipleOutputs - PullRequest
4 голосов
/ 12 мая 2011

В настоящее время у меня есть задание MapReduce, которое использует MultipleOutputs для отправки данных в несколько мест HDFS.После этого я использую клиентские вызовы HBase (вне MR), чтобы добавить некоторые из тех же элементов в несколько таблиц HBase.Было бы неплохо добавить выходы HBase как просто дополнительные MultipleOutputs, используя TableOutputFormat.Таким образом, я бы распространял свою обработку HBase.

Проблема в том, что я не могу заставить это работать.Кто-нибудь когда-либо использовал TableOutputFormat в MultipleOutputs ...?С несколькими выходами HBase?

в основном, я настраиваю свои коллекторы, как это ....

Outputcollector<ImmutableBytesWritable, Writable> hbaseCollector1 = multipleOutputs.getCollector("hbase1", reporter); 
Outputcollector<ImmutableBytesWritable, Writable> hbaseCollector2 = multipleOutputs.getCollector("hbase2", reporter); 
Put put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata1);
hbaseCollector1.collect(NullWritable.get(), put);

put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata2);
hbaseCollector2.collect(newImmutableBytesWritable(mykey.getBytes()), put);

Кажется, это следует общей идее написания hbase.

Часть проблемы, когда я ее набираю, может быть больше в определении задания.Похоже, что MR (и Hbase) хотят глобальный набор параметров, например ....

conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");

для предоставления имени таблицы.Беда в том, у меня есть две таблицы ....

Есть идеи?

Спасибо

Ответы [ 3 ]

4 голосов
/ 13 мая 2011

Я поместил данные в HBase 3 разными способами.Наиболее эффективным (и распространенным) было использование класса HFileOutputFormat.

Я настроил задание следующим образом ... (Обратите внимание, что это отредактировано из фактического кода, но основное содержимое остается)

cubeBuilderETLJob.setJobName(jobName);
cubeBuilderETLJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
cubeBuilderETLJob.setMapOutputValueClass(Put.class);
cubeBuilderETLJob.setMapperClass(HiveToHBaseMapper.class);      
cubeBuilderETLJob.setJarByClass(CubeBuilderDriver.class);       
cubeBuilderETLJob.setInputFormatClass(TextInputFormat.class);
cubeBuilderETLJob.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.setOutputPath(cubeBuilderETLJob, cubeOutputPath);
HTable hTable = null;
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("ZOOKEEPER_QUORUM", hbaseZookeeperQuorum);
hConf.set("ZOOKEEPER_CLIENTPORT", hbaseZookeeperClientPort);
hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(cubeBuilderETLJob, hTable);

Как мы видим, мой класс Mapper называется HiveToHBaseMapper - Красиво и оригинально.:) Вот (опять-таки, грубое) определение этого

public class HiveToHBaseMapper extends
    Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {
@Override
public void map(WritableComparable key, Writable val, Context context)
    throws IOException, InterruptedException {
    Configuration config = context.getConfiguration();
    String family = config.get("FAMILY");
    Double value = Double.parseDouble(sValue);
    String sKey = generateKey(config);
    byte[] bKey = Bytes.toBytes(sKey);
    Put put = new Put(bKey);
    put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
        ? Bytes.toBytes(Double.MIN_VALUE)
        : Bytes.toBytes(value));        
    ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
    context.write(ibKey, put);
}

Я не знаю, можете ли вы использовать это, чтобы вписать его в MultipleOutputs или нужно создать новую работу MR.Это лучший способ получить данные в HBase.:)

Мы надеемся, что это поможет вам найти правильное решение.

1 голос
/ 13 мая 2011

Так что, по-видимому, это невозможно со старыми пакетами mapred. В наборе пакетов mapreduce есть новый OutputFormat, но я не хочу сейчас переходить к этому. Итак, мне придется написать несколько MR-заданий.

1 голос
/ 13 мая 2011

По моему опыту, лучший подход - это просто поместить данные в таблицу hbase, как только они у вас появятся (если вы не выполняете массовую загрузку данных).Если у вас есть данные, доступные в задании на карте, то это лучшее время, чтобы отправить их в hbase.Если у вас нет данных до задачи сокращения, добавьте туда push-копию hbase.Пока вы не знаете, что HBase является вашим узким местом, пусть HBase беспокоится о проблемах с кэшированием.

...