управляющий итератор в mapreduce - PullRequest
4 голосов
/ 14 августа 2010

Я пытаюсь найти сумму любых заданных точек, используя hadoop. У меня проблема с получением всех значений от данного ключа в одном редукторе.Это выглядит так.

Reducer:

 public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, DoubleWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        Text word = new Text();

        Iterator<IntWritable> tr = values;
        IntWritable v;
        while (tr.hasNext()) {
             v = tr.next();

            Iterator<IntWritable> td = values;
            while (td.hasNext()) {

                IntWritable u = td.next();
                double sum = u+v;
                word.set( u + " + " + v);
                output.collect(word, new DoubleWritable(sum));
            }
        }
    }
}

И я пытаюсь создать две копии переменной Iterator, чтобы я мог просмотреть все значения второго итератора, пока я получаюодно значение из предыдущего итератора (два цикла while выше), но два итератора все время содержат одно и то же значение.

Я не уверен, что это правильный способ, любая помощь действительно приветствуется.

Спасибо,

Цегай

Ответы [ 4 ]

29 голосов
/ 13 декабря 2010

Итераторы в редукторе не так просты, как вы думаете.

Проблема в том, что общее количество элементов, которые вы перебираете, может не помещаться в памяти.Это означает, что итератор может читать с диска.Если у вас есть две независимые копии итератора, то одна из них может быть намного впереди другой, что означает, что данные, между которыми указывают точки двух итераторов, не могут быть отброшены.

Для простоты реализации,Hadoop не поддерживает наличие более одного итератора для значений приведения.

Практическое влияние этого состоит в том, что вы не можете пройти один и тот же итератор дважды.Это не хорошо, но это так.Если вы точно знаете, что количество элементов поместится в память, вы можете скопировать все элементы в список, как это предложено MrGomez.Если вы этого не знаете, возможно, вам придется использовать вторичное хранилище.

Лучшим подходом является перепроектирование вашей программы, чтобы вам не требовалось неограниченное хранилище в редукторе.Это может быть немного сложнее, но есть стандартные подходы к проблеме.

Для вашей конкретной задачи у вас есть квадратичный рост размера выхода по сравнению с наибольшим входным набором сокращения.Обычно это действительно плохая идея.В большинстве случаев вам не нужны ВСЕ пары, только самые важные пары.Если вы можете каким-то образом обрезать набор пар, то все готово, и вы сможете удалить ограничение всех пар.

Например, если вы пытаетесь найти 100 пар с наибольшимсуммой для каждого набора сокращений вы можете сохранить приоритетную очередь с 100 самыми большими входными данными, замеченными до сих пор, и приоритетную очередь с 100 самыми большими суммами, замеченными до сих пор.Для каждого нового ввода вы можете сформировать сумму с самыми большими 100 числами, которые когда-либо видели, и попытаться вставить эти суммы во вторую очередь.Наконец, вы должны вставить новый ввод в первую очередь и обрезать обе очереди до 100 элементов, удалив наименьшие значения (при необходимости).В методе close при редуцировании вы должны сбросить очередь с приоритетами.Этот подход гарантирует, что вам нужно только минимум (n ^ 2, 200) элементов хранилища, что позволяет избежать проблемы n ^ 2 и избежать двойного прохода через вход, сохраняя 100 самых больших видимых элементов, а не все видимые элементы.

12 голосов
/ 14 августа 2010

Я не уверен, что именно вы пытаетесь достичь, но я знаю это очень много: поведение итераторов Hadoop немного странно.Вызов Iterator.next () всегда возвращает экземпляр SAME EXACT IntWritable, а содержимое этого экземпляра заменяется следующим значением.Поэтому удержание ссылки на IntWritable через вызовы Iterator.next () почти всегда является ошибкой.Я считаю, что такое поведение предназначено для уменьшения объема создания объекта и накладных расходов GC.

Один из способов обойти это - использовать WritableUtils.clone () для клонирования экземпляра, который вы пытаетесь сохранить при вызовах.в Iterator.next ().

2 голосов
/ 10 февраля 2012

Чтобы скопировать итератор, вы не можете назначить итератор новой переменной.Вы должны "клонировать" итератор в новую переменную класса итератора.Когда итератору A назначают другую переменную итератора B, две переменные итератора указывают одинаковые данные.

1 голос
/ 11 декабря 2010

Исходя из вашего предыдущего вопроса, вы, кажется, застряли на описанной пиколбо проблеме итератора. Формулировка вашего редуктора также указывает на то, что вы отказались от предложенных им алгоритмов для наивногоподход ... который будет работать, хотя и неоптимально.

Позвольте мне немного очистить ваш код с моим ответом:

// Making use of Hadoop's Iterable reduce, assuming it's available to you
//
//  The method signature is:
//
//  protected void reduce(KEYIN key, java.lang.Iterable<VALUEIN> values, 
//   org.apache.hadoop.mapreduce.Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context 
//   context) throws java.io.IOException, java.lang.InterruptedException
//
public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {

    // I assume you declare this here to save on GC
    Text outKey = new Text();
    IntWritable outVal = new IntWritable();

    // Since you've forgone piccolbo's approach, you'll need to maintain the
    // data structure yourself. Since we always walk the list forward and
    // wish to optimize the insertion speed, we use LinkedList. Calls to
    // IntWritable.get() will give us an int, which we then copy into our list.
    LinkedList<Integer> valueList = new LinkedList<Integer>();

    // Here's why we changed the method signature: use of Java's for-each
    for (IntWritable iw: values) {
        valueList.add(iw.get());
    }

    // And from here, we construct each value pair as an O(n^2) operation
    for (Integer i: valueList) {
        for (Integer j: valueList) {
            outKey.set(i + " + " + j);
            outVal.set(i + j);
            context.write(outKey, outVal);
        }
    }

    // Do note: I've also changed your return value from DoubleWritable to
    // IntWritable, since you should always be performing integer operations
    // as defined. If your points are Double, supply DoubleWritable instead.
}

Это работает, но делает несколько предположений, которые ограничивают производительностьпри построении матрицы расстояний, включая необходимость выполнения комбинации в одной операции уменьшения.

Рассмотрите подход Пикколбо , если заранее знаете размер и размерность ваших входных данных.Это должно быть доступно, в худшем случае, путем обхода строк ввода за линейное время.

(См. этот поток , почему мы не можем реализовать это как прямой итератор.)

...