Я никогда не делал то, что вы просите, но вы можете увидеть ответ в разделе Учебное пособие Гора «Построение работы» . Там есть пример конфигурации редуктора, который гласит:
/* Mappers are initialized with GoraMapper.initMapper() or
* GoraInputFormat.setInput()*/
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class
, LogAnalyticsMapper.class, true);
/* Reducers are initialized with GoraReducer#initReducer().
* If the output is not to be persisted via Gora, any reducer
* can be used instead. */
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
Тогда вместо использования GoraReducer.initReducerJob()
вы можете просто настроить свой собственный редуктор, , как сказано по вашей ссылке (если этоправильный ответ) :
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class
, LogAnalyticsMapper.class, true);
job.setOutputFormatClass(MultiTableOutputFormat.class);
job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(2);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration());
Знайте, что в предыдущем примере маппер испускает (TextLong, LongWritable)
значения ключа, поэтому ваш редуктор будет выглядеть примерно так: по ссылке, которую вы написали и ответ :
public class MyReducer extends TableReducer<TextLong, LongWritable, Put> {
private static final Logger logger = Logger.getLogger( MyReducer.class );
@SuppressWarnings( "deprecation" )
@Override
protected void reduce( TextLong key, Iterable<LongWritable> data, Context context ) throws IOException, InterruptedException {
logger.info( "Working on ---> " + key.toString() );
for ( Result res : data ) {
Put put = new Put( res.getRow() );
KeyValue[] raw = res.raw();
for ( KeyValue kv : raw ) {
put.add( kv );
}
ImmutableBytesWritable key = new ImmutableBytesWritable(Bytes.toBytes("tableName"));
context.write(key, put);
}
}
}
Опять же, я никогда этого не делал ... так что, может быть, не работает: \