У нас похожая проблема, нам нужно сканировать миллионы строк по ключам, и мы использовали для этого методы сокращения карт.Для этого не существует стандартного решения, поэтому мы пишем собственный формат ввода, который расширяет InputFormat<ImmutableBytesWritable, Result>
.Существует описание, как мы это сделали.
Сначала вам нужно создать разбиение, чтобы ключи шли к машине, на которой расположен регион, в котором он находится:
public List<InputSplit> getSplits(JobContext context) throws IOException {
context.getConfiguration();
//read key for scan
byte[][] filterKeys = readFilterKeys(context);
if (table == null) {
throw new IOException("No table was provided.");
}
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new IOException("Expecting at least one region.");
}
List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
//get key for current region
//it should lying between start and end key of region
byte[][] regionKeys =
getRegionKeys(keys.getFirst()[i], keys.getSecond()[i],filterKeys);
if (regionKeys == null) {
continue;
}
String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
getServerAddress().getHostname();
//create a split for region
InputSplit split = new MultiplyValueSplit(table.getTableName(),
regionKeys, regionLocation);
splits.add(split);
}
return splits;
}
Класс 'MultiplyValueSplit' содержитинформация о ключах и таблицах
public class MultiplyValueSplit extends InputSplit
implements Writable, Comparable<MultiplyValueSplit> {
private byte[] tableName;
private byte[][] keys;
private String regionLocation;
}
В методе createRecordReader
в классе входного формата 'MultiplyValueReader', который содержит логику создания значения чтения из таблицы.
@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException {
HTable table = this.getHTable();
if (table == null) {
throw new IOException("Cannot create a record reader because of a" +
" previous error. Please look at the previous logs lines from" +
" the task's full log for more details.");
}
MultiplyValueSplit mSplit = (MultiplyValueSplit) split;
MultiplyValuesReader mvr = new MultiplyValuesReader();
mvr.setKeys(mSplit.getKeys());
mvr.setHTable(table);
mvr.init();
return mvr;
}
Class 'MultiplyValuesReader 'содержит логику о том, как читать данные из HTable
public class MultiplyValuesReader
extends RecordReader<ImmutableBytesWritable, Result> {
.......
@Override
public ImmutableBytesWritable getCurrentKey() {
return key;
}
@Override
public Result getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (this.results == null) {
return false;
}
while (this.results != null) {
if (resultCurrentKey >= results.length) {
this.results = getNextResults();
continue;
}
if (key == null) key = new ImmutableBytesWritable();
value = results[resultCurrentKey];
resultCurrentKey++;
if (value != null && value.size() > 0) {
key.set(value.getRow());
return true;
}
}
return false;
}
public float getProgress() {
// Depends on the total number of tuples
return (keys.length > 0 ? ((float) currentKey) / keys.length : 0.0f);
}
private Result[] getNextResults() throws IOException {
if (currentKey <= keys.length) {
return null;
}
//using batch for faster scan
ArrayList<Get> batch = new ArrayList<Get>(BATCH_SIZE);
for (int i = currentKey;
i < Math.min(currentKey + BATCH_SIZE, keys.length); i++) {
batch.add(new Get(keys[i]));
}
currentKey = Math.min(currentKey + BATCH_SIZE, keys.length);
resultCurrentKey = 0;
return htable.get(batch);
}
}
Более подробно вы можете посмотреть исходный код классов TableInputFormat
, TableInputFormatBase
, TableSplit
и TableRecordReader
.