Apache Gora Reducer для многостолового вывода с Hbase - PullRequest
0 голосов
/ 15 октября 2019

У меня есть небольшие данные в таблице Hbase, просканированные через Nutch. Мы использовали Apache Gora в качестве ORM. Я нашел много примеров (mapreduce) для обработки данных в одной таблице в Hbase. Но моя проблема в том, что мне нужно скопировать данные в несколько таблиц (в редукторе). Без Горы существует какое-то руководство, например, этот вопрос и т. Д. Но как это сделать для моего случая.

1 Ответ

0 голосов
/ 15 октября 2019

Я никогда не делал то, что вы просите, но вы можете увидеть ответ в разделе Учебное пособие Гора «Построение работы» . Там есть пример конфигурации редуктора, который гласит:

/* Mappers are initialized with GoraMapper.initMapper() or 
 * GoraInputFormat.setInput()*/
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class
    , LogAnalyticsMapper.class, true);

/* Reducers are initialized with GoraReducer#initReducer().
 * If the output is not to be persisted via Gora, any reducer 
 * can be used instead. */
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

Тогда вместо использования GoraReducer.initReducerJob() вы можете просто настроить свой собственный редуктор, , как сказано по вашей ссылке (если этоправильный ответ) :

GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class
    , LogAnalyticsMapper.class, true);
job.setOutputFormatClass(MultiTableOutputFormat.class);
job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(2);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration());

Знайте, что в предыдущем примере маппер испускает (TextLong, LongWritable) значения ключа, поэтому ваш редуктор будет выглядеть примерно так: по ссылке, которую вы написали и ответ :

public class MyReducer extends TableReducer<TextLong, LongWritable, Put> {

    private static final Logger logger = Logger.getLogger( MyReducer.class );

    @SuppressWarnings( "deprecation" )
    @Override
    protected void reduce( TextLong key, Iterable<LongWritable> data, Context context ) throws IOException, InterruptedException {
        logger.info( "Working on ---> " + key.toString() );
        for ( Result res : data ) {
            Put put = new Put( res.getRow() );
            KeyValue[] raw = res.raw();
            for ( KeyValue kv : raw ) {
                put.add( kv );
            }

        ImmutableBytesWritable key = new ImmutableBytesWritable(Bytes.toBytes("tableName"));
        context.write(key, put);    

        }
    }
}

Опять же, я никогда этого не делал ... так что, может быть, не работает: \

...