Я хочу сканировать большое количество данных (запросы на основе диапазона), что я могу сделать во время записи данных, чтобы ускорить сканирование? - PullRequest
3 голосов
/ 08 декабря 2011

У меня есть миллиард строк в базе данных. Я хочу сканировать миллион строк одновременно.Каковы лучшие методы оптимизации , которые я могу сделать, чтобы сделать это сканирование максимально быстрым.

1 Ответ

1 голос
/ 31 января 2012

У нас похожая проблема, нам нужно сканировать миллионы строк по ключам, и мы использовали для этого методы сокращения карт.Для этого не существует стандартного решения, поэтому мы пишем собственный формат ввода, который расширяет InputFormat<ImmutableBytesWritable, Result>.Существует описание, как мы это сделали.

Сначала вам нужно создать разбиение, чтобы ключи шли к машине, на которой расположен регион, в котором он находится:

public List<InputSplit> getSplits(JobContext context) throws IOException {
    context.getConfiguration();

    //read key for scan
    byte[][] filterKeys = readFilterKeys(context);

    if (table == null) {
        throw new IOException("No table was provided.");
    }

    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
        throw new IOException("Expecting at least one region.");
    }

    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
        //get key for current region 
        //it should lying between start and end key of region 
        byte[][] regionKeys =
                getRegionKeys(keys.getFirst()[i], keys.getSecond()[i],filterKeys);
        if (regionKeys == null) {
            continue;
        }
        String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
                getServerAddress().getHostname();
        //create a split for region
        InputSplit split = new MultiplyValueSplit(table.getTableName(),
                regionKeys, regionLocation);
        splits.add(split);

    }
    return splits;
}

Класс 'MultiplyValueSplit' содержитинформация о ключах и таблицах

public class MultiplyValueSplit extends InputSplit
    implements Writable, Comparable<MultiplyValueSplit> {

    private byte[] tableName;
    private byte[][] keys;
    private String regionLocation;
}

В методе createRecordReader в классе входного формата 'MultiplyValueReader', который содержит логику создания значения чтения из таблицы.

@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {
    HTable table = this.getHTable();
    if (table == null) {
        throw new IOException("Cannot create a record reader because of a" +
                " previous error. Please look at the previous logs lines from" +
                " the task's full log for more details.");
    }

    MultiplyValueSplit mSplit = (MultiplyValueSplit) split;
    MultiplyValuesReader mvr = new MultiplyValuesReader();

    mvr.setKeys(mSplit.getKeys());
    mvr.setHTable(table);
    mvr.init();

    return mvr;
}

Class 'MultiplyValuesReader 'содержит логику о том, как читать данные из HTable

public class MultiplyValuesReader 
        extends RecordReader<ImmutableBytesWritable, Result> {
    .......

    @Override
    public ImmutableBytesWritable getCurrentKey() {
        return key;
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (this.results == null) {
            return false;
        }

        while (this.results != null) {
            if (resultCurrentKey >= results.length) {
                this.results = getNextResults();
                continue;
            }

            if (key == null) key = new ImmutableBytesWritable();
            value = results[resultCurrentKey];
            resultCurrentKey++;

            if (value != null && value.size() > 0) {
                key.set(value.getRow());
                return true;
            }

        }
        return false;
    }

    public float getProgress() {
        // Depends on the total number of tuples
        return (keys.length > 0 ? ((float) currentKey) / keys.length : 0.0f);
    }

    private Result[] getNextResults() throws IOException {
        if (currentKey <= keys.length) {
            return null;
        }

        //using batch for faster scan
        ArrayList<Get> batch = new ArrayList<Get>(BATCH_SIZE);
        for (int i = currentKey; 
             i < Math.min(currentKey + BATCH_SIZE, keys.length); i++) {
            batch.add(new Get(keys[i]));
        }

        currentKey = Math.min(currentKey + BATCH_SIZE, keys.length);
        resultCurrentKey = 0;
        return htable.get(batch);
    }

}

Более подробно вы можете посмотреть исходный код классов TableInputFormat, TableInputFormatBase, TableSplit и TableRecordReader.

...